Commit 0fd23e7

streaming: Add idempotency system

- IdempotencyConfig: Simple config (no mode parameter - auto-detect)
- ProcessedRange: Track processed block ranges with metadata
- ProcessedRangesStore: Abstract interface for storage backends
- DatabaseProcessedRangesStore: PostgreSQL implementation
- NullProcessedRangesStore: No-op for disabled state
- compute_batch_hash: SHA256 content verification

1 parent 4bb653b commit 0fd23e7

File tree

2 files changed: +630 -0 lines


src/amp/streaming/idempotency.py

Lines changed: 381 additions & 0 deletions
@@ -0,0 +1,381 @@
"""
Idempotency system for exactly-once processing semantics.

Tracks processed block ranges to prevent duplicate processing of the same data,
providing exactly-once guarantees for financial and mission-critical applications.
"""

import hashlib
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

from .types import BlockRange

logger = logging.getLogger(__name__)

@dataclass
class IdempotencyConfig:
    """Configuration for idempotency behavior.

    Mode is auto-detected:
    - Transactional: If loader has load_batch_transactional() method
    - Tracking: Otherwise (uses separate check/load/mark flow)
    """

    enabled: bool = False  # Opt-in for backward compatibility
    table_prefix: str = 'amp_'
    verification_hash: bool = False  # Compute and verify batch content hashes
    cleanup_days: int = 30  # Auto-cleanup processed ranges older than this
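
The auto-detection the docstring describes could be as simple as a hasattr probe on the destination loader. A minimal sketch; `detect_mode` and `loader` are illustrative assumptions, not part of this commit:

```python
# Hypothetical sketch of the mode auto-detection described in the docstring.
# `loader` stands in for a destination loader object; not part of this commit.
def detect_mode(loader) -> str:
    if hasattr(loader, 'load_batch_transactional'):
        return 'transactional'  # check + load + mark inside one transaction
    return 'tracking'  # separate is_processed / load / mark_processed flow
```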

@dataclass
class ProcessedRange:
    """
    Record of a processed block range.

    Stores information about which block ranges have been successfully processed
    to enable duplicate detection and exactly-once semantics.
    """

    connection_name: str
    table_name: str
    network: str
    start_block: int
    end_block: int
    processed_at: datetime
    batch_hash: Optional[str] = None  # Optional: content verification hash

    def matches_range(self, block_range: BlockRange) -> bool:
        """Check if this processed range matches a block range"""
        return (
            self.network == block_range.network
            and self.start_block == block_range.start
            and self.end_block == block_range.end
        )

    def to_dict(self) -> dict:
        """Convert to dictionary for serialization"""
        return {
            'connection_name': self.connection_name,
            'table_name': self.table_name,
            'network': self.network,
            'start_block': self.start_block,
            'end_block': self.end_block,
            'processed_at': self.processed_at.isoformat(),
            'batch_hash': self.batch_hash,
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'ProcessedRange':
        """Create from dictionary"""
        return cls(
            connection_name=data['connection_name'],
            table_name=data['table_name'],
            network=data['network'],
            start_block=data['start_block'],
            end_block=data['end_block'],
            processed_at=datetime.fromisoformat(data['processed_at']),
            batch_hash=data.get('batch_hash'),
        )
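
A quick round-trip through the serialization helpers; the field values below are illustrative, not from the commit:

```python
from datetime import datetime

record = ProcessedRange(
    connection_name='prod_pg',  # illustrative values
    table_name='transfers',
    network='ethereum',
    start_block=18_000_000,
    end_block=18_000_100,
    processed_at=datetime.utcnow(),
)
# datetime.isoformat()/fromisoformat() preserve microseconds, so the
# dataclass-generated equality holds after a full round trip.
assert ProcessedRange.from_dict(record.to_dict()) == record
```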

class ProcessedRangesStore(ABC):
    """
    Abstract interface for tracking processed block ranges.

    Implementations can use database tables, external stores, or other mechanisms
    to track which ranges have been successfully processed.
    """

    def __init__(self, config: IdempotencyConfig):
        self.config = config

    @abstractmethod
    def is_processed(
        self, connection_name: str, table_name: str, ranges: List[BlockRange]
    ) -> bool:
        """
        Check if a set of block ranges has already been processed.

        Args:
            connection_name: Name of the connection
            table_name: Name of the destination table
            ranges: List of block ranges to check

        Returns:
            True if all ranges have been processed, False otherwise
        """
        pass

    @abstractmethod
    def mark_processed(
        self,
        connection_name: str,
        table_name: str,
        ranges: List[BlockRange],
        batch_hash: Optional[str] = None,
    ) -> None:
        """
        Mark a set of block ranges as processed.

        Args:
            connection_name: Name of the connection
            table_name: Name of the destination table
            ranges: List of block ranges that were processed
            batch_hash: Optional hash of batch content for verification
        """
        pass

    @abstractmethod
    def cleanup_old_ranges(self, connection_name: str, table_name: str, days: int) -> int:
        """
        Clean up processed ranges older than specified days.

        Args:
            connection_name: Name of the connection
            table_name: Name of the destination table
            days: Delete ranges older than this many days

        Returns:
            Number of ranges deleted
        """
        pass
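
The "tracking" flow the config docstring refers to, written against this interface. A hedged sketch only: `process_batch`, `load_batch`, and the connection/table names are stand-ins for the caller's pipeline, which is not part of this commit:

```python
# Sketch of the check/load/mark tracking flow. `load_batch`, `batch`, and
# `ranges` are hypothetical stand-ins for the caller's pipeline.
def process_batch(store: ProcessedRangesStore, batch, ranges):
    if store.is_processed('prod_pg', 'transfers', ranges):
        return  # duplicate delivery; skip the load entirely
    load_batch(batch)  # hypothetical destination write
    batch_hash = compute_batch_hash(batch) if store.config.verification_hash else None
    store.mark_processed('prod_pg', 'transfers', ranges, batch_hash=batch_hash)
```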

class DatabaseProcessedRangesStore(ProcessedRangesStore):
    """
    Store processed ranges in the destination database.

    Uses a dedicated table to track which block ranges have been processed,
    enabling exactly-once semantics even for non-transactional operations.
    """

    def __init__(self, config: IdempotencyConfig, db_connection):
        """
        Initialize database processed ranges store.

        Args:
            config: Idempotency configuration
            db_connection: Database connection object with execute() method
        """
        super().__init__(config)
        self.conn = db_connection
        self._ensure_table_exists()

    def _ensure_table_exists(self):
        """Ensure the processed ranges table exists in the database"""
        table_name = f'{self.config.table_prefix}processed_ranges'

        create_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                connection_name VARCHAR(255) NOT NULL,
                table_name VARCHAR(255) NOT NULL,
                network VARCHAR(50) NOT NULL,
                start_block BIGINT NOT NULL,
                end_block BIGINT NOT NULL,
                processed_at TIMESTAMP NOT NULL,
                batch_hash VARCHAR(64),
                PRIMARY KEY (connection_name, table_name, network, start_block, end_block)
            )
        """

        # Create index for efficient lookups
        index_sql = f"""
            CREATE INDEX IF NOT EXISTS idx_{self.config.table_prefix}processed_ranges_lookup
            ON {table_name}(connection_name, table_name, network, start_block)
        """

        try:
            cursor = self.conn.cursor()
            cursor.execute(create_sql)
            cursor.execute(index_sql)
            self.conn.commit()
            logger.debug(f'Ensured processed ranges table {table_name} exists')
        except Exception as e:
            logger.debug(f'Processed ranges table creation skipped: {e}')
            self.conn.rollback()

    def is_processed(
        self, connection_name: str, table_name: str, ranges: List[BlockRange]
    ) -> bool:
        """Check if all ranges have been processed"""
        if not self.config.enabled or not ranges:
            return False

        table = f'{self.config.table_prefix}processed_ranges'

        # Check if ALL ranges in the list have been processed
        for block_range in ranges:
            check_sql = f"""
                SELECT COUNT(*) FROM {table}
                WHERE connection_name = %s
                  AND table_name = %s
                  AND network = %s
                  AND start_block = %s
                  AND end_block = %s
            """

            try:
                cursor = self.conn.cursor()
                cursor.execute(
                    check_sql,
                    (
                        connection_name,
                        table_name,
                        block_range.network,
                        block_range.start,
                        block_range.end,
                    ),
                )
                count = cursor.fetchone()[0]

                if count == 0:
                    # This range not processed yet
                    return False

            except Exception as e:
                logger.error(f'Failed to check processed range: {e}')
                return False

        # All ranges have been processed
        logger.info(
            f'Duplicate detected: {len(ranges)} ranges already processed for '
            f'{connection_name}.{table_name}'
        )
        return True

    def mark_processed(
        self,
        connection_name: str,
        table_name: str,
        ranges: List[BlockRange],
        batch_hash: Optional[str] = None,
    ) -> None:
        """Mark ranges as processed"""
        if not self.config.enabled:
            return

        table = f'{self.config.table_prefix}processed_ranges'
        processed_at = datetime.utcnow()

        insert_sql = f"""
            INSERT INTO {table} (connection_name, table_name, network, start_block, end_block, processed_at, batch_hash)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (connection_name, table_name, network, start_block, end_block)
            DO UPDATE SET
                processed_at = EXCLUDED.processed_at,
                batch_hash = EXCLUDED.batch_hash
        """

        try:
            cursor = self.conn.cursor()

            for block_range in ranges:
                cursor.execute(
                    insert_sql,
                    (
                        connection_name,
                        table_name,
                        block_range.network,
                        block_range.start,
                        block_range.end,
                        processed_at,
                        batch_hash,
                    ),
                )

            self.conn.commit()
            logger.debug(
                f'Marked {len(ranges)} ranges as processed for {connection_name}.{table_name}'
            )

        except Exception as e:
            logger.error(f'Failed to mark ranges as processed: {e}')
            self.conn.rollback()
            raise

    def cleanup_old_ranges(self, connection_name: str, table_name: str, days: int) -> int:
        """Clean up old processed ranges"""
        if not self.config.enabled:
            return 0

        table = f'{self.config.table_prefix}processed_ranges'

        delete_sql = f"""
            DELETE FROM {table}
            WHERE connection_name = %s
              AND table_name = %s
              AND processed_at < NOW() - INTERVAL '{days} days'
        """

        try:
            cursor = self.conn.cursor()
            cursor.execute(delete_sql, (connection_name, table_name))
            deleted_count = cursor.rowcount
            self.conn.commit()

            if deleted_count > 0:
                logger.info(
                    f'Cleaned up {deleted_count} old processed ranges for '
                    f'{connection_name}.{table_name} (older than {days} days)'
                )

            return deleted_count

        except Exception as e:
            logger.error(f'Failed to cleanup old ranges: {e}')
            self.conn.rollback()
            return 0
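
Wiring the store up against a real database might look like the following. A sketch only: the psycopg2 DSN is a placeholder, and psycopg2 itself is an assumption; any DB-API connection exposing cursor()/commit()/rollback() satisfies what the class above uses:

```python
import psycopg2  # assumption: any DB-API connection with cursor()/commit()/rollback() works

conn = psycopg2.connect('dbname=amp user=amp')  # placeholder DSN
config = IdempotencyConfig(enabled=True, verification_hash=True)
store = DatabaseProcessedRangesStore(config, conn)  # creates amp_processed_ranges if missing
```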

class NullProcessedRangesStore(ProcessedRangesStore):
    """No-op processed ranges store when idempotency is disabled"""

    def is_processed(
        self, connection_name: str, table_name: str, ranges: List[BlockRange]
    ) -> bool:
        return False

    def mark_processed(
        self,
        connection_name: str,
        table_name: str,
        ranges: List[BlockRange],
        batch_hash: Optional[str] = None,
    ) -> None:
        pass

    def cleanup_old_ranges(self, connection_name: str, table_name: str, days: int) -> int:
        return 0
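
The null store lets callers keep a single code path whether or not idempotency is enabled. A plausible selection helper; the factory itself is hypothetical, the commit only provides the two store classes:

```python
# Hypothetical factory; not part of this commit.
def make_store(config: IdempotencyConfig, conn) -> ProcessedRangesStore:
    if config.enabled:
        return DatabaseProcessedRangesStore(config, conn)
    return NullProcessedRangesStore(config)
```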

def compute_batch_hash(batch_data) -> Optional[str]:
    """
    Compute a hash of batch data for verification.

    Args:
        batch_data: PyArrow RecordBatch

    Returns:
        SHA256 hash of batch content, or None if hashing fails
    """
    try:
        # Convert batch to bytes for hashing
        import pyarrow as pa

        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, batch_data.schema)
        writer.write_batch(batch_data)
        writer.close()

        batch_bytes = sink.getvalue().to_pybytes()
        return hashlib.sha256(batch_bytes).hexdigest()

    except Exception as e:
        logger.warning(f'Failed to compute batch hash: {e}')
        return None
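
Hashing a small batch end to end, using the same pyarrow IPC serialization as the function above; the column data is illustrative:

```python
import pyarrow as pa

# Illustrative two-column batch; real callers pass the streaming batch.
batch = pa.RecordBatch.from_pydict({'block': [1, 2, 3], 'value': [10.0, 20.0, 30.0]})
digest = compute_batch_hash(batch)
print(digest)  # 64-char hex SHA256, stable for identical batch content
```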
