66from pytest_databases .docker .postgres import PostgresService
77from sqlalchemy import text
88from sqlalchemy .ext .asyncio import create_async_engine
9+ from sqlalchemy .pool import NullPool
910
1011from app .config import Settings
1112
@@ -28,51 +29,74 @@ async def session_database(
2829 test_db_url = f"{ base_url } /{ db_name } "
2930
3031 # Connect to default postgres db to create our session test database
31- default_engine = create_async_engine (f"{ base_url } /postgres" , echo = False , isolation_level = "AUTOCOMMIT" )
32+ default_engine = create_async_engine (
33+ f"{ base_url } /postgres" , echo = False , isolation_level = "AUTOCOMMIT" , poolclass = NullPool
34+ )
3235 try :
3336 async with default_engine .connect () as conn :
3437 await conn .execute (text (f"CREATE DATABASE { db_name } " ))
3538 finally :
3639 await default_engine .dispose ()
3740
3841 # Create database tables
39- engine = create_async_engine (test_db_url , echo = False )
42+ engine = create_async_engine (test_db_url , echo = False , poolclass = NullPool )
4043 try :
4144 async with engine .begin () as conn :
4245 assert sqlalchemy_config .metadata is not None
4346 await conn .run_sync (sqlalchemy_config .metadata .create_all )
44-
45- # Yield the database info for the session
46- yield {"url" : test_db_url , "db_name" : db_name , "base_url" : base_url }
47-
4847 finally :
4948 await engine .dispose ()
5049
51- # Drop the session test database
52- cleanup_engine = create_async_engine (f"{ base_url } /postgres" , echo = False , isolation_level = "AUTOCOMMIT" )
53- try :
54- async with cleanup_engine .connect () as conn :
55- # Terminate connections to the test database first
56- await conn .execute (
57- text (f"""
58- SELECT pg_terminate_backend(pid)
59- FROM pg_stat_activity
60- WHERE datname = '{ db_name } ' AND pid <> pg_backend_pid()
61- """ )
62- )
63- await conn .execute (text (f"DROP DATABASE IF EXISTS { db_name } " ))
64- finally :
65- await cleanup_engine .dispose ()
50+ # Yield the database info for the session
51+ yield {"url" : test_db_url , "db_name" : db_name , "base_url" : base_url }
52+
53+ # Drop the session test database
54+ cleanup_engine = create_async_engine (
55+ f"{ base_url } /postgres" , echo = False , isolation_level = "AUTOCOMMIT" , poolclass = NullPool
56+ )
57+ try :
58+ async with cleanup_engine .connect () as conn :
59+ # Terminate connections to the test database first
60+ await conn .execute (
61+ text (f"""
62+ SELECT pg_terminate_backend(pid)
63+ FROM pg_stat_activity
64+ WHERE datname = '{ db_name } ' AND pid <> pg_backend_pid()
65+ """ )
66+ )
67+ await conn .execute (text (f"DROP DATABASE IF EXISTS { db_name } " ))
68+ finally :
69+ await cleanup_engine .dispose ()
6670
6771
6872@pytest_asyncio .fixture (scope = "function" )
6973async def clean_database (session_database : dict [str , str ]) -> AsyncIterator [None ]:
70- """Clean database between tests by truncating all tables."""
74+ """
75+ Clean database between tests by truncating all tables.
76+
77+ Uses two engines to avoid connection leaks:
78+ 1. terminator_engine: Kills orphaned connections to test DB (connects to postgres db)
79+ 2. clean_engine: Performs TRUNCATE operations (connects to test db)
80+ """
7181 from app .db import sqlalchemy_config
7282
73- clean_engine = create_async_engine (session_database ["url" ], echo = False )
83+ terminator_engine = create_async_engine (
84+ f"{ session_database ['base_url' ]} /postgres" ,
85+ echo = False ,
86+ isolation_level = "AUTOCOMMIT" ,
87+ poolclass = NullPool ,
88+ )
89+ clean_engine = create_async_engine (session_database ["url" ], echo = False , poolclass = NullPool )
7490
7591 try :
92+ # Terminate stray connections from previous tests to prevent connection pool exhaustion
93+ async with terminator_engine .connect () as conn :
94+ await conn .execute (
95+ text (
96+ f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{ session_database ['db_name' ]} ' AND pid <> pg_backend_pid()"
97+ )
98+ )
99+
76100 # Clean tables before the test
77101 async with clean_engine .begin () as conn :
78102 assert sqlalchemy_config .metadata is not None
@@ -83,6 +107,7 @@ async def clean_database(session_database: dict[str, str]) -> AsyncIterator[None
83107
84108 finally :
85109 await clean_engine .dispose ()
110+ await terminator_engine .dispose ()
86111
87112
88113@pytest_asyncio .fixture (scope = "function" )
@@ -123,6 +148,8 @@ def settings_customise_sources(
123148 app_settings = test_settings ,
124149 title = "Test PM API" ,
125150 enable_structlog = False ,
151+ pool_size = 1 ,
152+ max_overflow = 0 ,
126153 )
127154
128155 async with AsyncTestClient (app = test_app ) as test_client :
0 commit comments