|
| 1 | +import contextlib |
1 | 2 | import logging |
2 | 3 | import time |
| 4 | +from collections.abc import AsyncIterator |
3 | 5 | from datetime import timedelta |
4 | 6 |
|
5 | 7 | from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult |
|
8 | 10 | from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine |
9 | 11 | from tenacity import retry |
10 | 12 |
|
| 13 | +from .logging_utils import log_context |
11 | 14 | from .retry_policies import PostgresRetryPolicyUponInitialization |
12 | 15 |
|
13 | 16 | _logger = logging.getLogger(__name__) |
@@ -64,3 +67,34 @@ async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult: |
64 | 67 | return IsResponsive(elapsed=timedelta(seconds=elapsed_time)) |
65 | 68 | except SQLAlchemyError as err: |
66 | 69 | return IsNonResponsive(reason=f"{err}") |
| 70 | + |
| 71 | + |
| 72 | +@contextlib.asynccontextmanager |
| 73 | +async def with_async_pg_engine( |
| 74 | + settings: PostgresSettings, |
| 75 | +) -> AsyncIterator[AsyncEngine]: |
| 76 | + """ |
| 77 | + Creates an asyncpg engine and ensures it is properly closed after use. |
| 78 | + """ |
| 79 | + try: |
| 80 | + with log_context( |
| 81 | + _logger, |
| 82 | + logging.DEBUG, |
| 83 | + f"connection to db {settings.dsn_with_async_sqlalchemy}", |
| 84 | + ): |
| 85 | + server_settings = None |
| 86 | + if settings.POSTGRES_CLIENT_NAME: |
| 87 | + assert isinstance(settings.POSTGRES_CLIENT_NAME, str) |
| 88 | + |
| 89 | + engine = create_async_engine( |
| 90 | + settings.dsn_with_async_sqlalchemy, |
| 91 | + pool_size=settings.POSTGRES_MINSIZE, |
| 92 | + max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE, |
| 93 | + connect_args={"server_settings": server_settings}, |
| 94 | + pool_pre_ping=True, # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects |
| 95 | + future=True, # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released |
| 96 | + ) |
| 97 | + yield engine |
| 98 | + finally: |
| 99 | + with log_context(_logger, logging.DEBUG, f"db disconnect of {engine}"): |
| 100 | + await engine.dispose() |
0 commit comments