"""Shared asyncpg connection pool.""" from __future__ import annotations import logging from typing import Optional import asyncpg logger = logging.getLogger(__name__) _pool: Optional[asyncpg.Pool] = None async def init_pool( host: str, port: int, dbname: str, user: str, password: str, min_size: int = 1, max_size: int = 5, ) -> asyncpg.Pool: """Create the shared connection pool. Call once during app startup.""" global _pool _pool = await asyncpg.create_pool( host=host, port=port, database=dbname, user=user, password=password, min_size=min_size, max_size=max_size, ) logger.info("Database pool initialized (%s:%d/%s)", host, port, dbname) return _pool async def get_pool() -> asyncpg.Pool: """Return the shared pool. Raises if not yet initialized.""" if _pool is None: raise RuntimeError("Database pool not initialized — call init_pool() first") return _pool async def close_pool() -> None: """Close the shared pool. Call during app shutdown.""" global _pool if _pool is not None: await _pool.close() _pool = None logger.info("Database pool closed")