Commit 47fe18e

Formatting and linting

1 parent 439ed86 commit 47fe18e

12 files changed: +85 −163 lines changed

src/amp/client.py

Lines changed: 1 addition & 3 deletions
@@ -362,9 +362,7 @@ def query_and_load_streaming(
                     f'timestamp {checkpoint.timestamp}'
                 )
                 if checkpoint.is_reorg:
-                    resume_points = ', '.join(
-                        f'{r.network}:{r.start}' for r in checkpoint.ranges
-                    )
+                    resume_points = ', '.join(f'{r.network}:{r.start}' for r in checkpoint.ranges)
                     self.logger.info(f'Reorg resume points: {resume_points}')
             except Exception as e:
                 self.logger.warning(f'Failed to load checkpoint, starting from beginning: {e}')
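For reference, a minimal sketch of the resume-point string this collapsed join produces. The BlockRange stand-in mirrors the network/start/end fields used throughout this commit; the networks and block numbers are hypothetical.

# Sketch only: BlockRange stand-in and sample values are hypothetical.
from dataclasses import dataclass

@dataclass
class BlockRange:
    network: str
    start: int
    end: int

ranges = [BlockRange('ethereum', 18_000_000, 18_000_100), BlockRange('polygon', 48_000_000, 48_000_050)]
resume_points = ', '.join(f'{r.network}:{r.start}' for r in ranges)
print(resume_points)  # ethereum:18000000, polygon:48000000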

src/amp/loaders/base.py

Lines changed: 9 additions & 19 deletions
@@ -197,8 +197,8 @@ def load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> LoadRe
             )
             self.logger.error(error_msg)
             self.logger.error(
-                f'Client will stop. On restart, streaming will resume from last checkpoint. '
-                f'Fix the data/configuration issue before restarting.'
+                'Client will stop. On restart, streaming will resume from last checkpoint. '
+                'Fix the data/configuration issue before restarting.'
             )
             # Raise exception to stop the stream
             raise RuntimeError(error_msg)
@@ -220,8 +220,8 @@ def load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> LoadRe
             )
             self.logger.error(error_msg)
             self.logger.error(
-                f'Client will stop. On restart, streaming will resume from last checkpoint. '
-                f'Fix the underlying issue before restarting.'
+                'Client will stop. On restart, streaming will resume from last checkpoint. '
+                'Fix the underlying issue before restarting.'
             )
             # Raise exception to stop the stream
             raise RuntimeError(error_msg)
@@ -547,8 +547,7 @@ def _process_reorg_event(
         # Log reorg details
         for range_obj in response.invalidation_ranges:
             self.logger.warning(
-                f'Reorg detected on {range_obj.network}: '
-                f'blocks {range_obj.start}-{range_obj.end} invalidated'
+                f'Reorg detected on {range_obj.network}: blocks {range_obj.start}-{range_obj.end} invalidated'
             )
 
         # Save reorg checkpoint (keeps old checkpoints for history)
@@ -674,9 +673,7 @@ def _process_batch_non_transactional(
 
         if is_duplicate:
             # Skip this batch - already processed
-            self.logger.info(
-                f'Skipping duplicate batch: {len(ranges)} ranges already processed for {table_name}'
-            )
+            self.logger.info(f'Skipping duplicate batch: {len(ranges)} ranges already processed for {table_name}')
             return LoadResult(
                 rows_loaded=0,
                 duration=0.0,
@@ -693,9 +690,7 @@ def _process_batch_non_transactional(
         if result.success and ranges:
             # Mark batch as processed (for exactly-once semantics)
             try:
-                self.processed_ranges_store.mark_processed(
-                    connection_name, table_name, ranges, batch_hash
-                )
+                self.processed_ranges_store.mark_processed(connection_name, table_name, ranges, batch_hash)
             except Exception as e:
                 self.logger.error(f'Failed to mark ranges as processed: {e}')
                 # Continue anyway - checkpoint will provide resume capability
@@ -733,10 +728,7 @@ def _save_checkpoint_if_complete(
 
         try:
             self.checkpoint_store.save(connection_name, table_name, checkpoint)
-            self.logger.info(
-                f'Saved checkpoint at batch {batch_count} '
-                f'({len(checkpoint.ranges)} ranges)'
-            )
+            self.logger.info(f'Saved checkpoint at batch {batch_count} ({len(checkpoint.ranges)} ranges)')
         except Exception as e:
             # Log but don't fail the stream
             self.logger.error(f'Failed to save checkpoint: {e}')
@@ -760,9 +752,7 @@ def _augment_streaming_result(
         result.metadata['batch_count'] = batch_count
         result.metadata['ranges_complete'] = ranges_complete
         if ranges:
-            result.metadata['block_ranges'] = [
-                {'network': r.network, 'start': r.start, 'end': r.end} for r in ranges
-            ]
+            result.metadata['block_ranges'] = [{'network': r.network, 'start': r.start, 'end': r.end} for r in ranges]
         return result
 
     def _compute_reorg_resume_point(self, invalidation_ranges: List[BlockRange]) -> List[BlockRange]:
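The _augment_streaming_result hunk only fixes line width, but it documents the metadata shape a streaming result carries. A small sketch of that structure; the key names come from the diff above, while the sample values are hypothetical.

# Shape of result.metadata after _augment_streaming_result (values hypothetical).
result_metadata = {
    'batch_count': 42,
    'ranges_complete': True,
    'block_ranges': [
        {'network': 'ethereum', 'start': 18_000_000, 'end': 18_000_100},
    ],
}
# Each block_ranges entry carries exactly these three keys.
assert all({'network', 'start', 'end'} <= d.keys() for d in result_metadata['block_ranges'])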

src/amp/loaders/implementations/postgresql_loader.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 import pyarrow as pa
 from psycopg2.pool import ThreadedConnectionPool
 
-from ...streaming.idempotency import DatabaseProcessedRangesStore, IdempotencyConfig
+from ...streaming.idempotency import DatabaseProcessedRangesStore
 from ...streaming.types import BlockRange
 from ..base import DataLoader, LoadMode
 from ._postgres_helpers import has_binary_columns, prepare_csv_data, prepare_insert_data

src/amp/streaming/checkpoint.py

Lines changed: 7 additions & 19 deletions
@@ -9,7 +9,7 @@
 import json
 import logging
 from abc import ABC, abstractmethod
-from dataclasses import asdict, dataclass
+from dataclasses import dataclass
 from datetime import datetime
 from typing import Dict, List, Optional
 
@@ -88,16 +88,12 @@ def __init__(self, config: CheckpointConfig):
         self.config = config
 
     @abstractmethod
-    def save(
-        self, connection_name: str, table_name: str, checkpoint: CheckpointState
-    ) -> None:
+    def save(self, connection_name: str, table_name: str, checkpoint: CheckpointState) -> None:
         """Save a checkpoint for a specific connection and table"""
         pass
 
     @abstractmethod
-    def load(
-        self, connection_name: str, table_name: str, worker_id: int = 0
-    ) -> Optional[CheckpointState]:
+    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional[CheckpointState]:
         """Load the latest checkpoint for a connection and table"""
         pass
 
@@ -159,9 +155,7 @@ def _ensure_table_exists(self):
             logger.debug(f'Checkpoint table creation skipped: {e}')
             self.conn.rollback()
 
-    def save(
-        self, connection_name: str, table_name: str, checkpoint: CheckpointState
-    ) -> None:
+    def save(self, connection_name: str, table_name: str, checkpoint: CheckpointState) -> None:
         """Save checkpoint to database"""
         if not self.config.enabled:
             return
@@ -203,9 +197,7 @@ def save(
             self.conn.rollback()
             raise
 
-    def load(
-        self, connection_name: str, table_name: str, worker_id: int = 0
-    ) -> Optional[CheckpointState]:
+    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional[CheckpointState]:
         """Load checkpoint from database"""
         if not self.config.enabled:
             return None
@@ -310,14 +302,10 @@ def delete(self, connection_name: str, table_name: str, worker_id: int = 0) -> N
 class NullCheckpointStore(CheckpointStore):
     """No-op checkpoint store for when checkpointing is disabled"""
 
-    def save(
-        self, connection_name: str, table_name: str, checkpoint: CheckpointState
-    ) -> None:
+    def save(self, connection_name: str, table_name: str, checkpoint: CheckpointState) -> None:
        pass
 
-    def load(
-        self, connection_name: str, table_name: str, worker_id: int = 0
-    ) -> Optional[CheckpointState]:
+    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional[CheckpointState]:
         return None
 
     def delete_for_network(self, connection_name: str, table_name: str, network: str) -> None:
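The save/load signatures collapsed above are the whole store contract. A minimal in-memory stand-in illustrating that interface; this is a sketch, not part of the commit — the dict-backed storage and the choice to ignore worker_id are assumptions.

# Sketch: in-memory stand-in matching the CheckpointStore signatures above.
from typing import Dict, Optional, Tuple

class InMemoryCheckpointStore:
    def __init__(self) -> None:
        # Keyed by (connection_name, table_name); keying scheme is an assumption.
        self._states: Dict[Tuple[str, str], 'CheckpointState'] = {}

    def save(self, connection_name: str, table_name: str, checkpoint: 'CheckpointState') -> None:
        self._states[(connection_name, table_name)] = checkpoint

    def load(self, connection_name: str, table_name: str, worker_id: int = 0) -> Optional['CheckpointState']:
        return self._states.get((connection_name, table_name))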

src/amp/streaming/idempotency.py

Lines changed: 5 additions & 17 deletions
@@ -6,7 +6,6 @@
 """
 
 import hashlib
-import json
 import logging
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
@@ -96,9 +95,7 @@ def __init__(self, config: IdempotencyConfig):
         self.config = config
 
     @abstractmethod
-    def is_processed(
-        self, connection_name: str, table_name: str, ranges: List[BlockRange]
-    ) -> bool:
+    def is_processed(self, connection_name: str, table_name: str, ranges: List[BlockRange]) -> bool:
         """
         Check if a set of block ranges has already been processed.
 
@@ -200,9 +197,7 @@ def _ensure_table_exists(self):
             logger.debug(f'Processed ranges table creation skipped: {e}')
             self.conn.rollback()
 
-    def is_processed(
-        self, connection_name: str, table_name: str, ranges: List[BlockRange]
-    ) -> bool:
+    def is_processed(self, connection_name: str, table_name: str, ranges: List[BlockRange]) -> bool:
         """Check if all ranges have been processed"""
         if not self.config.enabled or not ranges:
             return False
@@ -243,10 +238,7 @@ def is_processed(
             return False
 
         # All ranges have been processed
-        logger.info(
-            f'Duplicate detected: {len(ranges)} ranges already processed for '
-            f'{connection_name}.{table_name}'
-        )
+        logger.info(f'Duplicate detected: {len(ranges)} ranges already processed for {connection_name}.{table_name}')
         return True
 
     def mark_processed(
@@ -290,9 +282,7 @@ def mark_processed(
             )
 
             self.conn.commit()
-            logger.debug(
-                f'Marked {len(ranges)} ranges as processed for {connection_name}.{table_name}'
-            )
+            logger.debug(f'Marked {len(ranges)} ranges as processed for {connection_name}.{table_name}')
 
         except Exception as e:
             logger.error(f'Failed to mark ranges as processed: {e}')
@@ -336,9 +326,7 @@ def cleanup_old_ranges(self, connection_name: str, table_name: str, days: int) -
 class NullProcessedRangesStore(ProcessedRangesStore):
     """No-op processed ranges store when idempotency is disabled"""
 
-    def is_processed(
-        self, connection_name: str, table_name: str, ranges: List[BlockRange]
-    ) -> bool:
+    def is_processed(self, connection_name: str, table_name: str, ranges: List[BlockRange]) -> bool:
         return False
 
     def mark_processed(
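Together, is_processed and mark_processed back the duplicate check that _process_batch_non_transactional performs in loaders/base.py. A hedged sketch of that flow — the method names, signatures, and compute_batch_hash come from this commit; the load_once wrapper and its wiring are assumptions for illustration.

# Sketch of the exactly-once flow around a single batch load.
from src.amp.streaming.idempotency import compute_batch_hash  # import path as in the tests below

def load_once(store, loader, connection_name, table_name, batch, ranges):
    """Skip already-processed ranges, load the batch, then record the ranges."""
    if store.is_processed(connection_name, table_name, ranges):
        return None  # duplicate batch: skip, it was already loaded
    result = loader.load_batch(batch, table_name)
    if result.success and ranges:
        # Mark only after a successful load, so a crash in between risks at
        # most a duplicate (caught by is_processed on retry), never a lost batch.
        store.mark_processed(connection_name, table_name, ranges, compute_batch_hash(batch))
    return result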

src/amp/streaming/types.py

Lines changed: 1 addition & 3 deletions
@@ -121,9 +121,7 @@ def from_flight_data(cls, metadata_bytes: bytes) -> 'BatchMetadata':
             # Store remaining fields in extra
             extra = {k: v for k, v in metadata_dict.items() if k not in ('ranges', 'ranges_complete')}
 
-            return cls(
-                ranges=ranges, ranges_complete=ranges_complete, extra=extra if extra else None
-            )
+            return cls(ranges=ranges, ranges_complete=ranges_complete, extra=extra if extra else None)
         except (json.JSONDecodeError, KeyError) as e:
             # Fallback to empty metadata if parsing fails
             return cls(ranges=[], ranges_complete=False, extra={'parse_error': str(e)})
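The hunk above also shows the parse-failure fallback in from_flight_data. A small usage sketch; the fallback fields match the diff, while the import path and the malformed payload are assumptions.

# Sketch: malformed Flight metadata degrades to empty BatchMetadata.
from amp.streaming.types import BatchMetadata  # import path assumed

meta = BatchMetadata.from_flight_data(b'not valid json')
assert meta.ranges == [] and meta.ranges_complete is False
assert 'parse_error' in meta.extra  # the decode error is preserved for debugging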

tests/integration/test_checkpoint_resume.py

Lines changed: 5 additions & 7 deletions
@@ -7,13 +7,10 @@
 
 import time
 from datetime import datetime
-from unittest.mock import MagicMock, Mock, patch
 
 import psycopg2
-import pyarrow as pa
 import pytest
 
-from src.amp.loaders.base import DataLoader
 from src.amp.streaming.checkpoint import (
     CheckpointConfig,
     CheckpointState,
@@ -368,7 +365,6 @@ class TestCheckpointDisabled:
 
     def test_checkpoint_disabled_by_default(self):
         """Test that checkpoints are disabled by default"""
-        from src.amp.streaming.checkpoint import NullCheckpointStore
 
         config = CheckpointConfig()
         assert config.enabled is False
@@ -633,7 +629,6 @@ def test_exactly_once_with_checkpoint_integration(self, checkpoint_db_connection
     def test_transactional_exactly_once_postgresql(self, postgresql_test_config, small_test_data):
         """Test transactional exactly-once semantics in PostgreSQL loader"""
         from src.amp.loaders.implementations.postgresql_loader import PostgreSQLLoader
-        from src.amp.streaming.idempotency import IdempotencyConfig
 
         # Configure loader with idempotency enabled
         config_dict = {
@@ -696,10 +691,13 @@ def test_transactional_exactly_once_postgresql(self, postgresql_test_config, sma
         conn = loader.pool.getconn()
         try:
             cursor = conn.cursor()
-            cursor.execute("""
+            cursor.execute(
+                """
                 SELECT COUNT(*) FROM test_amp_processed_ranges
                 WHERE connection_name = %s AND table_name = %s
-            """, (connection_name, test_table))
+                """,
+                (connection_name, test_table),
+            )
             processed_count = cursor.fetchone()[0]
             assert processed_count == 1, 'Exactly one processed range entry should exist'
         finally:

tests/integration/test_resilient_streaming.py

Lines changed: 0 additions & 6 deletions
@@ -8,17 +8,11 @@
 import time
 from dataclasses import dataclass
 from typing import Any, Dict
-from unittest.mock import MagicMock, patch
 
 import pyarrow as pa
 import pytest
 
 from amp.loaders.base import DataLoader
-from amp.loaders.types import LoadResult
-from amp.streaming.resilience import (
-    BackPressureConfig,
-    RetryConfig,
-)
 
 
 @dataclass

tests/unit/test_checkpoint.py

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 Unit tests for checkpoint system.
 """
 
-import json
 from datetime import datetime
 
 import pytest

tests/unit/test_idempotency.py

Lines changed: 6 additions & 3 deletions
@@ -205,9 +205,10 @@ class TestBatchHashing:
 
     def test_compute_batch_hash(self):
         """Test computing hash of batch data"""
-        from src.amp.streaming.idempotency import compute_batch_hash
         import pyarrow as pa
 
+        from src.amp.streaming.idempotency import compute_batch_hash
+
         # Create test batch
         schema = pa.schema([('id', pa.int64()), ('value', pa.string())])
         batch = pa.record_batch([[1, 2, 3], ['a', 'b', 'c']], schema=schema)
@@ -220,9 +221,10 @@ def test_compute_batch_hash(self):
 
     def test_same_data_same_hash(self):
         """Test that same data produces same hash"""
-        from src.amp.streaming.idempotency import compute_batch_hash
         import pyarrow as pa
 
+        from src.amp.streaming.idempotency import compute_batch_hash
+
         schema = pa.schema([('id', pa.int64()), ('value', pa.string())])
 
         batch1 = pa.record_batch([[1, 2, 3], ['a', 'b', 'c']], schema=schema)
@@ -235,9 +237,10 @@ def test_same_data_same_hash(self):
 
     def test_different_data_different_hash(self):
         """Test that different data produces different hash"""
-        from src.amp.streaming.idempotency import compute_batch_hash
         import pyarrow as pa
 
+        from src.amp.streaming.idempotency import compute_batch_hash
+
         schema = pa.schema([('id', pa.int64()), ('value', pa.string())])
 
         batch1 = pa.record_batch([[1, 2, 3], ['a', 'b', 'c']], schema=schema)
