Skip to content

Commit 92151df

Browse files
committed
test: Add comprehensive unit tests for streaming features
Add unit tests for all new streaming features:
- test_checkpoint.py - Checkpoint management and persistence
- test_idempotency.py - Duplicate detection and deduplication
- test_label_joining.py - Label enrichment with type conversion
- test_label_manager.py - CSV loading and label storage
- test_resilience.py - Retry, backoff, rate limiting
- test_resume_optimization.py - Resume position calculation
- test_stream_state.py - State store implementations
- test_streaming_helpers.py - Utility functions and batch ID generation
- test_streaming_types.py - BlockRange, ResumeWatermark types
1 parent 86749a0 commit 92151df

9 files changed

+2304
-14
lines changed

tests/unit/test_checkpoint.py

Lines changed: 169 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,169 @@
1+
"""
2+
Unit tests for checkpoint system.
3+
"""
4+
5+
from datetime import UTC, datetime
6+
7+
import pytest
8+
9+
from src.amp.streaming.checkpoint import CheckpointConfig, CheckpointState, NullCheckpointStore
10+
from src.amp.streaming.types import BlockRange
11+
12+
13+
@pytest.mark.unit
class TestCheckpointState:
    """Checks construction, watermark conversion and dict round-tripping of CheckpointState."""

    def test_creation(self):
        """A checkpoint built from two ranges exposes the values it was given."""
        block_ranges = [
            BlockRange(network='ethereum', start=100, end=200, hash='0xabc'),
            BlockRange(network='polygon', start=50, end=150, hash='0xdef'),
        ]
        created_at = datetime.now(UTC)

        state = CheckpointState(ranges=block_ranges, timestamp=created_at, worker_id=0)

        assert len(state.ranges) == 2
        assert state.timestamp == created_at
        assert state.worker_id == 0

    def test_to_resume_watermark(self):
        """The watermark exposes the range fields by key and the timestamp as a string."""
        state = CheckpointState(
            ranges=[BlockRange(network='ethereum', start=100, end=200, hash='0xabc')],
            timestamp=datetime(2024, 1, 1, 0, 0, 0),
        )

        resume_point = state.to_resume_watermark()

        assert len(resume_point.ranges) == 1
        # Watermark ranges are accessed by key here, unlike BlockRange attributes.
        only_range = resume_point.ranges[0]
        assert only_range['network'] == 'ethereum'
        assert only_range['start'] == 100
        assert only_range['end'] == 200
        assert resume_point.timestamp == '2024-01-01T00:00:00'

    def test_serialization_round_trip(self):
        """to_dict followed by from_dict yields a checkpoint with the same field values."""
        stamp = datetime(2024, 1, 1, 12, 30, 0)
        source = CheckpointState(
            ranges=[
                BlockRange(network='ethereum', start=100, end=200, hash='0xabc', prev_hash='0x123'),
                BlockRange(network='polygon', start=50, end=150, hash='0xdef'),
            ],
            timestamp=stamp,
            worker_id=1,
        )

        # Serialize and immediately rebuild from the resulting dictionary.
        rebuilt = CheckpointState.from_dict(source.to_dict())

        assert len(rebuilt.ranges) == 2
        first_range = rebuilt.ranges[0]
        assert first_range.network == 'ethereum'
        assert first_range.hash == '0xabc'
        assert first_range.prev_hash == '0x123'
        assert rebuilt.ranges[1].network == 'polygon'
        assert rebuilt.timestamp == stamp
        assert rebuilt.worker_id == 1

    def test_to_dict_structure(self):
        """The serialized dictionary has the expected keys and value types."""
        state = CheckpointState(
            ranges=[BlockRange(network='ethereum', start=100, end=200)],
            timestamp=datetime(2024, 1, 1, 0, 0, 0),
            worker_id=0,
        )

        payload = state.to_dict()

        for expected_key in ('ranges', 'timestamp', 'worker_id'):
            assert expected_key in payload
        assert payload['worker_id'] == 0
        assert isinstance(payload['ranges'], list)
        assert isinstance(payload['timestamp'], str)
102+
103+
104+
@pytest.mark.unit
class TestCheckpointConfig:
    """Verify the default and caller-supplied field values of CheckpointConfig."""

    def test_default_values(self):
        """A config built with no arguments is disabled, uses 'db' storage and the 'amp_' prefix."""
        config = CheckpointConfig()

        # Identity check instead of `== False`: pins the flag to the bool
        # singleton rather than accepting any value that merely equals False (e.g. 0).
        assert config.enabled is False  # Disabled by default
        assert config.storage == 'db'
        assert config.table_prefix == 'amp_'

    def test_custom_values(self):
        """Values passed to the constructor are stored unchanged on the config."""
        config = CheckpointConfig(
            enabled=True,
            storage='external',
            table_prefix='custom_',
        )

        # Same reasoning as above: `is True` rejects truthy non-bool values such as 1.
        assert config.enabled is True
        assert config.storage == 'external'
        assert config.table_prefix == 'custom_'
127+
128+
129+
@pytest.mark.unit
class TestNullCheckpointStore:
    """Checks that every NullCheckpointStore operation can be called safely."""

    @staticmethod
    def _disabled_store():
        """Build a NullCheckpointStore from a config with checkpointing disabled."""
        return NullCheckpointStore(CheckpointConfig(enabled=False))

    def test_save_does_nothing(self):
        """save() accepts a checkpoint and completes without raising."""
        null_store = self._disabled_store()
        state = CheckpointState(
            ranges=[BlockRange(network='ethereum', start=100, end=200)],
            timestamp=datetime.now(UTC),
        )

        # Passing means the call returned normally; nothing else is asserted.
        null_store.save('test_conn', 'test_table', state)

    def test_load_returns_none(self):
        """load() returns None for the given connection and table."""
        null_store = self._disabled_store()

        loaded = null_store.load('test_conn', 'test_table')

        assert loaded is None

    def test_delete_for_network_does_nothing(self):
        """delete_for_network() completes without raising."""
        null_store = self._disabled_store()

        # Passing means the call returned normally; nothing else is asserted.
        null_store.delete_for_network('test_conn', 'test_table', 'ethereum')

    def test_delete_does_nothing(self):
        """delete() completes without raising."""
        null_store = self._disabled_store()

        # Passing means the call returned normally; nothing else is asserted.
        null_store.delete('test_conn', 'test_table')

0 commit comments

Comments
 (0)