# Source: daily-briefing/server/db.py (53 lines, 1.2 KiB, Python)

"""Shared asyncpg connection pool."""
from __future__ import annotations
import logging
from typing import Optional
import asyncpg
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The single shared pool for this module: assigned by init_pool(), returned
# by get_pool(), and reset to None by close_pool(). None means "not
# initialized".
_pool: Optional[asyncpg.Pool] = None
async def init_pool(
    host: str,
    port: int,
    dbname: str,
    user: str,
    password: str,
    min_size: int = 1,
    max_size: int = 5,
) -> asyncpg.Pool:
    """Create the shared connection pool. Call once during app startup.

    If a pool is already installed when this is called again, the new pool
    replaces it and the previous pool is closed so that its connections are
    not leaked.

    Args:
        host: Database server host.
        port: Database server port.
        dbname: Database name (forwarded as asyncpg's ``database`` argument).
        user: Role to connect as.
        password: Password for ``user``; never written to the log.
        min_size: Forwarded to ``asyncpg.create_pool`` as ``min_size``.
        max_size: Forwarded to ``asyncpg.create_pool`` as ``max_size``.

    Returns:
        The newly created pool, which is also stored as the module-level
        shared pool.

    Raises:
        Whatever ``asyncpg.create_pool`` raises; in that case the
        module-level pool is left exactly as it was.
    """
    global _pool
    # Build the replacement first: if connecting fails, any pool that is
    # already installed stays in place and usable.
    new_pool = await asyncpg.create_pool(
        host=host,
        port=port,
        database=dbname,
        user=user,
        password=password,
        min_size=min_size,
        max_size=max_size,
    )
    # Read the current global only now (after the await) so a pool installed
    # in the meantime is the one that gets displaced and closed.
    previous, _pool = _pool, new_pool
    logger.info("Database pool initialized (%s:%d/%s)", host, port, dbname)

    if previous is not None:
        logger.warning(
            "init_pool() called while a pool was already open; "
            "closing the previous pool"
        )
        try:
            await previous.close()
        except Exception:
            # Cleanup of the old pool is best-effort: the new pool is already
            # installed, so a failure here must not fail initialization.
            logger.exception("Failed to close the previous database pool")

    return new_pool
async def get_pool() -> asyncpg.Pool:
    """Hand back the module-level shared pool.

    Returns:
        The pool currently stored in the module-level ``_pool``.

    Raises:
        RuntimeError: If no pool is installed (``_pool`` is ``None``).
    """
    pool = _pool
    if pool is not None:
        return pool
    raise RuntimeError("Database pool not initialized — call init_pool() first")
async def close_pool() -> None:
    """Close the shared pool. Call during app shutdown.

    Does nothing when no pool is installed. Otherwise the pool is detached
    from the module-level ``_pool`` *before* it is closed, so that:

    * the module never keeps a reference to a pool whose ``close()`` raised
      or was cancelled, and
    * a concurrent ``close_pool()`` call sees ``None`` and does not close the
      same pool a second time.

    Raises:
        Whatever the pool's ``close()`` raises; ``_pool`` is already ``None``
        by then.
    """
    global _pool
    if _pool is None:
        return
    # Swap first, await second: the global is cleared even if close() fails.
    pool, _pool = _pool, None
    await pool.close()
    logger.info("Database pool closed")