Commit f7da4e8

feat: update event channel and queue tests to use new store structure
- Updated test imports to reflect changes in the event channel and queue structure.
- Replaced instances of `TableEventQueue` with `SyncTableEventQueue` in tests.
- Changed error handling in normalization functions to raise `EventChannelError` instead of `ValueError`.
- Introduced new tests for hook methods in `BaseEventQueueStore` to ensure DDL generation consistency.
- Added tests for dialect-specific overrides in string, integer, and timestamp types for various database adapters.
- Ensured all required columns and default values are correctly included in DDL generation tests.
1 parent 0a4cb8e commit f7da4e8

25 files changed: +1482 / -904 lines changed


AGENTS.md

Lines changed: 19 additions & 8 deletions
@@ -970,26 +970,37 @@ Available backends:
 PostgreSQL adapters (asyncpg, psycopg, psqlpy) default to `listen_notify`.
 All other adapters default to `table_queue`.

-**Stores** generate adapter-specific DDL for the queue table:
+**Stores** generate adapter-specific DDL using a hook-based pattern:

 ```python
 # In sqlspec/adapters/{adapter}/events/store.py
 class AdapterEventQueueStore(BaseEventQueueStore[AdapterConfig]):
     __slots__ = ()

+    # REQUIRED: Return (payload_type, metadata_type, timestamp_type)
     def _column_types(self) -> tuple[str, str, str]:
-        # Return (payload_type, metadata_type, timestamp_type)
         return "JSONB", "JSONB", "TIMESTAMPTZ"  # PostgreSQL

-    def _build_create_table_sql(self) -> str:
-        # Override for database-specific DDL syntax
-        return super()._build_create_table_sql()
+    # OPTIONAL hooks for dialect variations:
+    def _string_type(self, length: int) -> str:
+        return f"VARCHAR({length})"  # Override for STRING(N), VARCHAR2(N), etc.

-    def _wrap_create_statement(self, statement: str, object_type: str) -> str:
-        # Wrap with IF NOT EXISTS, PL/SQL blocks, etc.
-        return statement.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1)
+    def _integer_type(self) -> str:
+        return "INTEGER"  # Override for INT64, NUMBER(10), etc.
+
+    def _timestamp_default(self) -> str:
+        return "CURRENT_TIMESTAMP"  # Override for CURRENT_TIMESTAMP(6), SYSTIMESTAMP, etc.
+
+    def _primary_key_syntax(self) -> str:
+        return ""  # Override for " PRIMARY KEY (event_id)" if PK must be inline
+
+    def _table_clause(self) -> str:
+        return ""  # Override for " CLUSTER BY ..." or " INMEMORY ..."
 ```

+Most adapters only override `_column_types()`. Complex dialects may override
+`_build_create_table_sql()` directly (Oracle PL/SQL, BigQuery CLUSTER BY, Spanner no-DEFAULT).
+
 **Backend factory pattern** for native backends:

 ```python
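For orientation, a minimal store under this pattern overrides only the required hook and inherits every optional default. The sketch below is illustrative: the class name, config type, and import path are assumptions, not taken from this commit; real implementations live in `sqlspec/adapters/{adapter}/events/store.py`.

```python
# Hypothetical minimal adapter store (names and import path are assumptions
# for illustration only). Only the required hook is overridden, so the base
# defaults apply: VARCHAR(N) strings, INTEGER counters, CURRENT_TIMESTAMP
# defaults, a column-level PRIMARY KEY, and no extra table clause.
from sqlspec.extensions.events._store import BaseEventQueueStore  # assumed import path


class ExampleEventQueueStore(BaseEventQueueStore["ExampleConfig"]):
    __slots__ = ()

    def _column_types(self) -> tuple[str, str, str]:
        # (payload_type, metadata_type, timestamp_type)
        return "TEXT", "TEXT", "TIMESTAMP"
```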

docs/guides/events/database-event-channels.md

Lines changed: 17 additions & 7 deletions
@@ -318,13 +318,23 @@ use `migration_config={"exclude_extensions": ["events"]}`.

 ### Event Queue Stores

-Stores generate adapter-specific DDL for the queue table. Each adapter has
-a store class in `sqlspec/adapters/{adapter}/events/store.py` that handles:
-
-- Column type mapping (JSON, JSONB, CLOB, etc.)
-- Timestamp types (TIMESTAMPTZ, DATETIME, TIMESTAMP)
-- Index creation strategies
-- Database-specific DDL wrapping (IF NOT EXISTS, PL/SQL blocks, etc.)
+Stores generate adapter-specific DDL for the queue table using a hook-based
+pattern. The base class `BaseEventQueueStore` provides a template for DDL
+generation, and adapters override hook methods for dialect-specific variations:
+
+**Required hook:**
+- `_column_types()` - Return tuple of (payload_type, metadata_type, timestamp_type)
+
+**Optional hooks for dialect variations:**
+- `_string_type(length)` - String type syntax (default: `VARCHAR(N)`)
+- `_integer_type()` - Integer type syntax (default: `INTEGER`)
+- `_timestamp_default()` - Timestamp default expression (default: `CURRENT_TIMESTAMP`)
+- `_primary_key_syntax()` - Inline PRIMARY KEY clause (default: empty, PK on column)
+- `_table_clause()` - Additional table options (default: empty)
+
+Most adapters only need to override `_column_types()`. Complex dialects
+(Oracle PL/SQL, BigQuery CLUSTER BY, Spanner no-DEFAULT) may override
+`_build_create_table_sql()` directly.

 Example store implementations:

sqlspec/adapters/asyncmy/events/store.py

Lines changed: 4 additions & 19 deletions
@@ -55,31 +55,16 @@ def _column_types(self) -> tuple[str, str, str]:
         """
         return "JSON", "JSON", "DATETIME(6)"

-    def _build_create_table_sql(self) -> str:
-        """Build MySQL-specific CREATE TABLE SQL.
+    def _timestamp_default(self) -> str:
+        """Return MySQL timestamp default expression.

         MySQL requires CURRENT_TIMESTAMP(6) for DATETIME(6) columns,
         not just CURRENT_TIMESTAMP which is only valid for TIMESTAMP type.

         Returns:
-            CREATE TABLE SQL statement with MySQL-specific defaults.
+            MySQL-specific timestamp default with microsecond precision.
         """
-        payload_type, metadata_type, timestamp_type = self._column_types()
-        table_clause = self._table_clause()
-        return (
-            f"CREATE TABLE {self.table_name} ("
-            "event_id VARCHAR(64) PRIMARY KEY,"
-            " channel VARCHAR(128) NOT NULL,"
-            f" payload_json {payload_type} NOT NULL,"
-            f" metadata_json {metadata_type},"
-            " status VARCHAR(32) NOT NULL DEFAULT 'pending',"
-            f" available_at {timestamp_type} NOT NULL DEFAULT CURRENT_TIMESTAMP(6),"
-            f" lease_expires_at {timestamp_type},"
-            " attempts INTEGER NOT NULL DEFAULT 0,"
-            f" created_at {timestamp_type} NOT NULL DEFAULT CURRENT_TIMESTAMP(6),"
-            f" acknowledged_at {timestamp_type}"
-            f") {table_clause}"
-        )
+        return "CURRENT_TIMESTAMP(6)"

     def _build_index_sql(self) -> str | None:
         """Build MySQL conditional index creation SQL.

sqlspec/adapters/asyncpg/events/backend.py

Lines changed: 14 additions & 19 deletions
@@ -10,11 +10,11 @@
 from sqlspec.exceptions import EventChannelError, ImproperConfigurationError
 from sqlspec.extensions.events import EventMessage
 from sqlspec.extensions.events._payload import decode_notify_payload, encode_notify_payload
-from sqlspec.extensions.events._queue import AsyncQueueEventBackend, TableEventQueue, build_queue_backend
+from sqlspec.extensions.events._queue import AsyncTableEventQueue, build_queue_backend
 from sqlspec.extensions.events._store import normalize_event_channel_name
 from sqlspec.utils.logging import get_logger
 from sqlspec.utils.serializers import to_json
-from sqlspec.utils.type_guards import has_attr, is_notification
+from sqlspec.utils.type_guards import is_notification
 from sqlspec.utils.uuids import uuid4

 if TYPE_CHECKING:
@@ -34,7 +34,7 @@ class AsyncpgHybridEventsBackend:
     supports_async = True
     backend_name = "listen_notify_durable"

-    def __init__(self, config: "AsyncpgConfig", queue: "AsyncQueueEventBackend") -> None:
+    def __init__(self, config: "AsyncpgConfig", queue: "AsyncTableEventQueue") -> None:
         if not config.is_async:
             msg = "Asyncpg hybrid backend requires an async adapter"
             raise ImproperConfigurationError(msg)
@@ -56,7 +56,7 @@ async def dequeue(self, channel: str, poll_interval: float) -> EventMessage | No
         if notifies_queue is not None:
             message = await self._dequeue_with_notifies(connection, channel, poll_interval)
         else:
-            message = await self._queue.dequeue(channel, poll_interval=poll_interval)
+            message = await self._queue.dequeue(channel, poll_interval)
         return message

     async def ack(self, event_id: str) -> None:
@@ -90,37 +90,27 @@ async def _publish_durable(
     ) -> None:
         """Insert event into durable queue and send NOTIFY wakeup signal."""
         now = datetime.now(timezone.utc)
-        queue_backend = self._get_queue_backend()
-        statement_config = queue_backend.statement_config
         async with self._config.provide_session() as driver:
             await driver.execute(
                 SQL(
-                    queue_backend._upsert_sql,
+                    self._queue._upsert_sql,
                     {
                         "event_id": event_id,
                         "channel": channel,
-                        "payload_json": queue_backend._encode_json(payload),
-                        "metadata_json": queue_backend._encode_json(metadata),
+                        "payload_json": to_json(payload),
+                        "metadata_json": to_json(metadata) if metadata else None,
                         "status": "pending",
                         "available_at": now,
                         "lease_expires_at": None,
                         "attempts": 0,
                         "created_at": now,
                     },
-                    statement_config=statement_config,
+                    statement_config=self._queue._statement_config,
                 )
             )
             await driver.execute(SQL("SELECT pg_notify($1, $2)", channel, to_json({"event_id": event_id})))
             await driver.commit()

-    def _get_queue_backend(self) -> "TableEventQueue":
-        """Return the underlying TableEventQueue from the wrapper."""
-        queue_backend = self._queue._queue if has_attr(self._queue, "_queue") else None
-        if queue_backend is None:
-            msg = "Hybrid queue backend missing queue reference"
-            raise EventChannelError(msg)
-        return queue_backend
-
     async def _dequeue_with_notifies(self, connection: Any, channel: str, poll_interval: float) -> EventMessage | None:
         """Wait for a NOTIFY wakeup then dequeue from the durable table."""
         try:
@@ -181,6 +171,9 @@ async def ack(self, _event_id: str) -> None:
         """Acknowledge an event. Native notifications are fire-and-forget."""
         self._runtime.increment_metric("events.ack")

+    async def nack(self, _event_id: str) -> None:
+        """Return an event to the queue (no-op for native LISTEN/NOTIFY)."""
+
     async def shutdown(self) -> None:
         """Shutdown the listener connection and release resources."""
         if self._listen_connection_cm is not None:
@@ -253,7 +246,9 @@ def create_event_backend(
         except ImproperConfigurationError:
             return None
         case "listen_notify_durable":
-            queue_backend = cast("AsyncQueueEventBackend", build_queue_backend(config, extension_settings, adapter_name="asyncpg"))
+            queue_backend = cast(
+                "AsyncTableEventQueue", build_queue_backend(config, extension_settings, adapter_name="asyncpg")
+            )
             try:
                 return AsyncpgHybridEventsBackend(config, queue_backend)
             except ImproperConfigurationError:
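Related to this cleanup, the commit message notes that normalization functions now raise `EventChannelError` instead of `ValueError`. A minimal sketch of the kind of test that change implies is below; the import locations match those visible in this diff, but the choice of invalid input is an assumption and the real test files may differ.

```python
# Sketch only: exercises the EventChannelError behaviour described in the
# commit message. The empty-string input is assumed to be invalid.
import pytest

from sqlspec.exceptions import EventChannelError
from sqlspec.extensions.events._store import normalize_event_channel_name


def test_normalize_rejects_invalid_channel_name() -> None:
    with pytest.raises(EventChannelError):
        normalize_event_channel_name("")
```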

sqlspec/adapters/bigquery/events/store.py

Lines changed: 38 additions & 16 deletions
@@ -60,29 +60,51 @@ def _column_types(self) -> tuple[str, str, str]:
         """
         return "JSON", "JSON", "TIMESTAMP"

+    def _string_type(self, length: int) -> str:
+        """Return BigQuery STRING type (length is ignored)."""
+        del length
+        return "STRING"
+
+    def _integer_type(self) -> str:
+        """Return BigQuery INT64 type."""
+        return "INT64"
+
+    def _timestamp_default(self) -> str:
+        """Return BigQuery timestamp default expression."""
+        return "CURRENT_TIMESTAMP()"
+
+    def _table_clause(self) -> str:
+        """Return BigQuery CLUSTER BY clause for query optimization."""
+        return " CLUSTER BY channel, status, available_at"
+
     def _build_create_table_sql(self) -> str:
         """Build BigQuery CREATE TABLE with CLUSTER BY optimization.

-        Returns:
-            DDL statement for creating the event queue table.
+        BigQuery uses CLUSTER BY for query optimization instead of indexes.
+        The clustering columns match the typical polling query pattern.

-        Notes:
-            BigQuery uses CLUSTER BY for query optimization instead of indexes.
-            The clustering columns match the typical polling query pattern.
+        Note: BigQuery does not support column-level PRIMARY KEY, so we
+        omit it entirely. event_id uniqueness must be enforced at insert time.
         """
+        payload_type, metadata_type, timestamp_type = self._column_types()
+        string_type = self._string_type(0)
+        integer_type = self._integer_type()
+        ts_default = self._timestamp_default()
+        table_clause = self._table_clause()
+
         return (
             f"CREATE TABLE IF NOT EXISTS {self.table_name} ("
-            "event_id STRING NOT NULL,"
-            " channel STRING NOT NULL,"
-            " payload_json JSON NOT NULL,"
-            " metadata_json JSON,"
-            " status STRING NOT NULL DEFAULT 'pending',"
-            " available_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),"
-            " lease_expires_at TIMESTAMP,"
-            " attempts INT64 NOT NULL DEFAULT 0,"
-            " created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),"
-            " acknowledged_at TIMESTAMP"
-            ") CLUSTER BY channel, status, available_at"
+            f"event_id {string_type} NOT NULL,"
+            f" channel {string_type} NOT NULL,"
+            f" payload_json {payload_type} NOT NULL,"
+            f" metadata_json {metadata_type},"
+            f" status {string_type} NOT NULL DEFAULT 'pending',"
+            f" available_at {timestamp_type} NOT NULL DEFAULT {ts_default},"
+            f" lease_expires_at {timestamp_type},"
+            f" attempts {integer_type} NOT NULL DEFAULT 0,"
+            f" created_at {timestamp_type} NOT NULL DEFAULT {ts_default},"
+            f" acknowledged_at {timestamp_type}"
+            f"){table_clause}"
         )

     def _build_index_sql(self) -> str | None:
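The commit message's DDL-consistency tests verify that every store emits the full column set and the default values. A hedged sketch of that style of assertion, built from the column list shared by the MySQL and BigQuery builders above, might look like this; the `store` fixture and exact assertions are illustrative, not copied from the new test files.

```python
# Illustrative DDL-consistency check in the spirit of the new tests described
# in the commit message; fixture and assertion details are assumptions.
REQUIRED_COLUMNS = (
    "event_id",
    "channel",
    "payload_json",
    "metadata_json",
    "status",
    "available_at",
    "lease_expires_at",
    "attempts",
    "created_at",
    "acknowledged_at",
)


def test_create_table_sql_contains_required_columns(store) -> None:
    ddl = store._build_create_table_sql()
    for column in REQUIRED_COLUMNS:
        assert column in ddl
    # The default check may not apply to dialects without DEFAULT support
    # (the commit mentions a Spanner "no-DEFAULT" case).
    assert "DEFAULT 'pending'" in ddl
```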

sqlspec/adapters/oracledb/events/backend.py

Lines changed: 12 additions & 0 deletions
@@ -102,6 +102,12 @@ def ack(self, _event_id: str) -> None:
         """
         self._runtime.increment_metric("events.ack")

+    def nack(self, _event_id: str) -> None:
+        """Return an event to the queue (no-op for Oracle AQ).
+
+        Oracle AQ does not support returning messages after commit.
+        """
+
     def shutdown(self) -> None:
         """Shutdown the backend (no-op for Oracle AQ)."""

@@ -183,6 +189,12 @@ async def ack(self, _event_id: str) -> None:
         """
         self._runtime.increment_metric("events.ack")

+    async def nack(self, _event_id: str) -> None:
+        """Return an event to the queue (no-op for Oracle AQ).
+
+        Oracle AQ does not support returning messages after commit.
+        """
+
     async def shutdown(self) -> None:
         """Shutdown the backend (no-op for Oracle AQ)."""

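With `nack()` now present on every backend, a consumer can treat ack/nack symmetrically even when a backend implements `nack()` as a no-op. The sketch below assumes the async backend protocol visible in these diffs (`dequeue`/`ack`/`nack`); the channel name, handler, and `message.event_id` access are illustrative assumptions.

```python
# Sketch of a consumer using the ack/nack pair added in this commit. The
# backend methods are taken from the diffs above; the channel name, handler,
# and message.event_id attribute are assumptions for illustration.
async def consume_one(backend, handle_event) -> None:
    message = await backend.dequeue("orders", 1.0)
    if message is None:
        return
    try:
        await handle_event(message)
    except Exception:
        # No-op for fire-and-forget backends (LISTEN/NOTIFY, Oracle AQ);
        # intended to return the event for durable table-queue backends.
        await backend.nack(message.event_id)
        raise
    await backend.ack(message.event_id)
```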