File tree Expand file tree Collapse file tree 1 file changed +14
-0
lines changed
src/amp/loaders/implementations Expand file tree Collapse file tree 1 file changed +14
-0
lines changed Original file line number Diff line number Diff line change @@ -460,6 +460,20 @@ def connect(self) -> None:
460460 if self .loading_method == 'stage' :
461461 self ._create_stage ()
462462
463+ # Replace NullStores with database-backed implementations
464+ # This enables persistent checkpointing and idempotency
465+ if self .checkpoint_config .enabled :
466+ from ...streaming .checkpoint import DatabaseCheckpointStore
467+
468+ self .checkpoint_store = DatabaseCheckpointStore (self .checkpoint_config , self .connection )
469+ self .logger .info ('Enabled database-backed checkpoint store' )
470+
471+ if self .idempotency_config .enabled :
472+ from ...streaming .idempotency import DatabaseProcessedRangesStore
473+
474+ self .processed_ranges_store = DatabaseProcessedRangesStore (self .idempotency_config , self .connection )
475+ self .logger .info ('Enabled database-backed idempotency store' )
476+
463477 self ._is_connected = True
464478
465479 except Exception as e :
You can’t perform that action at this time.
0 commit comments