Commit 6361af6

feat: Add unified stream state management for resume and deduplication
- StreamStateStore interface with in-memory, null, and DB-backed implementations
- Checkpoint management for resume after interruptions
- Idempotency tracking to prevent duplicate processing
- Block range tracking with gap detection
- Reorg invalidation support

Key features:
- Resume from last processed position after crashes
- Exactly-once semantics via batch deduplication
- Gap detection and intelligent backfill
- Support for multiple networks and tables
1 parent 6c26b94 commit 6361af6
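The idempotency tracking in this commit keys on the batch_id, which the schema below describes as a compact 16-character hex identifier generated from the block range plus hash. The generator itself is not part of this diff, so the following is only a plausible sketch of the derivation (hypothetical helper; the real inputs and truncation may differ):

import hashlib

def make_batch_id(start_block: int, end_block: int, end_hash: str) -> str:
    # Hash the batch's identity and keep the first 16 hex characters.
    # Assumption: the actual generator in this project may combine the
    # inputs differently; this only illustrates the shape of the ID.
    digest = hashlib.sha256(f'{start_block}:{end_block}:{end_hash}'.encode())
    return digest.hexdigest()[:16]

Because the same block range with the same end hash always yields the same batch_id, the PRIMARY KEY on (connection_name, table_name, network, batch_id) in the table below rejects re-inserted duplicates, which is what gives the batch deduplication its exactly-once behavior.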

File tree

4 files changed: +1202 −0 lines changed

sql/snowflake_stream_state.sql

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
-- Snowflake Stream State Table
-- Stores server-confirmed completed batches for persistent job resumption
--
-- This table tracks which batches have been successfully processed and confirmed
-- by the server (via checkpoint watermarks). This enables jobs to resume from
-- the correct position after interruption or failure.

CREATE TABLE IF NOT EXISTS amp_stream_state (
    -- Job/Table identification
    connection_name VARCHAR(255) NOT NULL,
    table_name VARCHAR(255) NOT NULL,
    network VARCHAR(100) NOT NULL,

    -- Batch identification (compact 16-char hex ID)
    batch_id VARCHAR(16) NOT NULL,

    -- Block range covered by this batch
    start_block BIGINT NOT NULL,
    end_block BIGINT NOT NULL,

    -- Block hashes for reorg detection (optional)
    end_hash VARCHAR(66),
    start_parent_hash VARCHAR(66),

    -- Processing metadata
    processed_at TIMESTAMP_NTZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),

    -- Primary key ensures no duplicate batches
    PRIMARY KEY (connection_name, table_name, network, batch_id)
);

-- Index for fast resume position queries
-- NOTE: CREATE INDEX is only supported on Snowflake hybrid tables;
-- standard Snowflake tables reject secondary index creation.
CREATE INDEX IF NOT EXISTS idx_stream_state_resume
    ON amp_stream_state (connection_name, table_name, network, end_block);

-- Index for fast reorg invalidation queries
CREATE INDEX IF NOT EXISTS idx_stream_state_blocks
    ON amp_stream_state (connection_name, table_name, network, start_block, end_block);

-- Comments for documentation
COMMENT ON TABLE amp_stream_state IS 'Persistent stream state for job resumption - tracks server-confirmed completed batches';
COMMENT ON COLUMN amp_stream_state.batch_id IS 'Compact 16-character hex identifier generated from block range + hash';
COMMENT ON COLUMN amp_stream_state.processed_at IS 'Timestamp when batch was marked as successfully processed';
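The two indexes above serve the resume and gap-detection paths the commit message describes. The queries themselves are not part of this diff; a sketch of what they might look like against this schema (illustrative only; connection and table values are placeholders):

-- Resume position: highest confirmed block per network
SELECT network, MAX(end_block) AS resume_from_block
FROM amp_stream_state
WHERE connection_name = 'my_conn' AND table_name = 'transfers'
GROUP BY network;

-- Gap detection: ranges not adjacent to their predecessor
-- (assumes inclusive, intended-to-be-contiguous block ranges)
SELECT t.start_block, t.end_block, t.prev_end
FROM (
    SELECT network, start_block, end_block,
           LAG(end_block) OVER (PARTITION BY network ORDER BY start_block) AS prev_end
    FROM amp_stream_state
    WHERE connection_name = 'my_conn' AND table_name = 'transfers'
) t
WHERE t.prev_end IS NOT NULL AND t.start_block > t.prev_end + 1;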

src/amp/streaming/checkpoint.py

Lines changed: 315 additions & 0 deletions
@@ -0,0 +1,315 @@
"""
Checkpointing system for durable streaming with resume capability.

Provides checkpoint management to enable resuming streaming queries from
the last successfully processed position, with support for reorg detection
and invalidation.
"""

import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

from .types import BlockRange, ResumeWatermark

logger = logging.getLogger(__name__)


@dataclass
class CheckpointConfig:
    """Configuration for checkpoint behavior."""

    enabled: bool = False  # Opt-in for backward compatibility
    storage: str = 'db'  # 'db', 'external', or 'both'
    table_prefix: str = 'amp_'


@dataclass
class CheckpointState:
    """
    State of a checkpoint, storing server-provided metadata for resume.

    This directly stores the block ranges and metadata from the server,
    allowing us to create a ResumeWatermark for resuming streams.
    """

    # Server metadata (from ranges field)
    ranges: List[BlockRange]

    # Client metadata
    timestamp: datetime
    worker_id: int = 0
    is_reorg: bool = False  # True if this checkpoint was created due to a reorg

    def to_resume_watermark(self) -> ResumeWatermark:
        """
        Convert checkpoint to ResumeWatermark for resuming streaming queries.

        Uses the block ranges from the server to create a watermark that
        tells the server where to resume from.
        """
        return ResumeWatermark(
            ranges=[r.to_dict() for r in self.ranges],
            timestamp=self.timestamp.isoformat() if self.timestamp else None,
        )

    def to_dict(self) -> Dict:
        """Convert to dictionary for serialization"""
        return {
            'ranges': [r.to_dict() for r in self.ranges],
            'timestamp': self.timestamp.isoformat(),
            'worker_id': self.worker_id,
            'is_reorg': self.is_reorg,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'CheckpointState':
        """Create from dictionary"""
        return cls(
            ranges=[BlockRange.from_dict(r) for r in data['ranges']],
            timestamp=datetime.fromisoformat(data['timestamp']),
            worker_id=data.get('worker_id', 0),
            is_reorg=data.get('is_reorg', False),
        )


class CheckpointStore(ABC):
    """
    Abstract interface for checkpoint storage.

    Implementations can store checkpoints in databases, external stores
    like Redis/S3, or both.
    """

    def __init__(self, config: CheckpointConfig):
        self.config = config

    @abstractmethod
    def save(self, connection_name: str, table_name: str, checkpoint: CheckpointState) -> None:
        """Save a checkpoint for a specific connection and table"""
        pass

    @abstractmethod
    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional[CheckpointState]:
        """Load the latest checkpoint for a connection and table"""
        pass

    @abstractmethod
    def delete_for_network(self, connection_name: str, table_name: str, network: str) -> None:
        """Delete all checkpoints for a specific network (used after reorgs)"""
        pass

    @abstractmethod
    def delete(self, connection_name: str, table_name: str, worker_id: int = 0) -> None:
        """Delete a checkpoint"""
        pass


class DatabaseCheckpointStore(CheckpointStore):
    """
    Store checkpoints in the destination database.

    This implementation requires the destination database to support a
    checkpoints table (PostgreSQL syntax today; other destinations such
    as Snowflake need SQL adaptation).
    """

    def __init__(self, config: CheckpointConfig, db_connection):
        """
        Initialize database checkpoint store.

        Args:
            config: Checkpoint configuration
            db_connection: DB-API style connection object exposing
                cursor(), commit(), and rollback()
        """
        super().__init__(config)
        self.conn = db_connection
        self._ensure_table_exists()

    def _ensure_table_exists(self):
        """Ensure the checkpoints table exists in the database"""
        table_name = f'{self.config.table_prefix}checkpoints'

        # Try to create table (PostgreSQL syntax - will need adaptation for other DBs)
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                connection_name VARCHAR(255) NOT NULL,
                table_name VARCHAR(255) NOT NULL,
                worker_id INT NOT NULL DEFAULT 0,
                checkpoint_data JSONB NOT NULL,
                timestamp TIMESTAMP NOT NULL,
                metadata JSONB,
                PRIMARY KEY (connection_name, table_name, worker_id)
            )
        """

        try:
            cursor = self.conn.cursor()
            cursor.execute(create_sql)
            self.conn.commit()
            logger.debug(f'Ensured checkpoint table {table_name} exists')
        except Exception as e:
            # Table might already exist or DB might not support IF NOT EXISTS
            logger.debug(f'Checkpoint table creation skipped: {e}')
            self.conn.rollback()

    def save(self, connection_name: str, table_name: str, checkpoint: CheckpointState) -> None:
        """Save checkpoint to database"""
        if not self.config.enabled:
            return

        table = f'{self.config.table_prefix}checkpoints'

        # Serialize entire checkpoint to JSON
        checkpoint_json = json.dumps(checkpoint.to_dict())

        # Upsert checkpoint
        upsert_sql = f"""
            INSERT INTO {table} (connection_name, table_name, worker_id, checkpoint_data, timestamp)
            VALUES (%s, %s, %s, %s::jsonb, %s)
            ON CONFLICT (connection_name, table_name, worker_id)
            DO UPDATE SET
                checkpoint_data = EXCLUDED.checkpoint_data,
                timestamp = EXCLUDED.timestamp
        """

        try:
            cursor = self.conn.cursor()
            cursor.execute(
                upsert_sql,
                (
                    connection_name,
                    table_name,
                    checkpoint.worker_id,
                    checkpoint_json,
                    checkpoint.timestamp,
                ),
            )
            self.conn.commit()
            logger.info(
                f'Saved checkpoint for {connection_name}.{table_name} '
                f'(worker {checkpoint.worker_id}, {len(checkpoint.ranges)} ranges)'
            )
        except Exception as e:
            logger.error(f'Failed to save checkpoint: {e}')
            self.conn.rollback()
            raise

    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional[CheckpointState]:
        """Load checkpoint from database"""
        if not self.config.enabled:
            return None

        table = f'{self.config.table_prefix}checkpoints'

        select_sql = f"""
            SELECT checkpoint_data
            FROM {table}
            WHERE connection_name = %s AND table_name = %s AND worker_id = %s
        """

        try:
            cursor = self.conn.cursor()
            cursor.execute(select_sql, (connection_name, table_name, worker_id))
            row = cursor.fetchone()

            if not row:
                logger.debug(f'No checkpoint found for {connection_name}.{table_name}')
                return None

            checkpoint_json = row[0]

            # Parse checkpoint from JSON
            if isinstance(checkpoint_json, str):
                checkpoint_data = json.loads(checkpoint_json)
            else:
                # Already parsed (psycopg2 with RealDictCursor)
                checkpoint_data = checkpoint_json

            checkpoint = CheckpointState.from_dict(checkpoint_data)

            logger.info(
                f'Loaded checkpoint for {connection_name}.{table_name} '
                f'(worker {checkpoint.worker_id}, {len(checkpoint.ranges)} ranges)'
            )
            return checkpoint

        except Exception as e:
            logger.error(f'Failed to load checkpoint: {e}')
            return None

    def delete_for_network(self, connection_name: str, table_name: str, network: str) -> None:
        """
        Delete all checkpoints containing ranges for a specific network.

        This is called after a reorg to force full rescan of affected ranges.
        Checkpoints are JSON so we search within the stored data.
        """
        if not self.config.enabled:
            return

        table = f'{self.config.table_prefix}checkpoints'

        # Delete checkpoints where the network appears in the checkpoint_data JSON,
        # matching by casting the JSONB column to text and pattern-matching with LIKE
        delete_sql = f"""
            DELETE FROM {table}
            WHERE connection_name = %s AND table_name = %s
            AND checkpoint_data::text LIKE %s
        """

        try:
            cursor = self.conn.cursor()
            # Search for "network": "<network>" in the JSON (note: JSONB adds space after colon)
            network_pattern = f'%"network": "{network}"%'
            cursor.execute(delete_sql, (connection_name, table_name, network_pattern))
            deleted_count = cursor.rowcount
            self.conn.commit()

            if deleted_count > 0:
                logger.warning(
                    f'Deleted {deleted_count} checkpoint(s) for {connection_name}.{table_name} '
                    f'after reorg on network {network}'
                )
        except Exception as e:
            logger.error(f'Failed to delete checkpoints for network {network}: {e}')
            self.conn.rollback()

    def delete(self, connection_name: str, table_name: str, worker_id: int = 0) -> None:
        """Delete a specific checkpoint"""
        if not self.config.enabled:
            return

        table = f'{self.config.table_prefix}checkpoints'

        delete_sql = f"""
            DELETE FROM {table}
            WHERE connection_name = %s AND table_name = %s AND worker_id = %s
        """

        try:
            cursor = self.conn.cursor()
            cursor.execute(delete_sql, (connection_name, table_name, worker_id))
            self.conn.commit()
            logger.info(f'Deleted checkpoint for {connection_name}.{table_name} (worker {worker_id})')
        except Exception as e:
            logger.error(f'Failed to delete checkpoint: {e}')
            self.conn.rollback()


class NullCheckpointStore(CheckpointStore):
    """No-op checkpoint store for when checkpointing is disabled"""

    def save(self, connection_name: str, table_name: str, checkpoint: CheckpointState) -> None:
        pass

    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional[CheckpointState]:
        return None

    def delete_for_network(self, connection_name: str, table_name: str, network: str) -> None:
        pass

    def delete(self, connection_name: str, table_name: str, worker_id: int = 0) -> None:
        pass
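A minimal end-to-end usage sketch for the store above. The module path is inferred from the file layout, the psycopg2 DSN and connection/table/network names are placeholders, and the server-side range payload is assumed rather than shown in this diff:

import psycopg2

from amp.streaming.checkpoint import (
    CheckpointConfig,
    CheckpointState,
    DatabaseCheckpointStore,
)

conn = psycopg2.connect('dbname=amp')  # placeholder DSN
store = DatabaseCheckpointStore(CheckpointConfig(enabled=True), conn)

# On startup: resume from the last confirmed position, if any
checkpoint = store.load('my_conn', 'transfers')
watermark = checkpoint.to_resume_watermark() if checkpoint else None

# After each server-confirmed batch: persist the new position
# (server_ranges would be the BlockRange list from the checkpoint watermark)
# store.save('my_conn', 'transfers',
#            CheckpointState(ranges=server_ranges, timestamp=datetime.now()))

# After a detected reorg: drop checkpoints for the affected network
# store.delete_for_network('my_conn', 'transfers', 'mainnet')

Because save() is a no-op and load() returns None unless CheckpointConfig.enabled is True, the feature remains opt-in, matching the "backward compatibility" note in the config.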
