
Commit ff749a8

loaders: Integrate resilience, checkpointing, and idempotency

1 parent 0fd23e7

9 files changed: +1384 -74 lines

src/amp/client.py (28 additions, 4 deletions)

@@ -346,6 +346,29 @@ def query_and_load_streaming(

         self.logger.info(f'Starting streaming query to {loader_type}:{destination}')

+        # Create loader instance early to access checkpoint store
+        loader_instance = create_loader(loader_type, loader_config)
+
+        # Load checkpoint and create resume watermark if enabled (default: enabled)
+        if resume_watermark is None and kwargs.get('resume', True):
+            try:
+                checkpoint = loader_instance.checkpoint_store.load(connection_name, destination)
+
+                if checkpoint:
+                    resume_watermark = checkpoint.to_resume_watermark()
+                    checkpoint_type = 'reorg checkpoint' if checkpoint.is_reorg else 'checkpoint'
+                    self.logger.info(
+                        f'Resuming from {checkpoint_type}: {len(checkpoint.ranges)} ranges, '
+                        f'timestamp {checkpoint.timestamp}'
+                    )
+                    if checkpoint.is_reorg:
+                        resume_points = ', '.join(
+                            f'{r.network}:{r.start}' for r in checkpoint.ranges
+                        )
+                        self.logger.info(f'Reorg resume points: {resume_points}')
+            except Exception as e:
+                self.logger.warning(f'Failed to load checkpoint, starting from beginning: {e}')
+
         try:
             # Execute streaming query with Flight SQL
             # Create a CommandStatementQuery message

@@ -376,12 +399,13 @@ def query_and_load_streaming(
                 stream_iterator = ReorgAwareStream(stream_iterator)
                 self.logger.info('Reorg detection enabled for streaming query')

-            # Create loader instance and start continuous loading
-            loader_instance = create_loader(loader_type, loader_config)
-
+            # Start continuous loading with checkpoint support
            with loader_instance:
                 self.logger.info(f'Starting continuous load to {destination}. Press Ctrl+C to stop.')
-                yield from loader_instance.load_stream_continuous(stream_iterator, destination, **load_config.__dict__)
+                # Pass connection_name for checkpoint saving
+                yield from loader_instance.load_stream_continuous(
+                    stream_iterator, destination, connection_name=connection_name, **load_config.__dict__
+                )

         except Exception as e:
             self.logger.error(f'Streaming query failed: {e}')
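With this change, checkpoint resume is on by default: if the loader's checkpoint store has a saved state for the given connection_name and destination, the stream restarts from that watermark. A minimal usage sketch; the positional query argument and the connection_name keyword are assumptions about the existing query_and_load_streaming signature, which this diff does not show in full:

    # Resume from the last saved checkpoint automatically (default behavior)
    for result in client.query_and_load_streaming(
        'SELECT * FROM blocks',            # assumed query argument
        loader_type='postgresql',
        destination='blocks',
        connection_name='mainnet_blocks',  # assumed keyword; identifies the checkpoint
    ):
        print(result)

    # Ignore any saved checkpoint and start from the beginning
    for result in client.query_and_load_streaming(
        'SELECT * FROM blocks',
        loader_type='postgresql',
        destination='blocks',
        connection_name='mainnet_blocks',
        resume=False,                      # picked up via kwargs.get('resume', True)
    ):
        print(result)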

src/amp/loaders/base.py (479 additions, 47 deletions)

Large diffs are not rendered by default.

src/amp/loaders/implementations/postgresql_loader.py (110 additions, 10 deletions)

@@ -4,6 +4,7 @@
 import pyarrow as pa
 from psycopg2.pool import ThreadedConnectionPool

+from ...streaming.idempotency import DatabaseProcessedRangesStore, IdempotencyConfig
 from ...streaming.types import BlockRange
 from ..base import DataLoader, LoadMode
 from ._postgres_helpers import has_binary_columns, prepare_csv_data, prepare_insert_data

@@ -84,6 +85,22 @@ def connect(self) -> None:
             finally:
                 self.pool.putconn(conn)

+            # Replace NullStores with database-backed implementations
+            # This enables persistent checkpointing and idempotency
+            conn = self.pool.getconn()
+            try:
+                if self.checkpoint_config.enabled:
+                    from ...streaming.checkpoint import DatabaseCheckpointStore
+
+                    self.checkpoint_store = DatabaseCheckpointStore(self.checkpoint_config, conn)
+                    self.logger.info('Enabled database-backed checkpoint store')
+
+                if self.idempotency_config.enabled:
+                    self.processed_ranges_store = DatabaseProcessedRangesStore(self.idempotency_config, conn)
+                    self.logger.info('Enabled database-backed idempotency store')
+            finally:
+                self.pool.putconn(conn)
+
             self._is_connected = True

         except Exception as e:

@@ -109,6 +126,90 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
         finally:
             self.pool.putconn(conn)

+    def load_batch_transactional(
+        self,
+        batch: pa.RecordBatch,
+        table_name: str,
+        connection_name: str,
+        ranges: List[BlockRange],
+        batch_hash: Optional[str] = None,
+    ) -> int:
+        """
+        Load a batch with transactional exactly-once semantics.
+
+        This method wraps the duplicate check, data loading, and processed marking
+        in a single PostgreSQL transaction, ensuring atomic exactly-once processing.
+
+        The transaction flow:
+        1. BEGIN TRANSACTION
+        2. Check if batch already processed (with SELECT FOR UPDATE lock)
+        3. If not processed:
+           - Load data into target table
+           - Mark ranges as processed in processed_ranges table
+        4. COMMIT (or ROLLBACK on error)
+
+        This guarantees that either both operations succeed or both fail,
+        preventing duplicate data even in case of crashes between operations.
+
+        Args:
+            batch: PyArrow RecordBatch to load
+            table_name: Target table name
+            connection_name: Connection identifier for tracking
+            ranges: Block ranges covered by this batch
+            batch_hash: Optional hash for additional validation
+
+        Returns:
+            Number of rows loaded (0 if duplicate)
+        """
+        if not self.idempotency_config.enabled:
+            raise ValueError('Transactional loading requires idempotency to be enabled')
+
+        conn = self.pool.getconn()
+        try:
+            # Create processed ranges store with this connection for transactional operations
+            store = DatabaseProcessedRangesStore(self.idempotency_config, conn)
+
+            # Disable autocommit to manage transaction manually
+            original_autocommit = conn.autocommit
+            conn.autocommit = False
+
+            try:
+                # Check if already processed (within transaction)
+                if store.is_processed(connection_name, table_name, ranges):
+                    self.logger.info(
+                        f'Batch already processed (ranges: {[f"{r.network}:{r.start}-{r.end}" for r in ranges]}), '
+                        f'skipping (transactional check)'
+                    )
+                    conn.rollback()
+                    return 0
+
+                # Load data within transaction
+                with conn.cursor() as cur:
+                    self._copy_arrow_data(cur, batch, table_name)
+
+                # Mark as processed within same transaction
+                store.mark_processed(connection_name, table_name, ranges, batch_hash)
+
+                # Commit transaction - both data load and processed marking succeed atomically
+                conn.commit()
+                self.logger.debug(
+                    f'Transactional batch load committed: {batch.num_rows} rows, '
+                    f'ranges: {[f"{r.network}:{r.start}-{r.end}" for r in ranges]}'
+                )
+                return batch.num_rows
+
+            except Exception as e:
+                # Rollback on any error - ensures no partial state
+                conn.rollback()
+                self.logger.error(f'Transactional batch load failed, rolled back: {e}')
+                raise
+            finally:
+                # Restore original autocommit setting
+                conn.autocommit = original_autocommit
+
+        finally:
+            self.pool.putconn(conn)
+
     def _clear_table(self, table_name: str) -> None:
         """Clear table for overwrite mode"""
         conn = self.pool.getconn()

@@ -208,11 +309,9 @@ def _create_table_from_schema(self, schema: pa.Schema, table_name: str) -> None:

         # Build CREATE TABLE statement
         columns = []
-        # Check if this is streaming data with metadata columns
-        has_metadata = any(field.name.startswith('_meta_') for field in schema)

         for field in schema:
-            # Skip generic metadata columns - we'll use _meta_block_range instead
+            # Skip generic metadata columns - we'll use _meta_block_ranges instead
             if field.name in ('_meta_range_start', '_meta_range_end'):
                 continue
             # Special handling for JSONB metadata column

@@ -258,13 +357,14 @@ def _create_table_from_schema(self, schema: pa.Schema, table_name: str) -> None:
             # Quote column name for safety (important for blockchain field names)
             columns.append(f'"{field.name}" {pg_type}{nullable}')

-        # Add metadata columns for streaming/reorg support if this is streaming data
-        # but only if they don't already exist in the schema
-        if has_metadata:
-            schema_field_names = [field.name for field in schema]
-            if '_meta_block_ranges' not in schema_field_names:
-                # Use JSONB for multi-network block ranges with GIN index support
-                columns.append('"_meta_block_ranges" JSONB')
+        # Always add metadata column for streaming/reorg support
+        # This supports hybrid streaming (parallel catch-up → continuous streaming)
+        # where initial batches don't have metadata but later ones do
+        schema_field_names = [field.name for field in schema]
+        if '_meta_block_ranges' not in schema_field_names:
+            # Use JSONB for multi-network block ranges with GIN index support
+            # This column is optional and can be NULL for non-streaming loads
+            columns.append('"_meta_block_ranges" JSONB')

         # Create the table - Fixed: use proper identifier quoting
         create_sql = f"""
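The new load_batch_transactional method is the exactly-once path described in its docstring: duplicate check, COPY, and range marking in one transaction. A usage sketch; the BlockRange constructor and the compute_batch_hash signature are assumptions (only their attribute names and export appear in this commit), and the loader variable stands for a connected PostgreSQL loader with idempotency enabled:

    import pyarrow as pa
    from amp.streaming.types import BlockRange
    from amp.streaming.idempotency import compute_batch_hash

    batch = pa.RecordBatch.from_pydict({'number': [100, 101], 'hash': ['0xaa', '0xbb']})

    # Assumed constructor: only the network/start/end attributes are visible in this diff
    ranges = [BlockRange(network='ethereum', start=100, end=101)]

    with loader:  # a connected PostgreSQL loader; idempotency must be enabled or ValueError is raised
        rows = loader.load_batch_transactional(
            batch,
            table_name='blocks',
            connection_name='mainnet_blocks',
            ranges=ranges,
            batch_hash=compute_batch_hash(batch),  # signature assumed; the argument is optional
        )
        # rows == 0 means the ranges were already marked processed and the load was skipped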

src/amp/streaming/__init__.py (26 additions, 0 deletions)

@@ -1,4 +1,19 @@
 # Streaming module for continuous data loading
+from .checkpoint import (
+    CheckpointConfig,
+    CheckpointState,
+    CheckpointStore,
+    DatabaseCheckpointStore,
+    NullCheckpointStore,
+)
+from .idempotency import (
+    DatabaseProcessedRangesStore,
+    IdempotencyConfig,
+    NullProcessedRangesStore,
+    ProcessedRange,
+    ProcessedRangesStore,
+    compute_batch_hash,
+)
 from .iterator import StreamingResultIterator
 from .parallel import (
     BlockRangePartitionStrategy,

@@ -27,4 +42,15 @@
     'ParallelStreamExecutor',
     'QueryPartition',
     'BlockRangePartitionStrategy',
+    'CheckpointConfig',
+    'CheckpointState',
+    'CheckpointStore',
+    'DatabaseCheckpointStore',
+    'NullCheckpointStore',
+    'IdempotencyConfig',
+    'ProcessedRange',
+    'ProcessedRangesStore',
+    'DatabaseProcessedRangesStore',
+    'NullProcessedRangesStore',
+    'compute_batch_hash',
 ]
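These re-exports make the checkpoint and idempotency types importable from the streaming package root. A short sketch, assuming the package installs as amp; construction keywords beyond the enabled flag (which connect() reads on both configs) are not shown in this diff and are assumed:

    from amp.streaming import CheckpointConfig, IdempotencyConfig, compute_batch_hash

    checkpoint_cfg = CheckpointConfig(enabled=True)    # assumed keyword; attribute shown in the loader
    idempotency_cfg = IdempotencyConfig(enabled=True)  # assumed keyword; attribute shown in the loader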

src/amp/streaming/parallel.py (45 additions, 1 deletion)

@@ -18,6 +18,7 @@
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional

 from ..loaders.types import LoadResult
+from .resilience import BackPressureConfig, RetryConfig

 if TYPE_CHECKING:
     from ..client import Client

@@ -53,7 +54,7 @@ def metadata(self) -> Dict[str, Any]:

 @dataclass
 class ParallelConfig:
-    """Configuration for parallel streaming execution"""
+    """Configuration for parallel streaming execution with resilience support"""

     num_workers: int
     table_name: str  # Name of the table to partition (e.g., 'blocks', 'transactions')

@@ -64,6 +65,11 @@ class ParallelConfig:
     stop_on_error: bool = False  # Stop all workers on first error
     reorg_buffer: int = 200  # Block overlap when transitioning to continuous streaming (for reorg detection)

+    # Resilience configuration (applied to all workers)
+    # If not specified, uses sensible defaults from resilience module
+    retry_config: Optional[RetryConfig] = None
+    back_pressure_config: Optional[BackPressureConfig] = None
+
     def __post_init__(self):
         if self.num_workers < 1:
             raise ValueError(f'num_workers must be >= 1, got {self.num_workers}')

@@ -74,6 +80,37 @@ def __post_init__(self):
         if not self.table_name:
             raise ValueError('table_name is required')

+    def get_resilience_config(self) -> Dict[str, Any]:
+        """
+        Get resilience configuration as a dict suitable for loader config.
+
+        Returns:
+            Dict with resilience settings, or empty dict if all None (use defaults)
+        """
+        resilience_dict = {}
+
+        if self.retry_config is not None:
+            resilience_dict['retry'] = {
+                'enabled': self.retry_config.enabled,
+                'max_retries': self.retry_config.max_retries,
+                'initial_backoff_ms': self.retry_config.initial_backoff_ms,
+                'max_backoff_ms': self.retry_config.max_backoff_ms,
+                'backoff_multiplier': self.retry_config.backoff_multiplier,
+                'jitter': self.retry_config.jitter,
+            }
+
+        if self.back_pressure_config is not None:
+            resilience_dict['back_pressure'] = {
+                'enabled': self.back_pressure_config.enabled,
+                'initial_delay_ms': self.back_pressure_config.initial_delay_ms,
+                'max_delay_ms': self.back_pressure_config.max_delay_ms,
+                'adapt_on_429': self.back_pressure_config.adapt_on_429,
+                'adapt_on_timeout': self.back_pressure_config.adapt_on_timeout,
+                'recovery_factor': self.back_pressure_config.recovery_factor,
+            }
+
+        return {'resilience': resilience_dict} if resilience_dict else {}
+

 class BlockRangePartitionStrategy:
     """

@@ -317,6 +354,13 @@ def execute_parallel_stream(
         """
         load_config = load_config or {}

+        # Merge resilience configuration into load_config
+        # This ensures all workers inherit the resilience behavior
+        resilience_config = self.config.get_resilience_config()
+        if resilience_config:
+            load_config.update(resilience_config)
+            self.logger.info('Applied resilience configuration to parallel workers')
+
         # Detect if we should continue with live streaming after parallel phase
         continue_streaming = self.config.max_block is None
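With retry_config and back_pressure_config on ParallelConfig, every worker inherits the same resilience settings through get_resilience_config(), which execute_parallel_stream merges into load_config. A sketch, assuming RetryConfig and BackPressureConfig accept their fields as keyword arguments with defaults, and noting that ParallelConfig may have other required fields not visible in this diff:

    from amp.streaming.parallel import ParallelConfig
    from amp.streaming.resilience import BackPressureConfig, RetryConfig

    config = ParallelConfig(
        num_workers=4,
        table_name='blocks',
        # Field names match the attributes read in get_resilience_config();
        # constructor defaults for the omitted fields are assumed
        retry_config=RetryConfig(enabled=True, max_retries=5, initial_backoff_ms=500),
        back_pressure_config=BackPressureConfig(enabled=True, adapt_on_429=True),
    )

    # Flattens both configs into the dict merged into each worker's load_config:
    # {'resilience': {'retry': {...}, 'back_pressure': {...}}}
    print(config.get_resilience_config())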
