Skip to content

Commit 1471480

Browse files
committed
feat(events): standardize sync method naming and enhance event backends
Rename all sync event methods to use _sync suffix for consistency with async _async suffix pattern across the events extension: - publish -> publish_sync - dequeue -> dequeue_sync - ack -> ack_sync - iter_events -> iter_events_sync - shutdown -> shutdown_sync Also includes fixes: - Fix unused import in asyncpg events store - Fix pyright type errors in channel.py extension_settings handling - Comprehensive test coverage for events extension
1 parent 721fb9a commit 1471480

File tree

64 files changed

+6096
-457
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+6096
-457
lines changed

AGENTS.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ For detailed implementation patterns, consult these guides:
321321
| Parameter Profiles | `docs/guides/adapters/parameter-profile-registry.md` |
322322
| Adapter Guides | `docs/guides/adapters/{adapter}.md` |
323323
| Framework Extensions | `docs/guides/extensions/{framework}.md` |
324+
| Database Events | `docs/guides/events/database-event-channels.md` |
324325
| Quick Reference | `docs/guides/quick-reference/quick-reference.md` |
325326
| Code Standards | `docs/guides/development/code-standards.md` |
326327
| Implementation Patterns | `docs/guides/development/implementation-patterns.md` |
@@ -857,6 +858,70 @@ if async_configs:
857858

858859
**Reference implementation:** `sqlspec/cli.py` (lines 218-255, 311-724)
859860

861+
### Events Extension Pattern
862+
863+
The events extension provides database-agnostic pub/sub via two layers:
864+
865+
**Backends** handle communication (LISTEN/NOTIFY, Oracle AQ, table polling):
866+
867+
```python
868+
# Backend selection via driver_features
869+
class AdapterDriverFeatures(TypedDict):
870+
enable_events: NotRequired[bool]
871+
events_backend: NotRequired[Literal["listen_notify", "listen_notify_durable", "table_queue", "advanced_queue"]]
872+
873+
# Auto-detection in config __init__
874+
if "enable_events" not in driver_features:
875+
driver_features["enable_events"] = extension_config.get("events") is not None
876+
if "events_backend" not in driver_features:
877+
driver_features["events_backend"] = "listen_notify" # or adapter-specific default
878+
```
879+
880+
**Stores** generate adapter-specific DDL for the queue table:
881+
882+
```python
883+
# In sqlspec/adapters/{adapter}/events/store.py
884+
class AdapterEventQueueStore(BaseEventQueueStore[AdapterConfig]):
885+
__slots__ = ()
886+
887+
def _column_types(self) -> tuple[str, str, str]:
888+
# Return (payload_type, metadata_type, timestamp_type)
889+
return "JSONB", "JSONB", "TIMESTAMPTZ" # PostgreSQL
890+
891+
def _build_create_table_sql(self) -> str:
892+
# Override for database-specific DDL syntax
893+
return super()._build_create_table_sql()
894+
895+
def _wrap_create_statement(self, statement: str, object_type: str) -> str:
896+
# Wrap with IF NOT EXISTS, PL/SQL blocks, etc.
897+
return statement.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1)
898+
```
899+
900+
**Backend factory pattern** for native backends:
901+
902+
```python
903+
# In sqlspec/adapters/{adapter}/events/backend.py
904+
def create_event_backend(
905+
config: "AdapterConfig", backend_name: str, extension_settings: dict[str, Any]
906+
) -> AdapterEventsBackend | None:
907+
if backend_name == "listen_notify":
908+
return AdapterEventsBackend(config)
909+
if backend_name == "listen_notify_durable":
910+
queue = TableEventQueue(config, **extension_settings)
911+
return AdapterHybridEventsBackend(config, QueueEventBackend(queue))
912+
return None # Falls back to table_queue
913+
```
914+
915+
**Key principles:**
916+
917+
- Backends implement `publish_async`, `dequeue_async`, `ack_async` (and sync variants)
918+
- Stores inherit from `BaseEventQueueStore` and override `_column_types()`
919+
- Use `driver_features` for backend selection, `extension_config["events"]` for settings
920+
- Always support `table_queue` fallback for databases without native pub/sub
921+
- Separate DDL execution for databases without transactional DDL (Spanner, BigQuery)
922+
923+
**Reference implementation:** `sqlspec/extensions/events/`, `sqlspec/adapters/*/events/`
924+
860925
## Collaboration Guidelines
861926

862927
- Challenge suboptimal requests constructively

docs/guides/events/database-event-channels.md

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ from sqlspec.adapters.sqlite import SqliteConfig
1414

1515
spec = SQLSpec()
1616
config = SqliteConfig(
17-
pool_config={"database": ":memory:"},
17+
connection_config={"database": ":memory:"},
1818
migration_config={
1919
"script_location": "migrations",
2020
"include_extensions": ["events"],
@@ -37,16 +37,17 @@ for message in channel.iter_events("notifications"):
3737

3838
## PostgreSQL native notifications
3939

40-
Async PostgreSQL adapters (AsyncPG today, with psycopg/psqlpy on deck) default
41-
to `events_backend="listen_notify"`, which means no migrations are required;
42-
events flow directly through `LISTEN/NOTIFY`.
40+
All async PostgreSQL adapters (AsyncPG, Psycopg async, and Psqlpy) support
41+
native `LISTEN/NOTIFY` via `events_backend="listen_notify"`. When enabled,
42+
events flow directly through PostgreSQL's notification system with no
43+
migrations required.
4344

4445
```python
4546
from sqlspec import SQLSpec
4647
from sqlspec.adapters.asyncpg import AsyncpgConfig
4748

4849
spec = SQLSpec()
49-
config = AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/db"})
50+
config = AsyncpgConfig(connection_config={"dsn": "postgresql://localhost/db"})
5051
spec.add_config(config)
5152
channel = spec.event_channel(config)
5253

@@ -217,6 +218,7 @@ against accidental sync usage.
217218
| Asyncmy (MySQL) | Queue fallback | `0.25s` | `5s` | Adds `FOR UPDATE SKIP LOCKED` to reduce contention. |
218219
| DuckDB | Queue fallback | `0.15s` | `15s` | Favor short leases/poll windows so embedded engines do not spin. |
219220
| BigQuery / ADBC | Queue fallback | `2.0s` | `60s` | Coarser cadence avoids hammering remote warehouses; still safe to override. |
221+
| Spanner | Queue fallback | `1.0s` | `30s` | Uses Spanner-native JSON and TIMESTAMP types; requires separate DDL execution. |
220222
| SQLite / AioSQLite | Queue fallback | `1.0s` | `30s` | General-purpose defaults that suit most local deployments. |
221223

222224
## Telemetry & observability
@@ -231,6 +233,84 @@ exports structured traces. The counters and spans flow through
231233
`SQLSpec.telemetry_snapshot()` plus the Prometheus helper when
232234
`extension_config["prometheus"]` is enabled.
233235

236+
## Architecture
237+
238+
The events extension consists of two layers that work together:
239+
240+
### Event Backends
241+
242+
Backends handle the actual pub/sub communication mechanism. SQLSpec supports
243+
three backend types:
244+
245+
| Backend | Description | When to use |
246+
| --- | --- | --- |
247+
| `listen_notify` | Native PostgreSQL LISTEN/NOTIFY | Real-time, fire-and-forget events |
248+
| `listen_notify_durable` | Hybrid: queue table + NOTIFY wakeups | Real-time with durability and retries |
249+
| `advanced_queue` | Oracle Advanced Queuing | Enterprise Oracle deployments |
250+
| `table_queue` | Polling-based queue table | Universal fallback for any database |
251+
252+
Configure the backend via `driver_features["events_backend"]`:
253+
254+
```python
255+
from sqlspec.adapters.asyncpg import AsyncpgConfig
256+
257+
# Native LISTEN/NOTIFY (default for PostgreSQL adapters)
258+
config = AsyncpgConfig(
259+
connection_config={"dsn": "postgresql://localhost/db"},
260+
driver_features={"events_backend": "listen_notify"},
261+
)
262+
263+
# Hybrid: durable queue with NOTIFY wakeups
264+
config = AsyncpgConfig(
265+
connection_config={"dsn": "postgresql://localhost/db"},
266+
driver_features={"events_backend": "listen_notify_durable"},
267+
)
268+
```
269+
270+
### Event Queue Stores
271+
272+
Stores generate adapter-specific DDL for the queue table. Each adapter has
273+
a store class in `sqlspec/adapters/{adapter}/events/store.py` that handles:
274+
275+
- Column type mapping (JSON, JSONB, CLOB, etc.)
276+
- Timestamp types (TIMESTAMPTZ, DATETIME, TIMESTAMP)
277+
- Index creation strategies
278+
- Database-specific DDL wrapping (IF NOT EXISTS, PL/SQL blocks, etc.)
279+
280+
Example store implementations:
281+
282+
| Adapter | Payload Type | Timestamp Type | Special Handling |
283+
| --- | --- | --- | --- |
284+
| AsyncPG / Psycopg / Psqlpy | JSONB | TIMESTAMPTZ | Standard PostgreSQL |
285+
| Oracle | CLOB | TIMESTAMP | PL/SQL exception blocks for idempotent DDL |
286+
| MySQL / Asyncmy | JSON | DATETIME(6) | FOR UPDATE SKIP LOCKED |
287+
| DuckDB | JSON | TIMESTAMP | Short poll intervals |
288+
| BigQuery | JSON | TIMESTAMP | CLUSTER BY for partitioning |
289+
| Spanner | JSON | TIMESTAMP | Separate DDL execution (no IF NOT EXISTS) |
290+
| ADBC | Dialect-detected | Dialect-detected | Multi-dialect support based on connection URI |
291+
| SQLite / AioSQLite | TEXT | TIMESTAMP | General-purpose defaults |
292+
293+
### ADBC Multi-Dialect Support
294+
295+
The ADBC adapter auto-detects the underlying database from the connection URI
296+
and generates appropriate DDL:
297+
298+
```python
299+
from sqlspec.adapters.adbc import AdbcConfig
300+
301+
# Connects to PostgreSQL via ADBC - uses JSONB columns
302+
config = AdbcConfig(
303+
connection_config={"uri": "postgresql://localhost/db"},
304+
extension_config={"events": {}},
305+
)
306+
307+
# Connects to BigQuery via ADBC - uses JSON with CLUSTER BY
308+
config = AdbcConfig(
309+
connection_config={"driver_name": "bigquery"},
310+
extension_config={"events": {}},
311+
)
312+
```
313+
234314
## Litestar/ADK integration
235315

236316
- The Litestar and ADK plugins already call `SQLSpec.event_channel()` for you.
@@ -246,7 +326,7 @@ from sqlspec.extensions.litestar import SQLSpecPlugin
246326
sql = SQLSpec()
247327
config = sql.add_config(
248328
AsyncpgConfig(
249-
pool_config={"dsn": "postgresql://localhost/db"},
329+
connection_config={"dsn": "postgresql://localhost/db"},
250330
extension_config={"events": {"queue_table": "app_events"}},
251331
)
252332
)

sqlspec/adapters/adbc/data_dictionary.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,14 +216,23 @@ def _get_dialect(self, driver: SyncDriverAdapterBase) -> str:
216216
def get_version(self, driver: SyncDriverAdapterBase) -> "VersionInfo | None":
217217
"""Get database version information based on detected dialect.
218218
219+
Uses caching to avoid repeated database queries within the same
220+
driver session.
221+
219222
Args:
220-
driver: ADBC driver instance
223+
driver: ADBC driver instance.
221224
222225
Returns:
223-
Database version information or None if detection fails
226+
Database version information or None if detection fails.
224227
"""
228+
driver_id = id(driver)
229+
was_cached, cached_version = self.get_cached_version(driver_id)
230+
if was_cached:
231+
return cached_version
232+
225233
dialect = self._get_dialect(driver)
226234
adbc_driver = cast("AdbcDriver", driver)
235+
version_info: VersionInfo | None = None
227236

228237
try:
229238
if dialect == "postgres":
@@ -234,39 +243,40 @@ def get_version(self, driver: SyncDriverAdapterBase) -> "VersionInfo | None":
234243
major = int(match.group(1))
235244
minor = int(match.group(2))
236245
patch = int(match.group(3)) if match.group(3) else 0
237-
return VersionInfo(major, minor, patch)
246+
version_info = VersionInfo(major, minor, patch)
238247

239248
elif dialect == "sqlite":
240249
version_str = adbc_driver.select_value("SELECT sqlite_version()")
241250
if version_str:
242251
match = SQLITE_VERSION_PATTERN.match(str(version_str))
243252
if match:
244253
major, minor, patch = map(int, match.groups())
245-
return VersionInfo(major, minor, patch)
254+
version_info = VersionInfo(major, minor, patch)
246255

247256
elif dialect == "duckdb":
248257
version_str = adbc_driver.select_value("SELECT version()")
249258
if version_str:
250259
match = DUCKDB_VERSION_PATTERN.search(str(version_str))
251260
if match:
252261
major, minor, patch = map(int, match.groups())
253-
return VersionInfo(major, minor, patch)
262+
version_info = VersionInfo(major, minor, patch)
254263

255264
elif dialect == "mysql":
256265
version_str = adbc_driver.select_value("SELECT VERSION()")
257266
if version_str:
258267
match = MYSQL_VERSION_PATTERN.search(str(version_str))
259268
if match:
260269
major, minor, patch = map(int, match.groups())
261-
return VersionInfo(major, minor, patch)
270+
version_info = VersionInfo(major, minor, patch)
262271

263272
elif dialect == "bigquery":
264-
return VersionInfo(1, 0, 0)
273+
version_info = VersionInfo(1, 0, 0)
265274

266275
except Exception:
267276
logger.warning("Failed to get %s version", dialect)
268277

269-
return None
278+
self.cache_version(driver_id, version_info)
279+
return version_info
270280

271281
def get_feature_flag(self, driver: SyncDriverAdapterBase, feature: str) -> bool:
272282
"""Check if database supports a specific feature based on detected dialect.

0 commit comments

Comments
 (0)