Skip to content

Commit 29f7176

Browse files
authored
fix(events): use extension_config pattern for events configuration (#295)
Moves events configuration from `driver_features` to `extension_config` pattern, consistent with other extensions (litestar, adk, starlette, flask). Also adds consistent auto-migration inclusion for all extensions with `exclude_extensions` opt-out.
1 parent dc89f5f commit 29f7176

File tree

29 files changed

+389
-189
lines changed

29 files changed

+389
-189
lines changed

AGENTS.md

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -865,18 +865,28 @@ The events extension provides database-agnostic pub/sub via two layers:
865865
**Backends** handle communication (LISTEN/NOTIFY, Oracle AQ, table polling):
866866

867867
```python
868-
# Backend selection via driver_features
869-
class AdapterDriverFeatures(TypedDict):
870-
enable_events: NotRequired[bool]
871-
events_backend: NotRequired[Literal["listen_notify", "listen_notify_durable", "table_queue", "advanced_queue"]]
872-
873-
# Auto-detection in config __init__
874-
if "enable_events" not in driver_features:
875-
driver_features["enable_events"] = extension_config.get("events") is not None
876-
if "events_backend" not in driver_features:
877-
driver_features["events_backend"] = "listen_notify" # or adapter-specific default
868+
# Backend selection via extension_config (NOT driver_features)
869+
config = AsyncpgConfig(
870+
connection_config={"dsn": "postgresql://..."},
871+
extension_config={
872+
"events": {
873+
"backend": "listen_notify_durable", # Backend selection here
874+
"queue_table": "event_queue_custom",
875+
"lease_seconds": 60,
876+
}
877+
}
878+
)
878879
```
879880

881+
Available backends:
882+
- `listen_notify`: Real-time PostgreSQL LISTEN/NOTIFY (ephemeral, no migrations)
883+
- `table_queue`: Durable table-backed queue with retries (all adapters)
884+
- `listen_notify_durable`: Hybrid combining both (PostgreSQL only)
885+
- `advanced_queue`: Oracle Advanced Queueing
886+
887+
PostgreSQL adapters (asyncpg, psycopg, psqlpy) default to `listen_notify`.
888+
All other adapters default to `table_queue`.
889+
880890
**Stores** generate adapter-specific DDL for the queue table:
881891

882892
```python
@@ -916,10 +926,48 @@ def create_event_backend(
916926

917927
- Backends implement `publish_async`, `dequeue_async`, `ack_async` (and sync variants)
918928
- Stores inherit from `BaseEventQueueStore` and override `_column_types()`
919-
- Use `driver_features` for backend selection, `extension_config["events"]` for settings
929+
- Use `extension_config["events"]["backend"]` for backend selection, other keys for settings
920930
- Always support `table_queue` fallback for databases without native pub/sub
921931
- Separate DDL execution for databases without transactional DDL (Spanner, BigQuery)
922932

933+
**Auto-migration inclusion:**
934+
935+
Extensions with migration support are automatically included in
936+
`migration_config["include_extensions"]` based on their settings:
937+
938+
- **litestar**: Only when ``session_table`` is set (``True`` or custom name)
939+
- **adk**: When any adk settings are present
940+
- **events**: When any events settings are present
941+
942+
Use `exclude_extensions` to opt out:
943+
944+
```python
945+
# Auto-includes events migrations when extension_config["events"] is set
946+
config = AsyncpgConfig(
947+
connection_config={"dsn": "postgresql://..."},
948+
extension_config={"events": {"backend": "table_queue"}},
949+
)
950+
951+
# Litestar with session storage - auto-includes migrations
952+
config = AsyncpgConfig(
953+
connection_config={"dsn": "postgresql://..."},
954+
extension_config={"litestar": {"session_table": True}}, # or "my_sessions"
955+
)
956+
957+
# Litestar for DI only - no migrations needed
958+
config = AsyncpgConfig(
959+
connection_config={"dsn": "postgresql://..."},
960+
extension_config={"litestar": {"session_key": "db"}}, # No session_table = no migrations
961+
)
962+
963+
# Exclude events migrations when using ephemeral backend
964+
config = AsyncpgConfig(
965+
connection_config={"dsn": "postgresql://..."},
966+
migration_config={"exclude_extensions": ["events"]}, # Skip queue table migration
967+
extension_config={"events": {"backend": "listen_notify"}},
968+
)
969+
```
970+
923971
**Reference implementation:** `sqlspec/extensions/events/`, `sqlspec/adapters/*/events/`
924972

925973
## Collaboration Guidelines

docs/guides/adapters/asyncpg.md

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,17 +241,15 @@ For comprehensive examples and migration guides, see:
241241

242242
## Event Channels
243243

244-
- AsyncPG enables native LISTEN/NOTIFY support automatically by setting
245-
`driver_features["events_backend"] = "listen_notify"` during config
246-
construction. Call `spec.event_channel(config)` to obtain a channel—no
247-
migrations are required.
244+
- AsyncPG defaults to native LISTEN/NOTIFY support (`backend="listen_notify"`).
245+
Call `spec.event_channel(config)` to obtain a channel—no migrations required.
248246
- Publishing uses `connection.notify()` under the hood; consumers rely on
249247
`connection.add_listener()` with dedicated connections so the shared pool
250248
stays available for transactional work.
251-
- For durability and retries, set `driver_features["events_backend"] =
252-
"listen_notify_durable"` and include the `events` extension migrations.
249+
- For durability and retries, set `extension_config={"events": {"backend": "listen_notify_durable"}}`
250+
and include the `events` extension migrations.
253251
- Force the durable queue fallback (for deterministic testing or multi-tenant
254-
workloads) by overriding `driver_features["events_backend"] = "table_queue"`
252+
workloads) by setting `extension_config={"events": {"backend": "table_queue"}}`
255253
and including the `events` migrations.
256254

257255
## Common Issues

docs/guides/adapters/oracledb.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -902,9 +902,9 @@ For comprehensive examples and migration guides, see:
902902

903903
## Event Channels
904904

905-
- Set `driver_features["events_backend"] = "advanced_queue"` to enable native
905+
- Set `extension_config={"events": {"backend": "advanced_queue"}}` to enable native
906906
Advanced Queuing support. Event publishing uses `connection.queue()` and
907-
inherits the AQ options surfaced via `extension_config["events"]`
907+
inherits the AQ options from `extension_config["events"]`
908908
(`aq_queue`, `aq_wait_seconds`, `aq_visibility`).
909909
- AQ requires DBA-provisioned queues plus enqueue/dequeue privileges. When the
910910
driver detects missing privileges it logs a warning and falls back to the

docs/guides/adapters/psqlpy.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,16 +254,15 @@ For comprehensive examples and migration guides, see:
254254

255255
## Event Channels
256256

257-
- Psqlpy enables native LISTEN/NOTIFY by default
258-
(`driver_features["events_backend"] = "listen_notify"`). Call
259-
`spec.event_channel(config)` to publish or consume without migrations.
257+
- Psqlpy defaults to native LISTEN/NOTIFY support (`backend="listen_notify"`).
258+
Call `spec.event_channel(config)` to obtain a channel—no migrations required.
260259
- Native listeners use the `Listener` API and a dedicated connection so the
261260
shared pool remains available for normal queries.
262-
- For durability and retries, set `driver_features["events_backend"] =
263-
"listen_notify_durable"` and include the `events` extension migrations.
264-
- The queue-only fallback remains available by setting
265-
`driver_features["events_backend"] = "table_queue"` alongside the
266-
`events` migrations.
261+
- For durability and retries, set `extension_config={"events": {"backend": "listen_notify_durable"}}`
262+
and include the `events` extension migrations.
263+
- Force the durable queue fallback (for deterministic testing or multi-tenant
264+
workloads) by setting `extension_config={"events": {"backend": "table_queue"}}`
265+
and including the `events` migrations.
267266

268267
## Best Practices
269268

docs/guides/adapters/psycopg.md

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,15 @@ For comprehensive examples and migration guides, see:
133133

134134
## Event Channels
135135

136-
- Psycopg enables native LISTEN/NOTIFY support by default
137-
(`driver_features["events_backend"] = "listen_notify"`). Call
138-
`spec.event_channel(config)` to publish or consume without migrations.
139-
- Listeners run on dedicated connections so the pool remains available for
140-
transactional work.
141-
- For durability and retries, set `driver_features["events_backend"] =
142-
"listen_notify_durable"` and include the `events` extension migrations.
143-
- The queue-only fallback remains available by setting
144-
`driver_features["events_backend"] = "table_queue"` alongside the
145-
`events` migrations.
136+
- Psycopg defaults to native LISTEN/NOTIFY support (`backend="listen_notify"`).
137+
Call `spec.event_channel(config)` to obtain a channel—no migrations required.
138+
- Publishing uses `connection.notify()` under the hood; consumers rely on
139+
dedicated connections so the shared pool stays available for transactional work.
140+
- For durability and retries, set `extension_config={"events": {"backend": "listen_notify_durable"}}`
141+
and include the `events` extension migrations.
142+
- Force the durable queue fallback (for deterministic testing or multi-tenant
143+
workloads) by setting `extension_config={"events": {"backend": "table_queue"}}`
144+
and including the `events` migrations.
146145

147146
## Common Issues
148147

docs/guides/events/database-event-channels.md

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,15 @@ from sqlspec.adapters.sqlite import SqliteConfig
1515
spec = SQLSpec()
1616
config = SqliteConfig(
1717
connection_config={"database": ":memory:"},
18-
migration_config={
19-
"script_location": "migrations",
20-
"include_extensions": ["events"],
21-
},
18+
migration_config={"script_location": "migrations"},
2219
extension_config={
2320
"events": {
2421
"queue_table": "app_events",
2522
}
2623
},
2724
)
2825
spec.add_config(config)
26+
# Events migrations are auto-included when extension_config["events"] is present
2927

3028
channel = spec.event_channel(config)
3129
channel.publish("notifications", {"type": "cache_invalidate", "key": "user:1"})
@@ -38,7 +36,7 @@ for message in channel.iter_events("notifications"):
3836
## PostgreSQL native notifications
3937

4038
All async PostgreSQL adapters (AsyncPG, Psycopg async, and Psqlpy) support
41-
native `LISTEN/NOTIFY` via `events_backend="listen_notify"`. When enabled,
39+
native `LISTEN/NOTIFY` via `extension_config["events"]["backend"] = "listen_notify"`. When enabled,
4240
events flow directly through PostgreSQL's notification system with no
4341
migrations required.
4442

@@ -65,7 +63,7 @@ enabled as described below.
6563

6664
## Oracle Advanced Queuing (sync adapters)
6765

68-
Set ``config.driver_features["events_backend"] = "advanced_queue"`` to opt into native AQ.
66+
Set ``extension_config["events"]["backend"] = "advanced_queue"`` to opt into native AQ.
6967
When enabled, ``EventChannel`` publishes JSON payloads via ``connection.queue`` and
7068
skips the durable table migrations. The backend currently targets synchronous drivers
7169
(async configs fall back to the queue extension automatically).
@@ -81,21 +79,22 @@ and transparently falls back to the table-backed queue backend.
8179

8280
## Enabling the events extension
8381

84-
1. **Include the extension migrations**
82+
1. **Extension migrations are auto-included**
83+
84+
When `extension_config["events"]` is present, SQLSpec automatically
85+
includes the events extension in `migration_config["include_extensions"]`.
86+
Running `sqlspec upgrade` (or `config.migrate_up()`) applies
87+
`ext_events_0001`, which creates the queue table and composite index.
88+
89+
To skip the migrations (e.g., for ephemeral `listen_notify` backend):
8590

8691
```python
8792
migration_config={
8893
"script_location": "migrations",
89-
"include_extensions": ["events"],
94+
"exclude_extensions": ["events"], # Skip queue table migration
9095
}
9196
```
9297

93-
When `extension_config["events"]` is present SQLSpec automatically
94-
appends `"events"` to `include_extensions`, but setting it explicitly makes
95-
the intent clear and mirrors other extension guides. Running
96-
`sqlspec upgrade` (or `config.migrate_up()`) applies
97-
`ext_events_0001`, which creates the queue table and composite index.
98-
9998
2. **Provide extension settings (optional)**
10099

101100
```python
@@ -200,15 +199,19 @@ Manual iteration is also available via `channel.iter_events(...)` which yields
200199

201200
| Option | Default | Description |
202201
| ------------------ | ----------------------- | ----------- |
202+
| `backend` | adapter-specific | Backend implementation: `listen_notify`, `table_queue`, `listen_notify_durable`, or `advanced_queue`. |
203203
| `queue_table` | `sqlspec_event_queue` | Table name used by migrations and runtime. |
204204
| `lease_seconds` | `30` | How long a consumer owns a message before it can be retried. |
205205
| `retention_seconds`| `86400` | How long acknowledged rows remain before automatic cleanup. |
206206
| `poll_interval` | adapter-specific | Default sleep window between dequeue attempts; see table below. |
207207
| `in_memory` | `False` | Oracle-only flag that adds `INMEMORY PRIORITY HIGH` to the queue table. |
208-
| `aq_queue` | `SQLSPEC_EVENTS_QUEUE` | Native AQ queue name when `events_backend="advanced_queue"`. |
208+
| `aq_queue` | `SQLSPEC_EVENTS_QUEUE` | Native AQ queue name when `backend="advanced_queue"`. |
209209
| `aq_wait_seconds` | `5` | Wait timeout (seconds) for AQ dequeue operations. |
210210
| `aq_visibility` | *unset* | Optional visibility constant (e.g., `AQMSG_VISIBLE`). |
211211

212+
To skip migrations for ephemeral backends (e.g., `listen_notify`), use
213+
`migration_config={"exclude_extensions": ["events"]}`.
214+
212215
### Adapter defaults
213216

214217
`EventChannel` ships with tuned defaults per adapter so you rarely have to tweak the queue knobs. Override any value via `extension_config["events"]` when your workload differs from the defaults.
@@ -290,27 +293,28 @@ three backend types:
290293
| `advanced_queue` | Oracle Advanced Queuing | Enterprise Oracle deployments |
291294
| `table_queue` | Polling-based queue table | Universal fallback for any database |
292295

293-
Configure the backend via `driver_features["events_backend"]`:
296+
Configure the backend via `extension_config["events"]["backend"]`:
294297

295298
```python
296299
from sqlspec.adapters.asyncpg import AsyncpgConfig
297300

298301
# Native LISTEN/NOTIFY (default for PostgreSQL adapters)
299302
config = AsyncpgConfig(
300303
connection_config={"dsn": "postgresql://localhost/db"},
301-
driver_features={"events_backend": "listen_notify"},
304+
extension_config={"events": {"backend": "listen_notify"}},
302305
)
303306

304307
# Hybrid: durable queue with NOTIFY wakeups
305308
config = AsyncpgConfig(
306309
connection_config={"dsn": "postgresql://localhost/db"},
307-
driver_features={"events_backend": "listen_notify_durable"},
310+
extension_config={"events": {"backend": "listen_notify_durable"}},
308311
)
309312
```
310313

311314
Queue-backed backends (`listen_notify_durable`, `table_queue`) require the
312-
events extension migrations. Ensure `migration_config["include_extensions"]`
313-
contains `"events"` before publishing or consuming.
315+
events extension migrations. When `extension_config["events"]` is present,
316+
SQLSpec auto-includes the migrations. For ephemeral `listen_notify` only,
317+
use `migration_config={"exclude_extensions": ["events"]}`.
314318

315319
### Event Queue Stores
316320

sqlspec/adapters/adbc/config.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from collections.abc import Callable
44
from contextlib import contextmanager
5-
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast
5+
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast
66

77
from typing_extensions import NotRequired
88

@@ -141,8 +141,6 @@ class AdbcDriverFeatures(TypedDict):
141141
enable_cast_detection: NotRequired[bool]
142142
strict_type_coercion: NotRequired[bool]
143143
arrow_extension_types: NotRequired[bool]
144-
enable_events: NotRequired[bool]
145-
events_backend: NotRequired[Literal["table_queue"]]
146144

147145

148146
__all__ = ("AdbcConfig", "AdbcConnectionParams", "AdbcDriverFeatures")
@@ -205,10 +203,6 @@ def __init__(
205203
processed_driver_features.setdefault("strict_type_coercion", False)
206204
processed_driver_features.setdefault("arrow_extension_types", True)
207205

208-
# Auto-detect events support based on extension_config
209-
processed_driver_features.setdefault("enable_events", "events" in (extension_config or {}))
210-
processed_driver_features.setdefault("events_backend", "table_queue")
211-
212206
if json_serializer is not None:
213207
statement_config = _apply_json_serializer_to_statement_config(statement_config, json_serializer)
214208

sqlspec/adapters/aiosqlite/config.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Aiosqlite database configuration."""
22

33
from contextlib import asynccontextmanager
4-
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict
4+
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict
55

66
from typing_extensions import NotRequired
77

@@ -82,8 +82,6 @@ class AiosqliteDriverFeatures(TypedDict):
8282
enable_custom_adapters: NotRequired[bool]
8383
json_serializer: "NotRequired[Callable[[Any], str]]"
8484
json_deserializer: "NotRequired[Callable[[str], Any]]"
85-
enable_events: NotRequired[bool]
86-
events_backend: NotRequired[Literal["table_queue"]]
8785

8886

8987
class AiosqliteConfig(AsyncDatabaseConfig["AiosqliteConnection", AiosqliteConnectionPool, AiosqliteDriver]):
@@ -145,10 +143,6 @@ def __init__(
145143
json_serializer = processed_driver_features.setdefault("json_serializer", to_json)
146144
json_deserializer = processed_driver_features.setdefault("json_deserializer", from_json)
147145

148-
# Auto-detect events support based on extension_config
149-
processed_driver_features.setdefault("enable_events", "events" in (extension_config or {}))
150-
processed_driver_features.setdefault("events_backend", "table_queue")
151-
152146
base_statement_config = statement_config or aiosqlite_statement_config
153147
if json_serializer is not None:
154148
parameter_config = base_statement_config.parameter_config.with_json_serializers(

sqlspec/adapters/asyncmy/config.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from collections.abc import AsyncGenerator
44
from contextlib import asynccontextmanager
5-
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict
5+
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict
66

77
import asyncmy
88
from asyncmy.cursors import Cursor, DictCursor # pyright: ignore
@@ -90,8 +90,6 @@ class AsyncmyDriverFeatures(TypedDict):
9090

9191
json_serializer: NotRequired["Callable[[Any], str]"]
9292
json_deserializer: NotRequired["Callable[[str], Any]"]
93-
enable_events: NotRequired[bool]
94-
events_backend: NotRequired[Literal["table_queue"]]
9593

9694

9795
class AsyncmyConfig(AsyncDatabaseConfig[AsyncmyConnection, "AsyncmyPool", AsyncmyDriver]): # pyright: ignore
@@ -144,10 +142,6 @@ def __init__(
144142
serializer = processed_driver_features.setdefault("json_serializer", to_json)
145143
deserializer = processed_driver_features.setdefault("json_deserializer", from_json)
146144

147-
# Auto-detect events support based on extension_config
148-
processed_driver_features.setdefault("enable_events", "events" in (extension_config or {}))
149-
processed_driver_features.setdefault("events_backend", "table_queue")
150-
151145
base_statement_config = statement_config or build_asyncmy_statement_config(
152146
json_serializer=serializer, json_deserializer=deserializer
153147
)

0 commit comments

Comments
 (0)