Skip to content

Commit 772af1e

Browse files
authored
fix(FIR-51029): Database cache logic (#475)
1 parent ab2e59e commit 772af1e

File tree

9 files changed

+715
-10
lines changed

9 files changed

+715
-10
lines changed

src/firebolt/async_db/cursor.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,10 @@ async def use_engine(self, engine: str, cache: bool = True) -> None:
310310
self._update_set_parameters(cache_obj.engines[engine].params)
311311
else:
312312
await self.execute(f'USE ENGINE "{engine}"')
313-
cache_obj.engines[engine] = EngineInfo(
314-
self.engine_url, self.parameters | self._set_parameters
315-
)
313+
params = self.parameters | self._set_parameters
314+
# Ensure 'database' parameter is not cached with engine info
315+
params = {k: v for k, v in params.items() if k != "database"}
316+
cache_obj.engines[engine] = EngineInfo(self.engine_url, params)
316317
self.set_cache_record(cache_obj)
317318
else:
318319
await self.execute(f'USE ENGINE "{engine}"')

src/firebolt/db/cursor.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,9 +314,10 @@ def use_engine(self, engine: str, cache: bool = True) -> None:
314314
self._update_set_parameters(cache_obj.engines[engine].params)
315315
else:
316316
self.execute(f'USE ENGINE "{engine}"')
317-
cache_obj.engines[engine] = EngineInfo(
318-
self.engine_url, self.parameters | self._set_parameters
319-
)
317+
params = self.parameters | self._set_parameters
318+
# Ensure 'database' parameter is not cached with engine info
319+
params = {k: v for k, v in params.items() if k != "database"}
320+
cache_obj.engines[engine] = EngineInfo(self.engine_url, params)
320321
self.set_cache_record(cache_obj)
321322
else:
322323
self.execute(f'USE ENGINE "{engine}"')

tests/integration/dbapi/async/V2/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ async def connection_factory(
3333
async def factory(**kwargs: Any) -> Connection:
3434
if request.param == "core":
3535
base_kwargs = {
36-
"database": "firebolt",
36+
"database": kwargs.pop("database", "firebolt"),
3737
"auth": core_auth,
3838
"url": core_url,
3939
}
4040
else:
4141
base_kwargs = {
4242
"engine_name": engine_name,
43-
"database": database_name,
43+
"database": kwargs.pop("database", database_name),
4444
"auth": auth,
4545
"account_name": account_name,
4646
"api_endpoint": api_endpoint,

tests/integration/dbapi/async/V2/test_queries_async.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,3 +1188,85 @@ async def test_connection_close_no_rollback_with_autocommit_on(
11881188
await check_data_visibility_async(
11891189
table_name, 1, connection_factory, True, [1, "autocommit_close_test"]
11901190
)
1191+
1192+
1193+
async def test_database_switching_with_same_engine_preserves_database_context(
1194+
database_name: str,
1195+
connection_factory: Callable[..., Connection],
1196+
) -> None:
1197+
"""
1198+
Async integration test for database context preservation with caching on Firebolt.
1199+
1200+
This test verifies against a live Firebolt instance:
1201+
1. Connect with database1 + engine1 (cache entry created)
1202+
2. Connect with database2 + engine1 (should add database2 to cache)
1203+
3. Cursors from second connection should have database2, not database1
1204+
"""
1205+
first_db_name = database_name
1206+
second_db_name = f"{database_name}_second_async"
1207+
1208+
# Create a system connection to set up test databases
1209+
async with await connection_factory() as system_connection:
1210+
system_cursor = system_connection.cursor()
1211+
1212+
try:
1213+
# Create the second test database
1214+
await system_cursor.execute(
1215+
f'CREATE DATABASE IF NOT EXISTS "{second_db_name}"'
1216+
)
1217+
1218+
# First connection: database1 + engine1
1219+
async with await connection_factory(database=first_db_name) as connection1:
1220+
cursor1 = connection1.cursor()
1221+
await cursor1.execute("SELECT current_database()")
1222+
result1 = await cursor1.fetchone()
1223+
1224+
# Verify first connection has correct database
1225+
assert (
1226+
result1[0] == first_db_name
1227+
), f"First cursor should have database {first_db_name}"
1228+
assert (
1229+
cursor1.database == first_db_name
1230+
), f"First cursor database property should be {first_db_name}"
1231+
1232+
# Second connection: database2 + engine1 (same engine)
1233+
async with await connection_factory(database=second_db_name) as connection2:
1234+
cursor2 = connection2.cursor()
1235+
await cursor2.execute("SELECT current_database()")
1236+
result2 = await cursor2.fetchone()
1237+
1238+
# Verify second connection has correct database
1239+
assert result2[0] == second_db_name, (
1240+
f"Second cursor should have database {second_db_name}, "
1241+
f"but got {result2[0]}. This indicates the database context was overwritten."
1242+
)
1243+
assert cursor2.database == second_db_name, (
1244+
f"Second cursor database property should be {second_db_name}, "
1245+
f"but has {cursor2.database}. This indicates the database context was overwritten."
1246+
)
1247+
1248+
# Third connection: back to database1 + engine1 (should use cache)
1249+
async with await connection_factory(database=first_db_name) as connection3:
1250+
cursor3 = connection3.cursor()
1251+
await cursor3.execute("SELECT current_database()")
1252+
result3 = await cursor3.fetchone()
1253+
1254+
# Verify third connection has correct database (should be from cache)
1255+
assert result3[0] == first_db_name, (
1256+
f"Third cursor should have database {first_db_name}, "
1257+
f"but got {result3[0]}. This indicates cached database context is incorrect."
1258+
)
1259+
assert cursor3.database == first_db_name, (
1260+
f"Third cursor database property should be {first_db_name}, "
1261+
f"but has {cursor3.database}. This indicates cached database context is incorrect."
1262+
)
1263+
1264+
finally:
1265+
# Clean up: Drop the test database
1266+
try:
1267+
await system_cursor.execute(
1268+
f'DROP DATABASE IF EXISTS "{second_db_name}"'
1269+
)
1270+
except Exception:
1271+
# Ignore cleanup errors to avoid masking the real test failure
1272+
pass

tests/integration/dbapi/sync/V2/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ def connection_factory(
4141
def factory(**kwargs: Any) -> Connection:
4242
if request.param == "core":
4343
base_kwargs = {
44-
"database": "firebolt",
44+
"database": kwargs.pop("database", "firebolt"),
4545
"auth": core_auth,
4646
"url": core_url,
4747
}
4848
else:
4949
base_kwargs = {
5050
"engine_name": engine_name,
51-
"database": database_name,
51+
"database": kwargs.pop("database", database_name),
5252
"auth": auth,
5353
"account_name": account_name,
5454
"api_endpoint": api_endpoint,

tests/integration/dbapi/sync/V2/test_queries.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,3 +1176,81 @@ def test_connection_close_no_rollback_with_autocommit_on(
11761176
check_data_visibility(
11771177
table_name, 1, connection_factory, True, [1, "autocommit_close_test"]
11781178
)
1179+
1180+
1181+
def test_database_switching_with_same_engine_preserves_database_context(
1182+
database_name: str,
1183+
connection_factory: Callable[..., Connection],
1184+
) -> None:
1185+
"""
1186+
Integration test for database context preservation with caching on Firebolt.
1187+
1188+
This test verifies against a live Firebolt instance:
1189+
1. Connect with database1 + engine1 (cache entry created)
1190+
2. Connect with database2 + engine1 (should add database2 to cache)
1191+
3. Cursors from second connection should have database2, not database1
1192+
"""
1193+
first_db_name = database_name
1194+
second_db_name = f"{database_name}_second"
1195+
1196+
# Create a system connection to set up test databases
1197+
with connection_factory() as system_connection:
1198+
system_cursor = system_connection.cursor()
1199+
1200+
try:
1201+
# Create the second test database
1202+
system_cursor.execute(f'CREATE DATABASE IF NOT EXISTS "{second_db_name}"')
1203+
1204+
# First connection: database1 + engine1
1205+
with connection_factory(database=first_db_name) as connection1:
1206+
cursor1 = connection1.cursor()
1207+
cursor1.execute("SELECT current_database()")
1208+
result1 = cursor1.fetchone()
1209+
1210+
# Verify first connection has correct database
1211+
assert (
1212+
result1[0] == first_db_name
1213+
), f"First cursor should have database {first_db_name}"
1214+
assert (
1215+
cursor1.database == first_db_name
1216+
), f"First cursor database property should be {first_db_name}"
1217+
1218+
# Second connection: database2 + engine1 (same engine)
1219+
with connection_factory(database=second_db_name) as connection2:
1220+
cursor2 = connection2.cursor()
1221+
cursor2.execute("SELECT current_database()")
1222+
result2 = cursor2.fetchone()
1223+
1224+
# Verify second connection has correct database
1225+
assert result2[0] == second_db_name, (
1226+
f"Second cursor should have database {second_db_name}, "
1227+
f"but got {result2[0]}. This indicates the database context was overwritten."
1228+
)
1229+
assert cursor2.database == second_db_name, (
1230+
f"Second cursor database property should be {second_db_name}, "
1231+
f"but has {cursor2.database}. This indicates the database context was overwritten."
1232+
)
1233+
1234+
# Third connection: back to database1 + engine1 (should use cache)
1235+
with connection_factory(database=first_db_name) as connection3:
1236+
cursor3 = connection3.cursor()
1237+
cursor3.execute("SELECT current_database()")
1238+
result3 = cursor3.fetchone()
1239+
1240+
# Verify third connection has correct database (should be from cache)
1241+
assert result3[0] == first_db_name, (
1242+
f"Third cursor should have database {first_db_name}, "
1243+
f"but got {result3[0]}. This indicates cached database context is incorrect."
1244+
)
1245+
assert cursor3.database == first_db_name, (
1246+
f"Third cursor database property should be {first_db_name}, "
1247+
f"but has {cursor3.database}. This indicates cached database context is incorrect."
1248+
)
1249+
1250+
finally:
1251+
# Clean up: Drop the test database
1252+
try:
1253+
system_cursor.execute(f'DROP DATABASE IF EXISTS "{second_db_name}"')
1254+
except Exception:
1255+
# Ignore cleanup errors to avoid masking the real test failure
1256+
pass

0 commit comments

Comments
 (0)