Skip to content

Commit 2fc42c8

Browse files
committed
Fix unit tests
1 parent 9013dc4 commit 2fc42c8

File tree

1 file changed

+35
-28
lines changed

1 file changed

+35
-28
lines changed

tests/unit/test_streaming_helpers.py

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,8 @@ class TestProcessBatchNonTransactional:
217217
"""Test _process_batch_non_transactional helper method"""
218218

219219
def test_successful_non_transactional_load(self, mock_loader, sample_batch, sample_ranges):
220-
"""Test successful non-transactional batch load"""
221-
# Setup - mock state store for new unified system
222-
mock_loader.state_store.is_processed = Mock(return_value=False)
220+
"""Test successful non-transactional batch load with watermark"""
221+
# Setup - mock state store for watermark checkpointing
223222
mock_loader.state_store.mark_processed = Mock()
224223

225224
# Mock load_batch to return success
@@ -228,86 +227,94 @@ def test_successful_non_transactional_load(self, mock_loader, sample_batch, samp
228227
)
229228
mock_loader.load_batch = Mock(return_value=success_result)
230229

231-
# Execute
230+
# Execute with ranges_complete=True (watermark)
232231
result = mock_loader._process_batch_non_transactional(
233232
batch_data=sample_batch,
234233
table_name='test_table',
235234
connection_name='test_conn',
236235
ranges=sample_ranges,
237-
batch_hash='hash123',
236+
ranges_complete=True,
238237
)
239238

240239
# Verify
241240
assert result.success
242241
assert result.rows_loaded == 3
243242

244-
# Verify method calls with state store
245-
mock_loader.state_store.is_processed.assert_called_once()
243+
# Verify method calls
246244
mock_loader.load_batch.assert_called_once()
245+
# mark_processed should be called for watermarks (ranges_complete=True)
247246
mock_loader.state_store.mark_processed.assert_called_once()
248247

249-
def test_duplicate_detection_returns_skip_result(self, mock_loader, sample_batch, sample_ranges):
250-
"""Test duplicate detection returns skip result"""
251-
# Setup - is_processed returns True
252-
mock_loader.state_store.is_processed = Mock(return_value=True)
253-
mock_loader.load_batch = Mock() # Should not be called
248+
def test_data_batch_does_not_checkpoint(self, mock_loader, sample_batch, sample_ranges):
249+
"""Test that data batches (ranges_complete=False) don't checkpoint state"""
250+
# Setup
251+
mock_loader.state_store.mark_processed = Mock()
254252

255-
# Execute
253+
# Mock load_batch to return success
254+
success_result = LoadResult(
255+
rows_loaded=3, duration=0.1, ops_per_second=30.0, table_name='test_table', loader_type='mock', success=True
256+
)
257+
mock_loader.load_batch = Mock(return_value=success_result)
258+
259+
# Execute with ranges_complete=False (data batch)
256260
result = mock_loader._process_batch_non_transactional(
257261
batch_data=sample_batch,
258262
table_name='test_table',
259263
connection_name='test_conn',
260264
ranges=sample_ranges,
261-
batch_hash='hash123',
265+
ranges_complete=False,
262266
)
263267

264268
# Verify
265269
assert result.success
266-
assert result.rows_loaded == 0
267-
assert result.metadata['operation'] == 'skip_duplicate'
268-
assert result.metadata['ranges'] == [r.to_dict() for r in sample_ranges]
270+
assert result.rows_loaded == 3
269271

270-
# load_batch should not be called for duplicates
271-
mock_loader.load_batch.assert_not_called()
272+
# load_batch should be called
273+
mock_loader.load_batch.assert_called_once()
274+
# mark_processed should NOT be called for data batches (ranges_complete=False)
275+
mock_loader.state_store.mark_processed.assert_not_called()
272276

273-
def test_no_ranges_skips_duplicate_check(self, mock_loader, sample_batch):
274-
"""Test that no ranges means no duplicate checking"""
277+
def test_no_ranges_skips_checkpoint(self, mock_loader, sample_batch):
278+
"""Test that no ranges means no checkpointing"""
275279
# Setup
276-
mock_loader.state_store.is_processed = Mock()
280+
mock_loader.state_store.mark_processed = Mock()
277281
success_result = LoadResult(
278282
rows_loaded=3, duration=0.1, ops_per_second=30.0, table_name='test_table', loader_type='mock', success=True
279283
)
280284
mock_loader.load_batch = Mock(return_value=success_result)
281285

282286
# Execute with None ranges
283287
result = mock_loader._process_batch_non_transactional(
284-
batch_data=sample_batch, table_name='test_table', connection_name='test_conn', ranges=None, batch_hash=None
288+
batch_data=sample_batch,
289+
table_name='test_table',
290+
connection_name='test_conn',
291+
ranges=None,
292+
ranges_complete=True,
285293
)
286294

287295
# Verify
288296
assert result.success
289297

290-
# is_processed should not be called
291-
mock_loader.state_store.is_processed.assert_not_called()
298+
# mark_processed should not be called when ranges is None
299+
mock_loader.state_store.mark_processed.assert_not_called()
292300

293301
def test_mark_processed_failure_continues(self, mock_loader, sample_batch, sample_ranges):
294302
"""Test that mark_processed failure doesn't fail the load"""
295303
# Setup
296-
mock_loader.state_store.is_processed = Mock(return_value=False)
297304
mock_loader.state_store.mark_processed = Mock(side_effect=Exception('Mark failed'))
298305

299306
success_result = LoadResult(
300307
rows_loaded=3, duration=0.1, ops_per_second=30.0, table_name='test_table', loader_type='mock', success=True
301308
)
302309
mock_loader.load_batch = Mock(return_value=success_result)
303310

304-
# Execute - should not raise exception
311+
# Execute - should not raise exception (watermark checkpoint failure is logged but not fatal)
305312
result = mock_loader._process_batch_non_transactional(
306313
batch_data=sample_batch,
307314
table_name='test_table',
308315
connection_name='test_conn',
309316
ranges=sample_ranges,
310-
batch_hash='hash123',
317+
ranges_complete=True,
311318
)
312319

313320
# Verify - load still succeeded despite mark_processed failure

0 commit comments

Comments
 (0)