
Commit cb52532

feat: simplify event channel interfaces
- Updated event channel methods to use unified naming conventions for publish, ack, and iteration methods across async and sync configurations.
- Replaced `*_async` methods with their unsuffixed counterparts in tests and implementations.
- Adjusted test cases to reflect the new method names and ensure compatibility with the updated event channel structure.
- Enhanced type checking in tests by disabling specific pyright warnings where necessary.
- Removed deprecated portal bridge tests and ensured proper error handling for async/sync configuration mismatches.
1 parent 9b4aaaa commit cb52532
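
For orientation, a minimal before/after sketch of the rename this commit applies to an async channel; the method names are taken from the diffs below, while the surrounding setup (an existing `channel` object) is assumed:

```python
# Before this commit: async channels exposed *_async variants of every method.
event_id = await channel.publish_async("notifications", {"type": "refresh"})
async for message in channel.iter_events_async("notifications"):
    await channel.ack_async(message.event_id)
    break
await channel.shutdown_async()

# After this commit: AsyncEventChannel and SyncEventChannel share one set of
# names (publish, iter_events, ack, listen, stop_listener, nack, shutdown);
# only the need to await differs.
event_id = await channel.publish("notifications", {"type": "refresh"})
async for message in channel.iter_events("notifications"):
    await channel.ack(message.event_id)
    break
await channel.shutdown()
```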

File tree

26 files changed: +693 −903 lines changed

docs/guides/events/database-event-channels.md

Lines changed: 40 additions & 27 deletions
@@ -1,9 +1,9 @@
 # Database Event Channels
 
-SQLSpec now ships a portable event channel API that wraps native LISTEN/NOTIFY
+SQLSpec ships a portable event channel API that wraps native LISTEN/NOTIFY
 (or tapers down to a durable queue table when a driver lacks native support).
 This guide documents the queue-backed fallback delivered in
-`sqlspec.extensions.events.EventChannel`, the native PostgreSQL backend, and how to enable
+`sqlspec.extensions.events`, the native PostgreSQL backend, and how to enable
 each option across adapters.
 
 ## Quick start
@@ -51,14 +51,14 @@ config = AsyncpgConfig(connection_config={"dsn": "postgresql://localhost/db"})
 spec.add_config(config)
 channel = spec.event_channel(config)
 
-event_id = await channel.publish_async("notifications", {"type": "native"})
-async for message in channel.iter_events_async("notifications"):
+event_id = await channel.publish("notifications", {"type": "native"})
+async for message in channel.iter_events("notifications"):
     assert message.event_id == event_id
     break
 ```
 
 Native events are fire-and-forget: there is no durable queue row, so
-`ack_async()` becomes a no-op used solely for API parity. If you need durable
+`ack()` becomes a no-op used solely for API parity. If you need durable
 storage (e.g., for retries or multi-consumer fan-out) keep the queue backend
 enabled as described below.
 
@@ -117,14 +117,26 @@ When `in_memory=True` and the adapter dialect is Oracle, the migration adds the
 `INMEMORY PRIORITY HIGH` clause so queue rows live in the column store. Other
 adapters ignore this flag.
 
+## Sync vs Async Channels
+
+SQLSpec provides separate channel classes for sync and async configurations:
+
+- **`SyncEventChannel`** - For sync database configs (SqliteConfig, DuckdbConfig, PsycopgSyncConfig, etc.)
+- **`AsyncEventChannel`** - For async database configs (AsyncpgConfig, AiosqliteConfig, PsycopgAsyncConfig, etc.)
+
+The `SQLSpec.event_channel()` factory method automatically returns the correct type
+based on your configuration's `is_async` attribute.
+
 ## Publishing events
 
-Both sync and async adapters share the same API surface. Call the method that
-matches your driver type:
+Both sync and async channels share the same API surface with clean method names:
 
 ```python
+# Sync channel (SqliteConfig, DuckdbConfig, PsycopgSyncConfig, etc.)
 channel.publish("notifications", {"type": "user_update", "user_id": 42})
-await channel.publish_async("notifications", {"type": "refresh", "user_id": 42})
+
+# Async channel (AsyncpgConfig, AiosqliteConfig, PsycopgAsyncConfig, etc.)
+await channel.publish("notifications", {"type": "refresh", "user_id": 42})
 ```
 
 Payloads must be JSON-serialisable. Optional metadata maps can be stored via the
@@ -145,23 +157,23 @@ async def handle(message: EventMessage) -> None:
     # do work, then ack when auto_ack=False
     print(message.channel, message.payload)
 
-listener = channel.listen_async(
+listener = channel.listen(
     "notifications",
     handle,
     poll_interval=0.5,
     auto_ack=True,
 )
 
 # later
-await channel.stop_listener_async(listener.id)
+await channel.stop_listener(listener.id)
 ```
 
 For manual iteration instead of background tasks:
 
 ```python
-async for message in channel.iter_events_async("notifications", poll_interval=1):
+async for message in channel.iter_events("notifications", poll_interval=1):
     await process(message)
-    await channel.ack_async(message.event_id)
+    await channel.ack(message.event_id)
 ```
 
 ### Sync listeners
@@ -184,16 +196,6 @@ channel.stop_listener(listener.id)
 Manual iteration is also available via `channel.iter_events(...)` which yields
 `EventMessage` objects until you break the loop.
 
-#### Using sync APIs with async adapters
-
-When you call `SQLSpec.event_channel()` with an async adapter (AsyncPG,
-AioSQLite, etc.) the extension automatically enables a *portal bridge* so the
-sync APIs (`publish`, `iter_events`, `listen`, `ack`) remain usable. Under the
-hood SQLSpec runs the async backend inside a background event loop via
-`sqlspec.utils.portal`. Disable this by setting
-`extension_config["events"]["portal_bridge"] = False` if you prefer to guard
-against accidental sync usage.
-
 ## Configuration reference
 
 | Option | Default | Description |
@@ -244,21 +246,32 @@ The durable `table_queue` backend supports returning a claimed message to the
 queue for redelivery:
 
 ```python
-message = await channel.dequeue_async("notifications", poll_interval=1.0)
-if message is not None:
-    await channel.nack_async(message.event_id)
+# Async channel
+async for message in channel.iter_events("notifications", poll_interval=1.0):
+    await channel.nack(message.event_id)
+    break
+
+# Sync channel
+for message in channel.iter_events("notifications", poll_interval=1.0):
+    channel.nack(message.event_id)
+    break
```
 
 Native `listen_notify` backends are fire-and-forget and do not support nacking
 because they do not create a durable queue row.
 
 When using native PostgreSQL backends (or hybrid backends that keep a dedicated
-listener connection), call `shutdown_async()` before closing the pool to release
+listener connection), call `shutdown()` before closing the pool to release
 listener resources cleanly:
 
 ```python
-await channel.shutdown_async()
+# Async channel
+await channel.shutdown()
 await config.close_pool()
+
+# Sync channel
+channel.shutdown()
+config.close_pool()
 ```
 
 ## Architecture

sqlspec/__init__.py

Lines changed: 9 additions & 2 deletions
@@ -39,7 +39,13 @@
 from sqlspec.core import filters as filters
 from sqlspec.driver import AsyncDriverAdapterBase, ExecutionResult, SyncDriverAdapterBase
 from sqlspec.exceptions import StackExecutionError
-from sqlspec.extensions.events import AsyncEventListener, EventChannel, EventMessage, SyncEventListener
+from sqlspec.extensions.events import (
+    AsyncEventChannel,
+    AsyncEventListener,
+    EventMessage,
+    SyncEventChannel,
+    SyncEventListener,
+)
 from sqlspec.loader import SQLFile, SQLFileLoader
 from sqlspec.typing import ConnectionT, PoolT, SchemaT, StatementParameters, SupportedSchemaModel
 from sqlspec.utils.logging import suppress_erroneous_sqlglot_log_messages
@@ -51,6 +57,7 @@
     "ArrowResult",
     "AsyncDatabaseConfig",
     "AsyncDriverAdapterBase",
+    "AsyncEventChannel",
     "AsyncEventListener",
     "CacheConfig",
     "CacheStats",
@@ -60,7 +67,6 @@
     "CreateTable",
     "Delete",
     "DropTable",
-    "EventChannel",
     "EventMessage",
     "ExecutionResult",
     "FunctionColumn",
@@ -90,6 +96,7 @@
     "SupportedSchemaModel",
     "SyncDatabaseConfig",
     "SyncDriverAdapterBase",
+    "SyncEventChannel",
     "SyncEventListener",
     "Update",
     "__version__",

sqlspec/adapters/asyncpg/events/backend.py

Lines changed: 4 additions & 4 deletions
@@ -3,7 +3,6 @@
 
 import asyncio
 import contextlib
-import uuid
 from datetime import datetime, timezone
 from typing import TYPE_CHECKING, Any
 
@@ -13,6 +12,7 @@
 from sqlspec.extensions.events._queue import QueueEventBackend, TableEventQueue, build_queue_backend
 from sqlspec.utils.logging import get_logger
 from sqlspec.utils.serializers import from_json, to_json
+from sqlspec.utils.uuids import uuid4
 
 if TYPE_CHECKING:
     from sqlspec.adapters.asyncpg.config import AsyncpgConfig
@@ -46,7 +46,7 @@ def __init__(self, config: "AsyncpgConfig", queue: "QueueEventBackend") -> None:
     async def publish_async(
         self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None
     ) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         await self._publish_durable(channel, event_id, payload, metadata)
         self._runtime.increment_metric("events.publish.native")
         return event_id
@@ -183,7 +183,7 @@ def __init__(self, config: "AsyncpgConfig") -> None:
     async def publish_async(
         self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None
     ) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         envelope = self._encode_payload(event_id, payload, metadata)
         async with self._config.provide_session() as driver:
             await driver.execute(SQL("SELECT pg_notify($1, $2)", channel, envelope))
@@ -314,7 +314,7 @@ def _decode_payload(self, channel: str, payload: str) -> "EventMessage":
         data = from_json(payload)
         if not isinstance(data, dict):
             data = {"payload": data}
-        event_id = data.get("event_id", uuid.uuid4().hex)
+        event_id = data.get("event_id", uuid4().hex)
         payload_obj = data.get("payload")
         if not isinstance(payload_obj, dict):
             payload_obj = {"value": payload_obj}
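
The `_decode_payload` hunk above shows the defensive decode rules the NOTIFY backends share. A standalone restatement of that logic; the function name and tuple return are illustrative, and the real method goes on to build an `EventMessage`:

```python
from typing import Any

from sqlspec.utils.serializers import from_json
from sqlspec.utils.uuids import uuid4


def decode_envelope(payload: str) -> "tuple[str, dict[str, Any]]":
    """Illustrative restatement of the fallbacks used by _decode_payload."""
    data = from_json(payload)
    if not isinstance(data, dict):
        # A bare JSON scalar or list is wrapped so later code always sees a dict.
        data = {"payload": data}
    # Envelopes without an event_id get a freshly generated hex UUID.
    event_id = data.get("event_id", uuid4().hex)
    payload_obj = data.get("payload")
    if not isinstance(payload_obj, dict):
        payload_obj = {"value": payload_obj}
    return event_id, payload_obj
```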

sqlspec/adapters/oracledb/events/backend.py

Lines changed: 3 additions & 3 deletions
@@ -1,13 +1,13 @@
 """Oracle Advanced Queuing backend for EventChannel."""
 
 import contextlib
-import uuid
 from datetime import datetime, timezone
 from typing import TYPE_CHECKING, Any
 
 from sqlspec.exceptions import EventChannelError, ImproperConfigurationError, MissingDependencyError
 from sqlspec.extensions.events._models import EventMessage
 from sqlspec.utils.logging import get_logger
+from sqlspec.utils.uuids import uuid4
 
 if TYPE_CHECKING:
     from sqlspec.config import DatabaseConfigProtocol
@@ -49,7 +49,7 @@ def __init__(self, config: "DatabaseConfigProtocol[Any, Any, Any]", settings: di
         self._wait_seconds: int = int(settings.get("aq_wait_seconds", 5))
 
     def publish_sync(self, channel: str, payload: dict[str, Any], metadata: dict[str, Any] | None = None) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         envelope = self._build_envelope(channel, event_id, payload, metadata)
         session_cm = self._config.provide_session()
         with session_cm as driver:  # type: ignore[union-attr]
@@ -97,7 +97,7 @@ def dequeue_sync(self, channel: str, poll_interval: float) -> EventMessage | Non
             payload = {"payload": payload}
         payload_channel = payload.get("channel")
         message_channel = payload_channel if isinstance(payload_channel, str) else channel
-        event_id = payload.get("event_id", uuid.uuid4().hex)
+        event_id = payload.get("event_id", uuid4().hex)
         body = payload.get("payload")
         if not isinstance(body, dict):
             body = {"value": body}

sqlspec/adapters/psqlpy/events/backend.py

Lines changed: 4 additions & 4 deletions
@@ -2,7 +2,6 @@
 
 import asyncio
 import contextlib
-import uuid
 from datetime import datetime, timezone
 from typing import TYPE_CHECKING, Any
 
@@ -12,6 +11,7 @@
 from sqlspec.extensions.events._queue import QueueEventBackend, build_queue_backend
 from sqlspec.utils.logging import get_logger
 from sqlspec.utils.serializers import from_json, to_json
+from sqlspec.utils.uuids import uuid4
 
 if TYPE_CHECKING:
     from psqlpy import Listener
@@ -52,7 +52,7 @@ def __init__(self, config: "PsqlpyConfig") -> None:
     async def publish_async(
         self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None
     ) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         envelope = self._encode_payload(event_id, payload, metadata)
         if len(envelope.encode("utf-8")) > MAX_NOTIFY_BYTES:
             msg = "PostgreSQL NOTIFY payload exceeds 8 KB limit"
@@ -144,7 +144,7 @@ def _decode_payload(channel: str, payload: str) -> EventMessage:
         data = from_json(payload)
         if not isinstance(data, dict):
             data = {"payload": data}
-        event_id = data.get("event_id", uuid.uuid4().hex)
+        event_id = data.get("event_id", uuid4().hex)
         payload_obj = data.get("payload")
         if not isinstance(payload_obj, dict):
             payload_obj = {"value": payload_obj}
@@ -208,7 +208,7 @@ def __init__(self, config: "PsqlpyConfig", queue_backend: "QueueEventBackend") -
     async def publish_async(
         self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None
     ) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         await self._publish_durable_async(channel, event_id, payload, metadata)
         self._runtime.increment_metric("events.publish.native")
         return event_id
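
The psqlpy backend also guards the NOTIFY payload size before publishing, as the first `publish_async` hunk shows. A minimal standalone sketch of that guard; the 8000-byte constant and the `ValueError` are stand-ins, since the diff only shows the `MAX_NOTIFY_BYTES` name and the error message:

```python
# Stand-in constant: PostgreSQL's default NOTIFY payload limit is just under
# 8 KB; the real MAX_NOTIFY_BYTES is defined elsewhere in the module.
MAX_NOTIFY_BYTES = 8000


def check_notify_size(envelope: str) -> None:
    """Sketch of the size guard applied before issuing NOTIFY."""
    if len(envelope.encode("utf-8")) > MAX_NOTIFY_BYTES:
        msg = "PostgreSQL NOTIFY payload exceeds 8 KB limit"
        raise ValueError(msg)  # the real backend raises its own exception type
```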

sqlspec/adapters/psycopg/events/backend.py

Lines changed: 6 additions & 6 deletions
@@ -2,7 +2,6 @@
 """Psycopg LISTEN/NOTIFY and hybrid event backends."""
 
 import contextlib
-import uuid
 from datetime import datetime, timezone
 from typing import TYPE_CHECKING, Any
 
@@ -13,6 +12,7 @@
 from sqlspec.extensions.events._store import normalize_event_channel_name
 from sqlspec.utils.logging import get_logger
 from sqlspec.utils.serializers import from_json, to_json
+from sqlspec.utils.uuids import uuid4
 
 if TYPE_CHECKING:
     from sqlspec.adapters.psycopg.config import PsycopgAsyncConfig, PsycopgSyncConfig
@@ -54,7 +54,7 @@ def __init__(self, config: "PsycopgAsyncConfig | PsycopgSyncConfig") -> None:
     async def publish_async(
         self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None
     ) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         envelope = self._encode_payload(event_id, payload, metadata)
         session_cm = self._config.provide_session()
         async with session_cm as driver:  # type: ignore[union-attr]
@@ -64,7 +64,7 @@ async def publish_async(
         return event_id
 
     def publish_sync(self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         envelope = self._encode_payload(event_id, payload, metadata)
         session_cm = self._config.provide_session()
         with session_cm as driver:  # type: ignore[union-attr]
@@ -179,7 +179,7 @@ def _decode_payload(channel: str, payload: str) -> EventMessage:
         data = from_json(payload)
         if not isinstance(data, dict):
             data = {"payload": data}
-        event_id = data.get("event_id", uuid.uuid4().hex)
+        event_id = data.get("event_id", uuid4().hex)
         payload_obj = data.get("payload")
         if not isinstance(payload_obj, dict):
             payload_obj = {"value": payload_obj}
@@ -249,13 +249,13 @@ def __init__(self, config: "DatabaseConfigProtocol[Any, Any, Any]", queue_backen
     async def publish_async(
         self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None
     ) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         await self._publish_durable_async(channel, event_id, payload, metadata)
         self._runtime.increment_metric("events.publish.native")
         return event_id
 
     def publish_sync(self, channel: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None" = None) -> str:
-        event_id = uuid.uuid4().hex
+        event_id = uuid4().hex
         self._publish_durable_sync(channel, event_id, payload, metadata)
         self._runtime.increment_metric("events.publish.native")
         return event_id
