Skip to content

Commit d3d8dda

Browse files
committed
Fix integration test failures
1 parent 6375767 commit d3d8dda

File tree

2 files changed

+237
-36
lines changed

2 files changed

+237
-36
lines changed

tests/integration/test_deltalake_loader.py

Lines changed: 133 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -591,28 +591,50 @@ def test_handle_reorg_single_network(self, delta_temp_config):
591591
ranges_complete=True, # Mark as complete so it gets tracked in state store
592592
),
593593
)
594+
watermark1 = ResponseBatch.data_batch(
595+
data=pa.RecordBatch.from_pydict({'id': [], 'block_num': [], 'year': [], 'month': []}),
596+
metadata=BatchMetadata(
597+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc')], ranges_complete=True
598+
),
599+
)
594600
response2 = ResponseBatch.data_batch(
595601
data=batch2,
596602
metadata=BatchMetadata(
597603
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef')],
598604
ranges_complete=True, # Mark as complete so it gets tracked in state store
599605
),
600606
)
607+
watermark2 = ResponseBatch.data_batch(
608+
data=pa.RecordBatch.from_pydict({'id': [], 'block_num': [], 'year': [], 'month': []}),
609+
metadata=BatchMetadata(
610+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef')], ranges_complete=True
611+
),
612+
)
601613
response3 = ResponseBatch.data_batch(
602614
data=batch3,
603615
metadata=BatchMetadata(
604616
ranges=[BlockRange(network='ethereum', start=200, end=210, hash='0x123')],
605617
ranges_complete=True, # Mark as complete so it gets tracked in state store
606618
),
607619
)
620+
watermark3 = ResponseBatch.data_batch(
621+
data=pa.RecordBatch.from_pydict({'id': [], 'block_num': [], 'year': [], 'month': []}),
622+
metadata=BatchMetadata(
623+
ranges=[BlockRange(network='ethereum', start=200, end=210, hash='0x123')], ranges_complete=True
624+
),
625+
)
608626

609-
# Load via streaming API
610-
stream = [response1, response2, response3]
627+
# Load via streaming API with watermarks (following server protocol)
628+
stream = [response1, watermark1, response2, watermark2, response3, watermark3]
611629
results = list(loader.load_stream_continuous(iter(stream), 'test_reorg_single'))
612-
assert len(results) == 3
630+
# Expect 6 results: 3 data batches + 3 watermarks
631+
assert len(results) == 6
613632
assert all(r.success for r in results)
633+
# Verify only data batches loaded rows (watermarks have 0 rows)
634+
data_results = [r for r in results if r.rows_loaded > 0]
635+
assert len(data_results) == 3
614636

615-
# Verify all data exists
637+
# Verify all data exists in table
616638
initial_data = loader.query_table()
617639
assert initial_data.num_rows == 3
618640

@@ -643,40 +665,70 @@ def test_handle_reorg_multi_network(self, delta_temp_config):
643665
batch3 = pa.RecordBatch.from_pydict({'id': [3], 'network': ['ethereum'], 'year': [2024], 'month': [1]})
644666
batch4 = pa.RecordBatch.from_pydict({'id': [4], 'network': ['polygon'], 'year': [2024], 'month': [1]})
645667

646-
# Create response batches with network-specific ranges
668+
# Create response batches with network-specific ranges and watermarks
669+
empty = pa.RecordBatch.from_pydict({'id': [], 'network': [], 'year': [], 'month': []})
670+
647671
response1 = ResponseBatch.data_batch(
648672
data=batch1,
649673
metadata=BatchMetadata(
650674
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xaaa')],
651675
ranges_complete=True, # Mark as complete so it gets tracked in state store
652676
),
653677
)
678+
watermark1 = ResponseBatch.data_batch(
679+
data=empty,
680+
metadata=BatchMetadata(
681+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xaaa')], ranges_complete=True
682+
),
683+
)
684+
654685
response2 = ResponseBatch.data_batch(
655686
data=batch2,
656687
metadata=BatchMetadata(
657688
ranges=[BlockRange(network='polygon', start=100, end=110, hash='0xbbb')],
658689
ranges_complete=True, # Mark as complete so it gets tracked in state store
659690
),
660691
)
692+
watermark2 = ResponseBatch.data_batch(
693+
data=empty,
694+
metadata=BatchMetadata(
695+
ranges=[BlockRange(network='polygon', start=100, end=110, hash='0xbbb')], ranges_complete=True
696+
),
697+
)
698+
661699
response3 = ResponseBatch.data_batch(
662700
data=batch3,
663701
metadata=BatchMetadata(
664702
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xccc')],
665703
ranges_complete=True, # Mark as complete so it gets tracked in state store
666704
),
667705
)
706+
watermark3 = ResponseBatch.data_batch(
707+
data=empty,
708+
metadata=BatchMetadata(
709+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xccc')], ranges_complete=True
710+
),
711+
)
712+
668713
response4 = ResponseBatch.data_batch(
669714
data=batch4,
670715
metadata=BatchMetadata(
671716
ranges=[BlockRange(network='polygon', start=150, end=160, hash='0xddd')],
672717
ranges_complete=True, # Mark as complete so it gets tracked in state store
673718
),
674719
)
720+
watermark4 = ResponseBatch.data_batch(
721+
data=empty,
722+
metadata=BatchMetadata(
723+
ranges=[BlockRange(network='polygon', start=150, end=160, hash='0xddd')], ranges_complete=True
724+
),
725+
)
675726

676-
# Load via streaming API
677-
stream = [response1, response2, response3, response4]
727+
# Load via streaming API with watermarks
728+
stream = [response1, watermark1, response2, watermark2, response3, watermark3, response4, watermark4]
678729
results = list(loader.load_stream_continuous(iter(stream), 'test_reorg_multi'))
679-
assert len(results) == 4
730+
# Expect 8 results: 4 data batches + 4 watermarks
731+
assert len(results) == 8
680732
assert all(r.success for r in results)
681733

682734
# Reorg only ethereum from block 150
@@ -705,6 +757,9 @@ def test_handle_reorg_overlapping_ranges(self, delta_temp_config):
705757
batch2 = pa.RecordBatch.from_pydict({'id': [2], 'year': [2024], 'month': [1]})
706758
batch3 = pa.RecordBatch.from_pydict({'id': [3], 'year': [2024], 'month': [1]})
707759

760+
# Create empty batch for watermarks
761+
empty = pa.RecordBatch.from_pydict({'id': [], 'year': [], 'month': []})
762+
708763
# Batch 1: 90-110 (ends before reorg start of 150)
709764
# Batch 2: 140-160 (overlaps with reorg)
710765
# Batch 3: 170-190 (after reorg, but should be deleted as 170 >= 150)
@@ -715,25 +770,43 @@ def test_handle_reorg_overlapping_ranges(self, delta_temp_config):
715770
ranges_complete=True, # Mark as complete so it gets tracked in state store
716771
),
717772
)
773+
watermark1 = ResponseBatch.data_batch(
774+
data=empty,
775+
metadata=BatchMetadata(
776+
ranges=[BlockRange(network='ethereum', start=90, end=110, hash='0xaaa')], ranges_complete=True
777+
),
778+
)
718779
response2 = ResponseBatch.data_batch(
719780
data=batch2,
720781
metadata=BatchMetadata(
721782
ranges=[BlockRange(network='ethereum', start=140, end=160, hash='0xbbb')],
722783
ranges_complete=True, # Mark as complete so it gets tracked in state store
723784
),
724785
)
786+
watermark2 = ResponseBatch.data_batch(
787+
data=empty,
788+
metadata=BatchMetadata(
789+
ranges=[BlockRange(network='ethereum', start=140, end=160, hash='0xbbb')], ranges_complete=True
790+
),
791+
)
725792
response3 = ResponseBatch.data_batch(
726793
data=batch3,
727794
metadata=BatchMetadata(
728795
ranges=[BlockRange(network='ethereum', start=170, end=190, hash='0xccc')],
729796
ranges_complete=True, # Mark as complete so it gets tracked in state store
730797
),
731798
)
799+
watermark3 = ResponseBatch.data_batch(
800+
data=empty,
801+
metadata=BatchMetadata(
802+
ranges=[BlockRange(network='ethereum', start=170, end=190, hash='0xccc')], ranges_complete=True
803+
),
804+
)
732805

733-
# Load via streaming API
734-
stream = [response1, response2, response3]
806+
# Load via streaming API with watermarks (following server protocol)
807+
stream = [response1, watermark1, response2, watermark2, response3, watermark3]
735808
results = list(loader.load_stream_continuous(iter(stream), 'test_reorg_overlap'))
736-
assert len(results) == 3
809+
assert len(results) == 6 # 3 data batches + 3 watermarks
737810
assert all(r.success for r in results)
738811

739812
# Reorg from block 150 - should delete batches 2 and 3
@@ -761,32 +834,53 @@ def test_handle_reorg_version_history(self, delta_temp_config):
761834
batch2 = pa.RecordBatch.from_pydict({'id': [2], 'year': [2024], 'month': [1]})
762835
batch3 = pa.RecordBatch.from_pydict({'id': [3], 'year': [2024], 'month': [1]})
763836

837+
# Create empty batch for watermarks
838+
empty = pa.RecordBatch.from_pydict({'id': [], 'year': [], 'month': []})
839+
764840
response1 = ResponseBatch.data_batch(
765841
data=batch1,
766842
metadata=BatchMetadata(
767843
ranges=[BlockRange(network='ethereum', start=0, end=10, hash='0xaaa')],
768844
ranges_complete=True, # Mark as complete so it gets tracked in state store
769845
),
770846
)
847+
watermark1 = ResponseBatch.data_batch(
848+
data=empty,
849+
metadata=BatchMetadata(
850+
ranges=[BlockRange(network='ethereum', start=0, end=10, hash='0xaaa')], ranges_complete=True
851+
),
852+
)
771853
response2 = ResponseBatch.data_batch(
772854
data=batch2,
773855
metadata=BatchMetadata(
774856
ranges=[BlockRange(network='ethereum', start=50, end=60, hash='0xbbb')],
775857
ranges_complete=True, # Mark as complete so it gets tracked in state store
776858
),
777859
)
860+
watermark2 = ResponseBatch.data_batch(
861+
data=empty,
862+
metadata=BatchMetadata(
863+
ranges=[BlockRange(network='ethereum', start=50, end=60, hash='0xbbb')], ranges_complete=True
864+
),
865+
)
778866
response3 = ResponseBatch.data_batch(
779867
data=batch3,
780868
metadata=BatchMetadata(
781869
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xccc')],
782870
ranges_complete=True, # Mark as complete so it gets tracked in state store
783871
),
784872
)
873+
watermark3 = ResponseBatch.data_batch(
874+
data=empty,
875+
metadata=BatchMetadata(
876+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xccc')], ranges_complete=True
877+
),
878+
)
785879

786-
# Load via streaming API
787-
stream = [response1, response2, response3]
880+
# Load via streaming API with watermarks (following server protocol)
881+
stream = [response1, watermark1, response2, watermark2, response3, watermark3]
788882
results = list(loader.load_stream_continuous(iter(stream), 'test_reorg_history'))
789-
assert len(results) == 3
883+
assert len(results) == 6 # 3 data batches + 3 watermarks
790884

791885
initial_version = loader._delta_table.version()
792886

@@ -828,6 +922,9 @@ def test_streaming_with_reorg(self, delta_temp_config):
828922
{'id': [3, 4], 'value': [300, 400], 'year': [2024, 2024], 'month': [1, 1]}
829923
)
830924

925+
# Create empty batch for watermarks
926+
empty = pa.RecordBatch.from_pydict({'id': [], 'value': [], 'year': [], 'month': []})
927+
831928
# Create response batches using factory methods (with hashes for proper state management)
832929
response1 = ResponseBatch.data_batch(
833930
data=data1,
@@ -836,6 +933,12 @@ def test_streaming_with_reorg(self, delta_temp_config):
836933
ranges_complete=True, # Mark as complete so it gets tracked in state store
837934
),
838935
)
936+
watermark1 = ResponseBatch.data_batch(
937+
data=empty,
938+
metadata=BatchMetadata(
939+
ranges=[BlockRange(network='ethereum', start=100, end=110, hash='0xabc123')], ranges_complete=True
940+
),
941+
)
839942

840943
response2 = ResponseBatch.data_batch(
841944
data=data2,
@@ -844,24 +947,34 @@ def test_streaming_with_reorg(self, delta_temp_config):
844947
ranges_complete=True, # Mark as complete so it gets tracked in state store
845948
),
846949
)
950+
watermark2 = ResponseBatch.data_batch(
951+
data=empty,
952+
metadata=BatchMetadata(
953+
ranges=[BlockRange(network='ethereum', start=150, end=160, hash='0xdef456')], ranges_complete=True
954+
),
955+
)
847956

848957
# Simulate reorg event using factory method
849958
reorg_response = ResponseBatch.reorg_batch(
850959
invalidation_ranges=[BlockRange(network='ethereum', start=150, end=200)]
851960
)
852961

853-
# Process streaming data
854-
stream = [response1, response2, reorg_response]
962+
# Process streaming data with watermarks (following server protocol)
963+
stream = [response1, watermark1, response2, watermark2, reorg_response]
855964
results = list(loader.load_stream_continuous(iter(stream), 'test_streaming_reorg'))
856965

857-
# Verify results
858-
assert len(results) == 3
966+
# Verify results: 2 data batches + 2 watermarks + 1 reorg = 5 results
967+
assert len(results) == 5
859968
assert results[0].success
860969
assert results[0].rows_loaded == 2
861970
assert results[1].success
862-
assert results[1].rows_loaded == 2
971+
assert results[1].rows_loaded == 0 # Watermark
863972
assert results[2].success
864-
assert results[2].is_reorg
973+
assert results[2].rows_loaded == 2
974+
assert results[3].success
975+
assert results[3].rows_loaded == 0 # Watermark
976+
assert results[4].success
977+
assert results[4].is_reorg
865978

866979
# Verify reorg deleted the second batch
867980
final_data = loader.query_table()

0 commit comments

Comments
 (0)