Skip to content

Commit 7b14ce7

Browse files
committed
feat(channels): implement SQLSpecChannelsBackend for Litestar integration and add related tests
1 parent a07199f commit 7b14ce7

File tree

13 files changed

+500
-21
lines changed

13 files changed

+500
-21
lines changed

docs/guides/extensions/litestar.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,39 @@ app = Litestar(
150150

151151
Every adapter exposes a store class (e.g., `AsyncpgStore`, `AiosqliteStore`, `DuckdbStore`) in its `litestar` submodule. Each subclass inherits `BaseSQLSpecStore`, enforces consistent schema DDL, and provides utilities such as `delete_expired()`. Run `delete_expired()` periodically or use `litestar sessions delete-expired` from the CLI.
152152

153+
## Channels Backend (Database Broker)
154+
155+
Litestar ships with a Channels plugin for broadcasting event streams to subscribers (for example WebSocket clients). SQLSpec can act as a Channels backend by reusing the `events` extension (table queue or native backends where available).
156+
157+
```python
158+
from litestar import Litestar
159+
from litestar.channels.plugin import ChannelsPlugin
160+
161+
from sqlspec import SQLSpec
162+
from sqlspec.adapters.aiosqlite import AiosqliteConfig
163+
from sqlspec.extensions.events import EventChannel
164+
from sqlspec.extensions.litestar import SQLSpecChannelsBackend, SQLSpecPlugin
165+
166+
sqlspec = SQLSpec()
167+
config = sqlspec.add_config(
168+
AiosqliteConfig(
169+
connection_config={"database": "app.db"},
170+
migration_config={"script_location": "migrations", "include_extensions": ["events"]},
171+
extension_config={"events": {}},
172+
)
173+
)
174+
175+
channels_backend = SQLSpecChannelsBackend(EventChannel(config), channel_prefix="litestar")
176+
channels_plugin = ChannelsPlugin(backend=channels_backend, channels=["notifications"])
177+
178+
app = Litestar(plugins=[SQLSpecPlugin(sqlspec), channels_plugin])
179+
```
180+
181+
Notes:
182+
183+
- Run migrations with `include_extensions=["events"]` so the queue table exists.
184+
- Litestar channels may use arbitrary names; the backend maps them to database-safe event channel identifiers deterministically.
185+
153186
## CLI Integration
154187

155188
The `SQLSpecPlugin` automatically registers the `db` command group with the Litestar CLI. No additional configuration is required.

sqlspec/adapters/asyncmy/events/store.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,32 @@ def _column_types(self) -> tuple[str, str, str]:
5555
"""
5656
return "JSON", "JSON", "DATETIME(6)"
5757

58+
def _build_create_table_sql(self) -> str:
59+
"""Build MySQL-specific CREATE TABLE SQL.
60+
61+
MySQL requires CURRENT_TIMESTAMP(6) for DATETIME(6) columns,
62+
not just CURRENT_TIMESTAMP which is only valid for TIMESTAMP type.
63+
64+
Returns:
65+
CREATE TABLE SQL statement with MySQL-specific defaults.
66+
"""
67+
payload_type, metadata_type, timestamp_type = self._column_types()
68+
table_clause = self._table_clause()
69+
return (
70+
f"CREATE TABLE {self.table_name} ("
71+
"event_id VARCHAR(64) PRIMARY KEY,"
72+
" channel VARCHAR(128) NOT NULL,"
73+
f" payload_json {payload_type} NOT NULL,"
74+
f" metadata_json {metadata_type},"
75+
" status VARCHAR(32) NOT NULL DEFAULT 'pending',"
76+
f" available_at {timestamp_type} NOT NULL DEFAULT CURRENT_TIMESTAMP(6),"
77+
f" lease_expires_at {timestamp_type},"
78+
" attempts INTEGER NOT NULL DEFAULT 0,"
79+
f" created_at {timestamp_type} NOT NULL DEFAULT CURRENT_TIMESTAMP(6),"
80+
f" acknowledged_at {timestamp_type}"
81+
f") {table_clause}"
82+
)
83+
5884
def _build_index_sql(self) -> str | None:
5985
"""Build MySQL conditional index creation SQL.
6086

sqlspec/adapters/asyncpg/events/backend.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ async def ack_async(self, event_id: str) -> None:
7777
await self._queue.ack_async(event_id)
7878
self._runtime.increment_metric("events.ack")
7979

80+
async def nack_async(self, event_id: str) -> None:
81+
await self._queue.nack_async(event_id)
82+
self._runtime.increment_metric("events.nack")
83+
8084
def publish_sync(self, *_: Any, **__: Any) -> str:
8185
msg = "publish_sync is not supported for async-only Postgres backend"
8286
raise ImproperConfigurationError(msg)

sqlspec/adapters/oracledb/events/store.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ class _OracleEventQueueStoreMixin:
7272
Slots are defined in the concrete subclasses to allow proper attribute assignment.
7373
"""
7474

75+
__slots__ = ()
76+
7577
if TYPE_CHECKING:
7678
_in_memory: bool
7779
_json_storage: "JSONStorageType | None"
@@ -85,20 +87,21 @@ def _init_oracle_settings(self) -> None:
8587
"""Initialize Oracle-specific settings from extension config.
8688
8789
Must be called from subclass __init__ after super().__init__().
90+
Note: Attributes assigned here are defined in subclass __slots__.
8891
"""
8992
events_config = self._extension_settings
90-
self._in_memory = bool(events_config.get("in_memory", False))
91-
self._oracle_version_info = None
93+
self._in_memory = bool(events_config.get("in_memory", False)) # type: ignore[misc]
94+
self._oracle_version_info = None # type: ignore[misc]
9295

9396
json_storage_override = events_config.get("json_storage")
9497
if json_storage_override == "json":
95-
self._json_storage = JSONStorageType.JSON_NATIVE
98+
self._json_storage = JSONStorageType.JSON_NATIVE # type: ignore[misc]
9699
elif json_storage_override == "blob_json":
97-
self._json_storage = JSONStorageType.BLOB_JSON
100+
self._json_storage = JSONStorageType.BLOB_JSON # type: ignore[misc]
98101
elif json_storage_override == "blob":
99-
self._json_storage = JSONStorageType.BLOB_PLAIN
102+
self._json_storage = JSONStorageType.BLOB_PLAIN # type: ignore[misc]
100103
else:
101-
self._json_storage = None
104+
self._json_storage = None # type: ignore[misc]
102105

103106
def _column_types(self) -> "tuple[str, str, str]":
104107
"""Return Oracle column types based on storage mode."""

sqlspec/adapters/psqlpy/events/backend.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ async def ack_async(self, event_id: str) -> None:
246246
await self._queue.ack_async(event_id)
247247
self._runtime.increment_metric("events.ack")
248248

249+
async def nack_async(self, event_id: str) -> None:
250+
await self._queue.nack_async(event_id)
251+
self._runtime.increment_metric("events.nack")
252+
249253
def ack_sync(self, _event_id: str) -> None:
250254
msg = "ack_sync is not supported for async-only Psqlpy backend"
251255
raise ImproperConfigurationError(msg)

sqlspec/adapters/psycopg/events/backend.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,24 @@ def ack_sync(self, event_id: str) -> None:
327327
self._queue.ack_sync(event_id)
328328
self._runtime.increment_metric("events.ack")
329329

330+
async def nack_async(self, event_id: str) -> None:
331+
"""Return an event to the durable queue for redelivery asynchronously.
332+
333+
Args:
334+
event_id: The event identifier to return to the queue.
335+
"""
336+
await self._queue.nack_async(event_id)
337+
self._runtime.increment_metric("events.nack")
338+
339+
def nack_sync(self, event_id: str) -> None:
340+
"""Return an event to the durable queue for redelivery synchronously.
341+
342+
Args:
343+
event_id: The event identifier to return to the queue.
344+
"""
345+
self._queue.nack_sync(event_id)
346+
self._runtime.increment_metric("events.nack")
347+
330348
async def shutdown_async(self) -> None:
331349
"""Shutdown the async listener and release resources."""
332350
if self._listen_connection_async_cm is not None:

sqlspec/extensions/events/_queue.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class TableEventQueue:
5454
"_json_passthrough",
5555
"_lease_seconds",
5656
"_max_claim_attempts",
57+
"_nack_sql",
5758
"_retention_seconds",
5859
"_runtime",
5960
"_select_for_update",
@@ -95,6 +96,7 @@ def __init__(
9596
self._select_sql = self._build_select_sql()
9697
self._claim_sql = self._build_claim_sql()
9798
self._ack_sql = self._build_ack_sql()
99+
self._nack_sql = self._build_nack_sql()
98100
self._acked_cleanup_sql = self._build_cleanup_sql()
99101

100102
@property
@@ -135,6 +137,9 @@ def _build_claim_sql(self) -> str:
135137
def _build_ack_sql(self) -> str:
136138
return f"UPDATE {self._table_name} SET status = :acked, acknowledged_at = :acked_at WHERE event_id = :event_id"
137139

140+
def _build_nack_sql(self) -> str:
141+
return f"UPDATE {self._table_name} SET status = :pending, lease_expires_at = NULL WHERE event_id = :event_id"
142+
138143
def _build_cleanup_sql(self) -> str:
139144
return f"DELETE FROM {self._table_name} WHERE status = :acked AND acknowledged_at IS NOT NULL AND acknowledged_at <= :cutoff"
140145

@@ -260,6 +265,26 @@ def ack_sync(self, event_id: str) -> None:
260265
self._cleanup_sync(now)
261266
self._runtime.increment_metric("events.ack")
262267

268+
async def nack_async(self, event_id: str) -> None:
269+
"""Return an event to the queue for redelivery asynchronously.
270+
271+
Resets the event status to pending and clears the lease, allowing it
272+
to be picked up by another consumer. The attempts counter remains
273+
unchanged (was already incremented during claim).
274+
"""
275+
await self._execute_async(self._nack_sql, {"pending": _PENDING_STATUS, "event_id": event_id})
276+
self._runtime.increment_metric("events.nack")
277+
278+
def nack_sync(self, event_id: str) -> None:
279+
"""Return an event to the queue for redelivery synchronously.
280+
281+
Resets the event status to pending and clears the lease, allowing it
282+
to be picked up by another consumer. The attempts counter remains
283+
unchanged (was already incremented during claim).
284+
"""
285+
self._execute_sync(self._nack_sql, {"pending": _PENDING_STATUS, "event_id": event_id})
286+
self._runtime.increment_metric("events.nack")
287+
263288
async def _cleanup_async(self, reference: "datetime") -> None:
264289
cutoff = reference - timedelta(seconds=self._retention_seconds)
265290
await self._execute_async(self._acked_cleanup_sql, {"acked": _ACKED_STATUS, "cutoff": cutoff})
@@ -415,6 +440,12 @@ async def ack_async(self, event_id: str) -> None:
415440
def ack_sync(self, event_id: str) -> None:
416441
self._queue.ack_sync(event_id)
417442

443+
async def nack_async(self, event_id: str) -> None:
444+
await self._queue.nack_async(event_id)
445+
446+
def nack_sync(self, event_id: str) -> None:
447+
self._queue.nack_sync(event_id)
448+
418449
@staticmethod
419450
def _to_message(event: QueueEvent) -> EventMessage:
420451
return EventMessage(

sqlspec/extensions/events/channel.py

Lines changed: 119 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,17 @@ def listen(
250250
thread.start()
251251
return listener
252252

253+
def listen_sync(
254+
self, channel: str, handler: "SyncEventHandler", *, poll_interval: float | None = None, auto_ack: bool = True
255+
) -> SyncEventListener:
256+
"""Start a sync listener thread.
257+
258+
This is an alias of :meth:`listen`, provided for API symmetry with
259+
:meth:`listen_async`.
260+
"""
261+
262+
return self.listen(channel, handler, poll_interval=poll_interval, auto_ack=auto_ack)
263+
253264
def stop_listener(self, listener_id: str) -> None:
254265
"""Stop a running sync listener."""
255266

@@ -259,6 +270,15 @@ def stop_listener(self, listener_id: str) -> None:
259270
listener.stop()
260271
self._runtime.increment_metric("events.listener.stop")
261272

273+
def stop_listener_sync(self, listener_id: str) -> None:
274+
"""Stop a running sync listener.
275+
276+
This is an alias of :meth:`stop_listener`, provided for API symmetry
277+
with :meth:`stop_listener_async`.
278+
"""
279+
280+
self.stop_listener(listener_id)
281+
262282
def _run_sync_listener(
263283
self,
264284
listener_id: str,
@@ -391,6 +411,51 @@ def ack_sync(self, event_id: str) -> None:
391411
raise
392412
self._end_event_span(span, result="acked")
393413

414+
async def nack_async(self, event_id: str) -> None:
415+
"""Return an event to the queue for redelivery asynchronously.
416+
417+
Resets the event status to pending, clearing the lease and allowing
418+
it to be picked up by another consumer.
419+
"""
420+
if not self._is_async:
421+
msg = "nack_async requires an async configuration"
422+
raise ImproperConfigurationError(msg)
423+
nack_method = getattr(self._backend, "nack_async", None)
424+
if nack_method is None:
425+
msg = "Current events backend does not support nack"
426+
raise ImproperConfigurationError(msg)
427+
span = self._start_event_span("nack", mode="async")
428+
try:
429+
await nack_method(event_id)
430+
except Exception as error:
431+
self._end_event_span(span, error=error)
432+
raise
433+
self._end_event_span(span, result="nacked")
434+
435+
def nack_sync(self, event_id: str) -> None:
436+
"""Return an event to the queue for redelivery synchronously.
437+
438+
Resets the event status to pending, clearing the lease and allowing
439+
it to be picked up by another consumer.
440+
"""
441+
if self._is_async:
442+
if self._should_bridge_sync_calls():
443+
self._bridge_sync_call(self.nack_async, event_id)
444+
return
445+
msg = "nack_sync requires a sync configuration"
446+
raise ImproperConfigurationError(msg)
447+
nack_method = getattr(self._backend, "nack_sync", None)
448+
if nack_method is None:
449+
msg = "Current events backend does not support nack"
450+
raise ImproperConfigurationError(msg)
451+
span = self._start_event_span("nack", mode="sync")
452+
try:
453+
nack_method(event_id)
454+
except Exception as error:
455+
self._end_event_span(span, error=error)
456+
raise
457+
self._end_event_span(span, result="nacked")
458+
394459
# Loading helpers -----------------------------------------------------------
395460

396461
@staticmethod
@@ -520,23 +585,18 @@ def _normalize_channel_name(channel: str) -> str:
520585
raise ImproperConfigurationError(str(error)) from error
521586

522587
def _start_event_span(self, operation: str, channel: "str | None" = None, mode: str = "sync") -> Any:
523-
span_manager = getattr(self._runtime, "span_manager", None)
524-
if span_manager is None or not getattr(span_manager, "is_enabled", False):
588+
if not getattr(self._runtime.span_manager, "is_enabled", False):
525589
return None
526590
attributes: dict[str, Any] = {
527591
"sqlspec.events.operation": operation,
528592
"sqlspec.events.backend": self._backend_name,
529593
"sqlspec.events.mode": mode,
530-
"sqlspec.config": type(self._config).__name__,
531594
}
532595
if self._adapter_name:
533596
attributes["sqlspec.events.adapter"] = self._adapter_name
534-
bind_key = getattr(self._config, "bind_key", None)
535-
if bind_key:
536-
attributes["sqlspec.bind_key"] = bind_key
537597
if channel:
538598
attributes["sqlspec.events.channel"] = channel
539-
return span_manager.start_span(f"sqlspec.events.{operation}", attributes)
599+
return self._runtime.start_span(f"sqlspec.events.{operation}", attributes=attributes)
540600

541601
def _end_event_span(self, span: Any, *, error: "Exception | None" = None, result: "str | None" = None) -> None:
542602
if span is None:
@@ -545,4 +605,55 @@ def _end_event_span(self, span: Any, *, error: "Exception | None" = None, result
545605
setter = getattr(span, "set_attribute", None)
546606
if setter is not None:
547607
setter("sqlspec.events.result", result)
548-
self._runtime.span_manager.end_span(span, error=error)
608+
self._runtime.end_span(span, error=error)
609+
610+
# Shutdown helpers ------------------------------------------------------------
611+
612+
async def shutdown_async(self) -> None:
613+
"""Shutdown the event channel and release backend resources.
614+
615+
Stops all async listeners and calls the backend's shutdown_async method
616+
to release any held connections (e.g., LISTEN/NOTIFY listener connections).
617+
618+
Must be called before closing the database pool when using native backends.
619+
"""
620+
span = self._start_event_span("shutdown", mode="async")
621+
try:
622+
for listener_id in list(self._listeners_async):
623+
await self.stop_listener_async(listener_id)
624+
625+
backend_shutdown = getattr(self._backend, "shutdown_async", None)
626+
if backend_shutdown is not None and callable(backend_shutdown):
627+
result = backend_shutdown()
628+
if result is not None:
629+
await result # type: ignore[misc]
630+
except Exception as error:
631+
self._end_event_span(span, error=error)
632+
raise
633+
self._end_event_span(span, result="shutdown")
634+
self._runtime.increment_metric("events.shutdown")
635+
636+
def shutdown_sync(self) -> None:
637+
"""Shutdown the event channel and release backend resources.
638+
639+
Stops all sync listeners. For async backends with sync bridging,
640+
use shutdown_async instead.
641+
"""
642+
if self._is_async:
643+
if self._should_bridge_sync_calls():
644+
self._bridge_sync_call(self.shutdown_async)
645+
return
646+
msg = "shutdown_sync requires a sync configuration"
647+
raise ImproperConfigurationError(msg)
648+
span = self._start_event_span("shutdown", mode="sync")
649+
try:
650+
for listener_id in list(self._listeners_sync):
651+
self.stop_listener(listener_id)
652+
backend_shutdown = getattr(self._backend, "shutdown_sync", None)
653+
if backend_shutdown is not None and callable(backend_shutdown):
654+
backend_shutdown()
655+
except Exception as error:
656+
self._end_event_span(span, error=error)
657+
raise
658+
self._end_event_span(span, result="shutdown")
659+
self._runtime.increment_metric("events.shutdown")

0 commit comments

Comments
 (0)