Skip to content

Commit cb1902f

Browse files
committed
feat(events): enhance event backend configuration and runtime hints for AsyncPG, Psqlpy, and Psycopg adapters
1 parent de9696c commit cb1902f

File tree

21 files changed

+264
-150
lines changed

21 files changed

+264
-150
lines changed

docs/guides/adapters/asyncpg.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,11 @@ For comprehensive examples and migration guides, see:
248248
- Publishing uses `connection.notify()` under the hood; consumers rely on
249249
`connection.add_listener()` with dedicated connections so the shared pool
250250
stays available for transactional work.
251+
- For durability and retries, set `driver_features["events_backend"] =
252+
"listen_notify_durable"` and include the `events` extension migrations.
251253
- Force the durable queue fallback (for deterministic testing or multi-tenant
252-
workloads) by overriding `driver_features["events_backend"] = "table_queue"`.
254+
workloads) by overriding `driver_features["events_backend"] = "table_queue"`
255+
and including the `events` migrations.
253256

254257
## Common Issues
255258

docs/guides/adapters/psqlpy.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,16 @@ For comprehensive examples and migration guides, see:
254254

255255
## Event Channels
256256

257-
- Psqlpy adapters use the queue-backed event channel today. Include the
258-
`events` extension migrations and call `spec.event_channel(config)` to
259-
publish/consume with retryable leases.
260-
- Psqlpy exposes a built-in `Listener` class. When the native backend ships,
261-
SQLSpec will wrap that listener automatically; until then, prefer the durable
262-
queue backend for portability across adapters.
257+
- Psqlpy enables native LISTEN/NOTIFY by default
258+
(`driver_features["events_backend"] = "listen_notify"`). Call
259+
`spec.event_channel(config)` to publish or consume without migrations.
260+
- Native listeners use the `Listener` API and a dedicated connection so the
261+
shared pool remains available for normal queries.
262+
- For durability and retries, set `driver_features["events_backend"] =
263+
"listen_notify_durable"` and include the `events` extension migrations.
264+
- The queue-only fallback remains available by setting
265+
`driver_features["events_backend"] = "table_queue"` alongside the
266+
`events` migrations.
263267

264268
## Best Practices
265269

docs/guides/adapters/psycopg.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,16 @@ For comprehensive examples and migration guides, see:
133133

134134
## Event Channels
135135

136-
- Psycopg currently routes `EventChannel` through the durable queue backend by
137-
default (`driver_features["events_backend"] = "table_queue"`). Include the
138-
`events` extension migrations, then call `spec.event_channel(config)` to
139-
publish/consume events.
140-
- Native LISTEN/NOTIFY support is coming soon; until then you can still
141-
subscribe to channels via the durable queue and rely on leases/acks for retry
142-
safety.
143-
- When you migrate to the native backend, set
144-
`driver_features["events_backend"] = "listen_notify"` to opt in once the
145-
backend lands.
136+
- Psycopg enables native LISTEN/NOTIFY support by default
137+
(`driver_features["events_backend"] = "listen_notify"`). Call
138+
`spec.event_channel(config)` to publish or consume without migrations.
139+
- Listeners run on dedicated connections so the pool remains available for
140+
transactional work.
141+
- For durability and retries, set `driver_features["events_backend"] =
142+
"listen_notify_durable"` and include the `events` extension migrations.
143+
- The queue-only fallback remains available by setting
144+
`driver_features["events_backend"] = "table_queue"` alongside the
145+
`events` migrations.
146146

147147
## Common Issues
148148

docs/guides/adapters/spanner.md

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,26 @@ This guide provides specific instructions for the `spanner` adapter.
88

99
## Key Information
1010

11-
- **Driver:** `google-cloud-spanner`
12-
- **Parameter Style:** `named` with `@` prefix (e.g., `@name`)
13-
- **Dialect:** `spanner` (custom dialect extending BigQuery)
14-
- **Transactional DDL:** Not supported (DDL uses separate admin operations)
11+
- **Driver:** `google-cloud-spanner`
12+
- **Parameter Style:** `named` with `@` prefix (e.g., `@name`)
13+
- **Dialect:** `spanner` (custom dialect extending BigQuery)
14+
- **Transactional DDL:** Not supported (DDL uses separate admin operations)
1515

1616
## Parameter Profile
1717

18-
- **Registry Key:** `"spanner"`
19-
- **JSON Strategy:** `helper`
20-
- **Default Style:** `NAMED_AT` (parameters prefixed with `@`)
18+
- **Registry Key:** `"spanner"`
19+
- **JSON Strategy:** `helper`
20+
- **Default Style:** `NAMED_AT` (parameters prefixed with `@`)
2121

2222
## Features
2323

24-
- **Full ACID Transactions:** Spanner provides global transactions with strong consistency
25-
- **Interleaved Tables:** Physical co-location of parent-child rows for performance
26-
- **Row-Level TTL:** Automatic row expiration via TTL policies
27-
- **Session Pooling:** Built-in session pool management
28-
- **UUID Handling:** Automatic UUID-to-bytes conversion
29-
- **BYTES Handling:** Automatic base64 encoding/decoding for BYTES columns
30-
- **JSON Support:** Native JSON type handling
24+
- **Full ACID Transactions:** Spanner provides global transactions with strong consistency
25+
- **Interleaved Tables:** Physical co-location of parent-child rows for performance
26+
- **Row-Level TTL:** Automatic row expiration via TTL policies
27+
- **Session Pooling:** Built-in session pool management
28+
- **UUID Handling:** Automatic UUID-to-bytes conversion
29+
- **BYTES Handling:** Automatic base64 encoding/decoding for BYTES columns
30+
- **JSON Support:** Native JSON type handling
3131

3232
## Configuration
3333

@@ -124,9 +124,31 @@ with config.provide_session() as session:
124124
```
125125

126126
The encoding/decoding is transparent - you work with Python bytes directly. The adapter uses:
127+
127128
- `bytes_to_spanner()` - Base64-encode bytes for storage
128129
- `spanner_to_bytes()` - Base64-decode bytes from storage
129130

131+
### JSON Handling
132+
133+
Spanner JSON parameters should use Python structures instead of pre-serialized strings. SQLSpec wraps JSON
134+
values with the Spanner client `JsonObject` when available.
135+
136+
- Dicts are treated as JSON objects.
137+
- Nested sequences (lists/tuples containing dicts or nested lists) are treated as JSON arrays.
138+
- Flat sequences of scalars are preserved as ARRAY parameters.
139+
140+
If you need a JSON array of scalars or a scalar JSON value, wrap it explicitly:
141+
142+
```python
143+
from sqlspec.adapters.spanner import spanner_json
144+
145+
payload = spanner_json([1, 2, 3])
146+
session.execute(
147+
"INSERT INTO events (payload_json) VALUES (@payload)",
148+
{"payload": payload},
149+
)
150+
```
151+
130152
### UUID Handling
131153

132154
UUIDs are automatically converted to 16-byte BYTES(16) format:
@@ -210,6 +232,7 @@ CREATE TABLE orders (
210232
```
211233

212234
Interleaved tables provide:
235+
213236
- Automatic co-location of related data
214237
- Efficient joins between parent and child tables
215238
- Cascading deletes for data integrity
@@ -317,17 +340,17 @@ events = store.list_events(session.id)
317340

318341
## Common Issues
319342

320-
- **DDL Operations:** DDL statements (CREATE TABLE, ALTER TABLE, etc.) cannot be executed through the driver's `execute()` method. Use `database.update_ddl()` for DDL operations.
343+
- **DDL Operations:** DDL statements (CREATE TABLE, ALTER TABLE, etc.) cannot be executed through the driver's `execute()` method. Use `database.update_ddl()` for DDL operations.
321344

322-
- **Mutation Limit:** Spanner has a 20,000 mutation limit per transaction. For bulk inserts, batch operations into multiple transactions.
345+
- **Mutation Limit:** Spanner has a 20,000 mutation limit per transaction. For bulk inserts, batch operations into multiple transactions.
323346

324-
- **Read-Only Snapshots:** The default session context uses read-only snapshots. For write operations, use `database.run_in_transaction()` or configure a transaction context.
347+
- **Read-Only Snapshots:** The default session context uses read-only snapshots. For write operations, use `database.run_in_transaction()` or configure a transaction context.
325348

326-
- **Emulator Limitations:** The Spanner emulator doesn't support all features (e.g., some complex queries, backups). Test critical functionality against a real Spanner instance.
349+
- **Emulator Limitations:** The Spanner emulator doesn't support all features (e.g., some complex queries, backups). Test critical functionality against a real Spanner instance.
327350

328-
- **`google.api_core.exceptions.AlreadyExists`:** Resource already exists. Check if the table or index already exists before creating.
351+
- **`google.api_core.exceptions.AlreadyExists`:** Resource already exists. Check if the table or index already exists before creating.
329352

330-
- **`google.api_core.exceptions.NotFound`:** Resource not found. Verify the instance, database, and table names are correct.
353+
- **`google.api_core.exceptions.NotFound`:** Resource not found. Verify the instance, database, and table names are correct.
331354

332355
## Best Practices
333356

docs/guides/events/database-event-channels.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ against accidental sync usage.
213213

214214
| Adapter | Backend | Default poll interval | Lease window | Locking hints |
215215
| --- | --- | --- | --- | --- |
216-
| AsyncPG / Psycopg / Psqlpy | Native LISTEN/NOTIFY | `N/A` (native notifications) | `N/A` | Dedicated listener connections reuse the driver's native APIs. |
216+
| AsyncPG / Psycopg / Psqlpy | `listen_notify` (default), `listen_notify_durable` | `0.5s` (queue-backed paths only) | `30s` (queue-backed paths only) | Queue backends add `FOR UPDATE SKIP LOCKED`; native backends use dedicated listener connections. |
217217
| Oracle | `advanced_queue` (sync adapters) | `aq_wait_seconds` (default `5s`) | `N/A` – AQ removes messages when dequeued | Exposes AQ dequeue options via `extension_config`. |
218218
| Asyncmy (MySQL) | Queue fallback | `0.25s` | `5s` | Adds `FOR UPDATE SKIP LOCKED` to reduce contention. |
219219
| DuckDB | Queue fallback | `0.15s` | `15s` | Favor short leases/poll windows so embedded engines do not spin. |
@@ -295,6 +295,10 @@ config = AsyncpgConfig(
295295
)
296296
```
297297

298+
Queue-backed backends (`listen_notify_durable`, `table_queue`) require the
299+
events extension migrations. Ensure `migration_config["include_extensions"]`
300+
contains `"events"` before publishing or consuming.
301+
298302
### Event Queue Stores
299303

300304
Stores generate adapter-specific DDL for the queue table. Each adapter has
@@ -310,8 +314,8 @@ Example store implementations:
310314
| Adapter | Payload Type | Timestamp Type | Special Handling |
311315
| --- | --- | --- | --- |
312316
| AsyncPG / Psycopg / Psqlpy | JSONB | TIMESTAMPTZ | Standard PostgreSQL |
313-
| Oracle | CLOB | TIMESTAMP | PL/SQL exception blocks for idempotent DDL |
314-
| MySQL / Asyncmy | JSON | DATETIME(6) | FOR UPDATE SKIP LOCKED |
317+
| Oracle | JSON or BLOB | TIMESTAMP | PL/SQL exception blocks, auto-detected JSON storage |
318+
| MySQL / Asyncmy | JSON | DATETIME(6) | Conditional index creation via procedural SQL |
315319
| DuckDB | JSON | TIMESTAMP | Short poll intervals |
316320
| BigQuery | JSON | TIMESTAMP | CLUSTER BY for partitioning |
317321
| Spanner | JSON | TIMESTAMP | Separate DDL execution (no IF NOT EXISTS) |

sqlspec/adapters/asyncpg/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222
from sqlspec.config import AsyncDatabaseConfig, ExtensionConfigs
2323
from sqlspec.exceptions import ImproperConfigurationError, MissingDependencyError
24+
from sqlspec.extensions.events._hints import EventRuntimeHints
2425
from sqlspec.typing import ALLOYDB_CONNECTOR_INSTALLED, CLOUD_SQL_CONNECTOR_INSTALLED, PGVECTOR_INSTALLED
2526
from sqlspec.utils.config_normalization import apply_pool_deprecations, normalize_connection_config
2627
from sqlspec.utils.serializers import from_json, to_json
@@ -490,3 +491,8 @@ def get_signature_namespace(self) -> "dict[str, Any]":
490491
"AsyncpgPreparedStatement": AsyncpgPreparedStatement,
491492
})
492493
return namespace
494+
495+
def get_event_runtime_hints(self) -> "EventRuntimeHints":
496+
"""Return polling defaults for PostgreSQL queue fallback."""
497+
498+
return EventRuntimeHints(poll_interval=0.5, select_for_update=True, skip_locked=True)

sqlspec/adapters/asyncpg/events/backend.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from sqlspec.core import SQL
1111
from sqlspec.exceptions import EventChannelError, ImproperConfigurationError
1212
from sqlspec.extensions.events import EventMessage
13-
from sqlspec.extensions.events._queue import QueueEventBackend, TableEventQueue
13+
from sqlspec.extensions.events._queue import QueueEventBackend, TableEventQueue, build_queue_backend
1414
from sqlspec.utils.logging import get_logger
1515
from sqlspec.utils.serializers import from_json, to_json
1616

@@ -363,16 +363,7 @@ def create_event_backend(
363363
except ImproperConfigurationError:
364364
return None
365365
if backend_name == "listen_notify_durable":
366-
queue = TableEventQueue(
367-
config,
368-
queue_table=extension_settings.get("queue_table"),
369-
lease_seconds=extension_settings.get("lease_seconds"),
370-
retention_seconds=extension_settings.get("retention_seconds"),
371-
select_for_update=extension_settings.get("select_for_update"),
372-
skip_locked=extension_settings.get("skip_locked"),
373-
json_passthrough=extension_settings.get("json_passthrough"),
374-
)
375-
queue_backend = QueueEventBackend(queue)
366+
queue_backend = build_queue_backend(config, extension_settings, adapter_name="asyncpg")
376367
try:
377368
return AsyncpgHybridEventsBackend(config, queue_backend)
378369
except ImproperConfigurationError:

sqlspec/adapters/psqlpy/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,4 @@ def get_signature_namespace(self) -> "dict[str, Any]":
275275
def get_event_runtime_hints(self) -> "EventRuntimeHints":
276276
"""Return LISTEN/NOTIFY defaults for Psqlpy adapters."""
277277

278-
return EventRuntimeHints(json_passthrough=True)
278+
return EventRuntimeHints(poll_interval=0.5, select_for_update=True, skip_locked=True, json_passthrough=True)

sqlspec/adapters/psqlpy/events/backend.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlspec.core import SQL
1010
from sqlspec.exceptions import EventChannelError, ImproperConfigurationError
1111
from sqlspec.extensions.events import EventMessage
12-
from sqlspec.extensions.events._queue import QueueEventBackend, TableEventQueue
12+
from sqlspec.extensions.events._queue import QueueEventBackend, build_queue_backend
1313
from sqlspec.utils.logging import get_logger
1414
from sqlspec.utils.serializers import from_json, to_json
1515

@@ -319,16 +319,7 @@ def create_event_backend(
319319
except ImproperConfigurationError:
320320
return None
321321
if backend_name == "listen_notify_durable":
322-
queue = TableEventQueue(
323-
config,
324-
queue_table=extension_settings.get("queue_table"),
325-
lease_seconds=extension_settings.get("lease_seconds"),
326-
retention_seconds=extension_settings.get("retention_seconds"),
327-
select_for_update=extension_settings.get("select_for_update"),
328-
skip_locked=extension_settings.get("skip_locked"),
329-
json_passthrough=extension_settings.get("json_passthrough"),
330-
)
331-
queue_backend = QueueEventBackend(queue)
322+
queue_backend = build_queue_backend(config, extension_settings, adapter_name="psqlpy")
332323
try:
333324
return PsqlpyHybridEventsBackend(config, queue_backend)
334325
except ImproperConfigurationError:

sqlspec/adapters/psycopg/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
psycopg_statement_config,
2222
)
2323
from sqlspec.config import AsyncDatabaseConfig, ExtensionConfigs, SyncDatabaseConfig
24+
from sqlspec.extensions.events._hints import EventRuntimeHints
2425
from sqlspec.typing import PGVECTOR_INSTALLED
2526
from sqlspec.utils.config_normalization import apply_pool_deprecations, normalize_connection_config
2627
from sqlspec.utils.serializers import to_json
@@ -299,6 +300,11 @@ def get_signature_namespace(self) -> "dict[str, Any]":
299300
})
300301
return namespace
301302

303+
def get_event_runtime_hints(self) -> "EventRuntimeHints":
304+
"""Return polling defaults for PostgreSQL queue fallback."""
305+
306+
return EventRuntimeHints(poll_interval=0.5, select_for_update=True, skip_locked=True)
307+
302308

303309
class PsycopgAsyncConfig(AsyncDatabaseConfig[PsycopgAsyncConnection, AsyncConnectionPool, PsycopgAsyncDriver]):
304310
"""Configuration for Psycopg asynchronous database connections with direct field-based configuration."""
@@ -495,3 +501,8 @@ def get_signature_namespace(self) -> "dict[str, Any]":
495501
"PsycopgPoolParams": PsycopgPoolParams,
496502
})
497503
return namespace
504+
505+
def get_event_runtime_hints(self) -> "EventRuntimeHints":
506+
"""Return polling defaults for PostgreSQL queue fallback."""
507+
508+
return EventRuntimeHints(poll_interval=0.5, select_for_update=True, skip_locked=True)

0 commit comments

Comments
 (0)