Skip to content

Commit 6a5b807

Browse files
authored
feat: simplify event backend (#297)
1 parent b844333 commit 6a5b807

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3305
-2422
lines changed

AGENTS.md

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,35 @@ SQLSpec is a type-safe SQL query mapper designed for minimal abstraction between
9696
- **Single-Pass Processing**: Parse once → transform once → validate once - SQL object is single source of truth
9797
- **Abstract Methods with Concrete Implementations**: Protocol defines abstract methods, base classes provide concrete sync/async implementations
9898

99+
### Adapter Transaction Detection Pattern
100+
101+
Each adapter MUST override `_connection_in_transaction()` with direct attribute access instead of using the base class fallback which relies on `getattr()` chains.
102+
103+
```python
104+
# In each adapter's driver.py
105+
class MyAdapterDriver(SyncDriverBase):
106+
def _connection_in_transaction(self) -> bool:
107+
# AsyncPG: uses is_in_transaction() method
108+
return self.connection.is_in_transaction()
109+
110+
# SQLite/DuckDB: uses in_transaction property
111+
return self.connection.in_transaction
112+
113+
# Psycopg: uses status attribute
114+
return self.connection.status != psycopg.pq.TransactionStatus.IDLE
115+
116+
# BigQuery: No transaction support
117+
return False
118+
```
119+
120+
**Why this matters:**
121+
122+
- The base class uses `getattr()` chains which are slow and prevent mypyc optimization
123+
- Each adapter knows exactly which attribute to check
124+
- Direct attribute access is 10-50x faster in hot paths
125+
126+
**Reference implementations:** All adapters in `sqlspec/adapters/*/driver.py` have this override.
127+
99128
### Query Stack Implementation Guidelines
100129

101130
- **Builder Discipline**
@@ -290,6 +319,60 @@ def test_starlette_autocommit_mode() -> None:
290319
- No inline comments - use docstrings
291320
- Google-style docstrings with Args, Returns, Raises sections
292321

322+
### Type Guards Pattern
323+
324+
When checking object capabilities, ALWAYS use type guards from `sqlspec.utils.type_guards` instead of `hasattr()`:
325+
326+
```python
327+
# NEVER do this - breaks mypyc and is slow
328+
if hasattr(obj, "expression") and hasattr(obj, "sql"):
329+
sql = obj.sql
330+
expr = obj.expression
331+
332+
# ALWAYS do this - type-safe and fast
333+
from sqlspec.utils.type_guards import has_expression_and_sql
334+
if has_expression_and_sql(obj):
335+
sql = obj.sql # Type checker knows these exist
336+
expr = obj.expression
337+
```
338+
339+
**Available type guards:**
340+
341+
| Guard | Checks For | Use When |
342+
|-------|------------|----------|
343+
| `is_readable(obj)` | `.read()` method | Checking LOB/stream objects |
344+
| `has_array_interface(obj)` | `.dtype` and `.shape` | NumPy-like arrays |
345+
| `has_cursor_metadata(obj)` | `.description` | Cursor metadata access |
346+
| `has_expression_and_sql(obj)` | `.expression` and `.sql` | SQL statement objects |
347+
| `has_expression_and_parameters(obj)` | `.expression` and `.parameters` | Parameterized statements |
348+
| `is_statement_filter(obj)` | `.append_to_statement()` | Statement filter objects |
349+
350+
**Creating new type guards:**
351+
352+
1. Add protocol to `sqlspec/protocols.py`:
353+
354+
```python
355+
class MyProtocol(Protocol):
356+
"""Protocol for objects with specific capability."""
357+
my_attribute: str
358+
def my_method(self) -> None: ...
359+
```
360+
361+
2. Add type guard to `sqlspec/utils/type_guards.py`:
362+
363+
```python
364+
def has_my_capability(obj: Any) -> TypeGuard[MyProtocol]:
365+
"""Check if object has my_attribute and my_method."""
366+
return hasattr(obj, "my_attribute") and hasattr(obj, "my_method")
367+
```
368+
369+
**Key principles:**
370+
371+
- Type guards centralize `hasattr()` calls in ONE place
372+
- Protocols provide type narrowing after the guard passes
373+
- Type checkers understand the guard's implications
374+
- Works with mypyc compilation
375+
293376
### Testing
294377

295378
- **MANDATORY**: Function-based tests only (`def test_something():`)
@@ -887,26 +970,37 @@ Available backends:
887970
PostgreSQL adapters (asyncpg, psycopg, psqlpy) default to `listen_notify`.
888971
All other adapters default to `table_queue`.
889972

890-
**Stores** generate adapter-specific DDL for the queue table:
973+
**Stores** generate adapter-specific DDL using a hook-based pattern:
891974

892975
```python
893976
# In sqlspec/adapters/{adapter}/events/store.py
894977
class AdapterEventQueueStore(BaseEventQueueStore[AdapterConfig]):
895978
__slots__ = ()
896979

980+
# REQUIRED: Return (payload_type, metadata_type, timestamp_type)
897981
def _column_types(self) -> tuple[str, str, str]:
898-
# Return (payload_type, metadata_type, timestamp_type)
899982
return "JSONB", "JSONB", "TIMESTAMPTZ" # PostgreSQL
900983

901-
def _build_create_table_sql(self) -> str:
902-
# Override for database-specific DDL syntax
903-
return super()._build_create_table_sql()
984+
# OPTIONAL hooks for dialect variations:
985+
def _string_type(self, length: int) -> str:
986+
return f"VARCHAR({length})" # Override for STRING(N), VARCHAR2(N), etc.
904987

905-
def _wrap_create_statement(self, statement: str, object_type: str) -> str:
906-
# Wrap with IF NOT EXISTS, PL/SQL blocks, etc.
907-
return statement.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1)
988+
def _integer_type(self) -> str:
989+
return "INTEGER" # Override for INT64, NUMBER(10), etc.
990+
991+
def _timestamp_default(self) -> str:
992+
return "CURRENT_TIMESTAMP" # Override for CURRENT_TIMESTAMP(6), SYSTIMESTAMP, etc.
993+
994+
def _primary_key_syntax(self) -> str:
995+
return "" # Override for " PRIMARY KEY (event_id)" if PK must be inline
996+
997+
def _table_clause(self) -> str:
998+
return "" # Override for " CLUSTER BY ..." or " INMEMORY ..."
908999
```
9091000

1001+
Most adapters only override `_column_types()`. Complex dialects may override
1002+
`_build_create_table_sql()` directly (Oracle PL/SQL, BigQuery CLUSTER BY, Spanner no-DEFAULT).
1003+
9101004
**Backend factory pattern** for native backends:
9111005

9121006
```python

docs/guides/events/database-event-channels.md

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -318,13 +318,23 @@ use `migration_config={"exclude_extensions": ["events"]}`.
318318

319319
### Event Queue Stores
320320

321-
Stores generate adapter-specific DDL for the queue table. Each adapter has
322-
a store class in `sqlspec/adapters/{adapter}/events/store.py` that handles:
323-
324-
- Column type mapping (JSON, JSONB, CLOB, etc.)
325-
- Timestamp types (TIMESTAMPTZ, DATETIME, TIMESTAMP)
326-
- Index creation strategies
327-
- Database-specific DDL wrapping (IF NOT EXISTS, PL/SQL blocks, etc.)
321+
Stores generate adapter-specific DDL for the queue table using a hook-based
322+
pattern. The base class `BaseEventQueueStore` provides a template for DDL
323+
generation, and adapters override hook methods for dialect-specific variations:
324+
325+
**Required hook:**
326+
- `_column_types()` - Return tuple of (payload_type, metadata_type, timestamp_type)
327+
328+
**Optional hooks for dialect variations:**
329+
- `_string_type(length)` - String type syntax (default: `VARCHAR(N)`)
330+
- `_integer_type()` - Integer type syntax (default: `INTEGER`)
331+
- `_timestamp_default()` - Timestamp default expression (default: `CURRENT_TIMESTAMP`)
332+
- `_primary_key_syntax()` - Inline PRIMARY KEY clause (default: empty, PK on column)
333+
- `_table_clause()` - Additional table options (default: empty)
334+
335+
Most adapters only need to override `_column_types()`. Complex dialects
336+
(Oracle PL/SQL, BigQuery CLUSTER BY, Spanner no-DEFAULT) may override
337+
`_build_create_table_sql()` directly.
328338

329339
Example store implementations:
330340

sqlspec/adapters/adbc/driver.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,16 @@ def commit(self) -> None:
605605
msg = f"Failed to commit transaction: {e}"
606606
raise SQLSpecError(msg) from e
607607

608+
def _connection_in_transaction(self) -> bool:
609+
"""Check if connection is in transaction.
610+
611+
ADBC uses explicit BEGIN and does not expose reliable transaction state.
612+
613+
Returns:
614+
False - ADBC requires explicit transaction management.
615+
"""
616+
return False
617+
608618
@property
609619
def data_dictionary(self) -> "SyncDataDictionaryBase":
610620
"""Get the data dictionary for this driver.

sqlspec/adapters/aiosqlite/driver.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,14 @@ async def _truncate_table_async(self, table: str) -> None:
390390
async with self.handle_database_exceptions(), self.with_cursor(self.connection) as cursor:
391391
await cursor.execute(statement)
392392

393+
def _connection_in_transaction(self) -> bool:
394+
"""Check if connection is in transaction.
395+
396+
Returns:
397+
True if connection is in an active transaction.
398+
"""
399+
return bool(self.connection.in_transaction)
400+
393401
@property
394402
def data_dictionary(self) -> "AsyncDataDictionaryBase":
395403
"""Get the data dictionary for this driver.

sqlspec/adapters/asyncmy/driver.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,16 @@ async def _truncate_table_async(self, table: str) -> None:
550550
async with self.handle_database_exceptions(), self.with_cursor(self.connection) as cursor:
551551
await cursor.execute(statement)
552552

553+
def _connection_in_transaction(self) -> bool:
554+
"""Check if connection is in transaction.
555+
556+
AsyncMy uses explicit BEGIN and does not expose reliable transaction state.
557+
558+
Returns:
559+
False - AsyncMy requires explicit transaction management.
560+
"""
561+
return False
562+
553563
@property
554564
def data_dictionary(self) -> "AsyncDataDictionaryBase":
555565
"""Get the data dictionary for this driver.

sqlspec/adapters/asyncmy/events/store.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,31 +55,16 @@ def _column_types(self) -> tuple[str, str, str]:
5555
"""
5656
return "JSON", "JSON", "DATETIME(6)"
5757

58-
def _build_create_table_sql(self) -> str:
59-
"""Build MySQL-specific CREATE TABLE SQL.
58+
def _timestamp_default(self) -> str:
59+
"""Return MySQL timestamp default expression.
6060
6161
MySQL requires CURRENT_TIMESTAMP(6) for DATETIME(6) columns,
6262
not just CURRENT_TIMESTAMP which is only valid for TIMESTAMP type.
6363
6464
Returns:
65-
CREATE TABLE SQL statement with MySQL-specific defaults.
65+
MySQL-specific timestamp default with microsecond precision.
6666
"""
67-
payload_type, metadata_type, timestamp_type = self._column_types()
68-
table_clause = self._table_clause()
69-
return (
70-
f"CREATE TABLE {self.table_name} ("
71-
"event_id VARCHAR(64) PRIMARY KEY,"
72-
" channel VARCHAR(128) NOT NULL,"
73-
f" payload_json {payload_type} NOT NULL,"
74-
f" metadata_json {metadata_type},"
75-
" status VARCHAR(32) NOT NULL DEFAULT 'pending',"
76-
f" available_at {timestamp_type} NOT NULL DEFAULT CURRENT_TIMESTAMP(6),"
77-
f" lease_expires_at {timestamp_type},"
78-
" attempts INTEGER NOT NULL DEFAULT 0,"
79-
f" created_at {timestamp_type} NOT NULL DEFAULT CURRENT_TIMESTAMP(6),"
80-
f" acknowledged_at {timestamp_type}"
81-
f") {table_clause}"
82-
)
67+
return "CURRENT_TIMESTAMP(6)"
8368

8469
def _build_index_sql(self) -> str | None:
8570
"""Build MySQL conditional index creation SQL.

sqlspec/adapters/asyncpg/driver.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -347,47 +347,57 @@ async def _execute_stack_native(
347347
) -> "tuple[StackResult, ...]":
348348
results: list[StackResult] = []
349349

350-
async def _run_operations(observer: StackExecutionObserver) -> None:
351-
for index, operation in enumerate(stack.operations):
352-
try:
353-
normalized = None
354-
if operation.method == "execute":
355-
normalized = self._normalize_stack_execute_operation(operation)
356-
357-
if normalized is not None and self._can_prepare_stack_operation(normalized):
358-
stack_result = await self._execute_stack_operation_prepared(normalized)
359-
else:
360-
result = await self._execute_stack_operation(operation)
361-
stack_result = StackResult(result=result)
362-
except Exception as exc:
363-
stack_error = StackExecutionError(
364-
index,
365-
describe_stack_statement(operation.statement),
366-
exc,
367-
adapter=type(self).__name__,
368-
mode="continue-on-error" if continue_on_error else "fail-fast",
369-
)
370-
if continue_on_error:
371-
observer.record_operation_error(stack_error)
372-
results.append(StackResult.from_error(stack_error))
373-
continue
374-
raise stack_error from exc
375-
376-
results.append(stack_result)
377-
378350
transaction_cm = None
379351
if not continue_on_error and not self._connection_in_transaction():
380352
transaction_cm = self.connection.transaction()
381353

382354
with StackExecutionObserver(self, stack, continue_on_error, native_pipeline=True) as observer:
383355
if transaction_cm is not None:
384356
async with transaction_cm:
385-
await _run_operations(observer)
357+
await self._run_stack_operations(stack, continue_on_error, observer, results)
386358
else:
387-
await _run_operations(observer)
359+
await self._run_stack_operations(stack, continue_on_error, observer, results)
388360

389361
return tuple(results)
390362

363+
async def _run_stack_operations(
364+
self,
365+
stack: "StatementStack",
366+
continue_on_error: bool,
367+
observer: "StackExecutionObserver",
368+
results: "list[StackResult]",
369+
) -> None:
370+
"""Run operations for statement stack execution.
371+
372+
Extracted from _execute_stack_native to avoid closure compilation issues.
373+
"""
374+
for index, operation in enumerate(stack.operations):
375+
try:
376+
normalized = None
377+
if operation.method == "execute":
378+
normalized = self._normalize_stack_execute_operation(operation)
379+
380+
if normalized is not None and self._can_prepare_stack_operation(normalized):
381+
stack_result = await self._execute_stack_operation_prepared(normalized)
382+
else:
383+
result = await self._execute_stack_operation(operation)
384+
stack_result = StackResult(result=result)
385+
except Exception as exc:
386+
stack_error = StackExecutionError(
387+
index,
388+
describe_stack_statement(operation.statement),
389+
exc,
390+
adapter=type(self).__name__,
391+
mode="continue-on-error" if continue_on_error else "fail-fast",
392+
)
393+
if continue_on_error:
394+
observer.record_operation_error(stack_error)
395+
results.append(StackResult.from_error(stack_error))
396+
continue
397+
raise stack_error from exc
398+
399+
results.append(stack_result)
400+
391401
async def _execute_statement(self, cursor: "AsyncpgConnection", statement: "SQL") -> "ExecutionResult":
392402
"""Execute single SQL statement.
393403
@@ -623,6 +633,10 @@ async def _truncate_table(self, table: str) -> None:
623633
msg = f"Failed to truncate table '{table}': {exc}"
624634
raise SQLSpecError(msg) from exc
625635

636+
def _connection_in_transaction(self) -> bool:
637+
"""Check if connection is in transaction."""
638+
return bool(self.connection.is_in_transaction())
639+
626640

627641
def _convert_datetime_param(value: Any) -> Any:
628642
"""Convert datetime parameter, handling ISO strings."""

0 commit comments

Comments
 (0)