Skip to content

Commit 53d2c1b

Browse files
committed
feat: refactor event channel tests for improved structure and clarity
1 parent 7f50a2f commit 53d2c1b

File tree

1 file changed

+25
-25
lines changed
  • tests/integration/test_adapters/test_psycopg/test_extensions

1 file changed

+25
-25
lines changed

tests/integration/test_adapters/test_psycopg/test_extensions/test_events.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ def test_psycopg_sync_event_channel_queue_fallback(tmp_path, postgres_service: P
3636
spec.add_config(config)
3737
channel = spec.event_channel(config)
3838

39-
event_id = channel.publish("notifications", {"action": "queue"})
40-
iterator = channel.iter_events("notifications", poll_interval=0.1)
41-
message = next(iterator)
42-
channel.ack(message.event_id)
43-
44-
with config.provide_session() as driver:
45-
row = driver.select_one(
46-
"SELECT status FROM sqlspec_event_queue WHERE event_id = :event_id", {"event_id": event_id}
47-
)
39+
try:
40+
event_id = channel.publish("notifications", {"action": "queue"})
41+
iterator = channel.iter_events("notifications", poll_interval=0.1)
42+
message = next(iterator)
43+
channel.ack(message.event_id)
4844

49-
assert row["status"] == "acked"
45+
with config.provide_session() as driver:
46+
row = driver.select_one(
47+
"SELECT status FROM sqlspec_event_queue WHERE event_id = :event_id", {"event_id": event_id}
48+
)
5049

51-
if config.connection_instance:
50+
assert row["status"] == "acked"
51+
finally:
5252
config.close_pool()
5353

5454

@@ -72,20 +72,20 @@ async def test_psycopg_async_event_channel_queue_fallback(tmp_path, postgres_ser
7272
spec.add_config(config)
7373
channel = spec.event_channel(config)
7474

75-
event_id = await channel.publish("notifications", {"action": "async_queue"})
76-
iterator = channel.iter_events("notifications", poll_interval=0.1)
7775
try:
78-
message = await asyncio.wait_for(iterator.__anext__(), timeout=5)
76+
event_id = await channel.publish("notifications", {"action": "async_queue"})
77+
iterator = channel.iter_events("notifications", poll_interval=0.1)
78+
try:
79+
message = await asyncio.wait_for(iterator.__anext__(), timeout=5)
80+
finally:
81+
await iterator.aclose()
82+
await channel.ack(message.event_id)
83+
84+
async with config.provide_session() as driver:
85+
row = await driver.select_one(
86+
"SELECT status FROM sqlspec_event_queue WHERE event_id = :event_id", {"event_id": event_id}
87+
)
88+
89+
assert row["status"] == "acked"
7990
finally:
80-
await iterator.aclose()
81-
await channel.ack(message.event_id)
82-
83-
async with config.provide_session() as driver:
84-
row = await driver.select_one(
85-
"SELECT status FROM sqlspec_event_queue WHERE event_id = :event_id", {"event_id": event_id}
86-
)
87-
88-
assert row["status"] == "acked"
89-
90-
if config.connection_instance:
9191
await config.close_pool()

0 commit comments

Comments
 (0)