| 
 | 1 | +import contextlib  | 
1 | 2 | import logging  | 
2 | 3 | import time  | 
 | 4 | +from collections.abc import AsyncIterator  | 
3 | 5 | from datetime import timedelta  | 
4 | 6 | 
 
  | 
5 | 7 | from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult  | 
 | 
8 | 10 | from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine  | 
9 | 11 | from tenacity import retry  | 
10 | 12 | 
 
  | 
 | 13 | +from .logging_utils import log_context  | 
11 | 14 | from .retry_policies import PostgresRetryPolicyUponInitialization  | 
12 | 15 | 
 
  | 
13 | 16 | _logger = logging.getLogger(__name__)  | 
@@ -64,3 +67,34 @@ async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult:  | 
64 | 67 |         return IsResponsive(elapsed=timedelta(seconds=elapsed_time))  | 
65 | 68 |     except SQLAlchemyError as err:  | 
66 | 69 |         return IsNonResponsive(reason=f"{err}")  | 
 | 70 | + | 
 | 71 | + | 
 | 72 | +@contextlib.asynccontextmanager  | 
 | 73 | +async def with_async_pg_engine(  | 
 | 74 | +    settings: PostgresSettings,  | 
 | 75 | +) -> AsyncIterator[AsyncEngine]:  | 
 | 76 | +    """  | 
 | 77 | +    Creates an asyncpg engine and ensures it is properly closed after use.  | 
 | 78 | +    """  | 
 | 79 | +    try:  | 
 | 80 | +        with log_context(  | 
 | 81 | +            _logger,  | 
 | 82 | +            logging.DEBUG,  | 
 | 83 | +            f"connection to db {settings.dsn_with_async_sqlalchemy}",  | 
 | 84 | +        ):  | 
 | 85 | +            server_settings = None  | 
 | 86 | +            if settings.POSTGRES_CLIENT_NAME:  | 
 | 87 | +                assert isinstance(settings.POSTGRES_CLIENT_NAME, str)  | 
 | 88 | + | 
 | 89 | +            engine = create_async_engine(  | 
 | 90 | +                settings.dsn_with_async_sqlalchemy,  | 
 | 91 | +                pool_size=settings.POSTGRES_MINSIZE,  | 
 | 92 | +                max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE,  | 
 | 93 | +                connect_args={"server_settings": server_settings},  | 
 | 94 | +                pool_pre_ping=True,  # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects  | 
 | 95 | +                future=True,  # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released  | 
 | 96 | +            )  | 
 | 97 | +        yield engine  | 
 | 98 | +    finally:  | 
 | 99 | +        with log_context(_logger, logging.DEBUG, f"db disconnect of {engine}"):  | 
 | 100 | +            await engine.dispose()  | 
0 commit comments