@@ -588,22 +588,44 @@ def test_handle_reorg_single_network(self, delta_temp_config):
588588 data = batch1 ,
589589 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xabc' )]),
590590 )
591+ watermark1 = ResponseBatch .data_batch (
592+ data = pa .RecordBatch .from_pydict ({'id' : [], 'block_num' : [], 'year' : [], 'month' : []}),
593+ metadata = BatchMetadata (
594+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xabc' )], ranges_complete = True
595+ ),
596+ )
591597 response2 = ResponseBatch .data_batch (
592598 data = batch2 ,
593599 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef' )]),
594600 )
601+ watermark2 = ResponseBatch .data_batch (
602+ data = pa .RecordBatch .from_pydict ({'id' : [], 'block_num' : [], 'year' : [], 'month' : []}),
603+ metadata = BatchMetadata (
604+ ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef' )], ranges_complete = True
605+ ),
606+ )
595607 response3 = ResponseBatch .data_batch (
596608 data = batch3 ,
597609 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 200 , end = 210 , hash = '0x123' )]),
598610 )
611+ watermark3 = ResponseBatch .data_batch (
612+ data = pa .RecordBatch .from_pydict ({'id' : [], 'block_num' : [], 'year' : [], 'month' : []}),
613+ metadata = BatchMetadata (
614+ ranges = [BlockRange (network = 'ethereum' , start = 200 , end = 210 , hash = '0x123' )], ranges_complete = True
615+ ),
616+ )
599617
600- # Load via streaming API
601- stream = [response1 , response2 , response3 ]
618+ # Load via streaming API with watermarks (following server protocol)
619+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 ]
602620 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_single' ))
603- assert len (results ) == 3
621+ # Expect 6 results: 3 data batches + 3 watermarks
622+ assert len (results ) == 6
604623 assert all (r .success for r in results )
624+ # Verify only data batches loaded rows (watermarks have 0 rows)
625+ data_results = [r for r in results if r .rows_loaded > 0 ]
626+ assert len (data_results ) == 3
605627
606- # Verify all data exists
628+ # Verify all data exists in table
607629 initial_data = loader .query_table ()
608630 assert initial_data .num_rows == 3
609631
@@ -634,28 +656,58 @@ def test_handle_reorg_multi_network(self, delta_temp_config):
634656 batch3 = pa .RecordBatch .from_pydict ({'id' : [3 ], 'network' : ['ethereum' ], 'year' : [2024 ], 'month' : [1 ]})
635657 batch4 = pa .RecordBatch .from_pydict ({'id' : [4 ], 'network' : ['polygon' ], 'year' : [2024 ], 'month' : [1 ]})
636658
637- # Create response batches with network-specific ranges
659+ # Create response batches with network-specific ranges and watermarks
660+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'network' : [], 'year' : [], 'month' : []})
661+
638662 response1 = ResponseBatch .data_batch (
639663 data = batch1 ,
640664 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xaaa' )]),
641665 )
666+ watermark1 = ResponseBatch .data_batch (
667+ data = empty ,
668+ metadata = BatchMetadata (
669+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xaaa' )], ranges_complete = True
670+ ),
671+ )
672+
642673 response2 = ResponseBatch .data_batch (
643674 data = batch2 ,
644675 metadata = BatchMetadata (ranges = [BlockRange (network = 'polygon' , start = 100 , end = 110 , hash = '0xbbb' )]),
645676 )
677+ watermark2 = ResponseBatch .data_batch (
678+ data = empty ,
679+ metadata = BatchMetadata (
680+ ranges = [BlockRange (network = 'polygon' , start = 100 , end = 110 , hash = '0xbbb' )], ranges_complete = True
681+ ),
682+ )
683+
646684 response3 = ResponseBatch .data_batch (
647685 data = batch3 ,
648686 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xccc' )]),
649687 )
688+ watermark3 = ResponseBatch .data_batch (
689+ data = empty ,
690+ metadata = BatchMetadata (
691+ ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xccc' )], ranges_complete = True
692+ ),
693+ )
694+
650695 response4 = ResponseBatch .data_batch (
651696 data = batch4 ,
652697 metadata = BatchMetadata (ranges = [BlockRange (network = 'polygon' , start = 150 , end = 160 , hash = '0xddd' )]),
653698 )
699+ watermark4 = ResponseBatch .data_batch (
700+ data = empty ,
701+ metadata = BatchMetadata (
702+ ranges = [BlockRange (network = 'polygon' , start = 150 , end = 160 , hash = '0xddd' )], ranges_complete = True
703+ ),
704+ )
654705
655- # Load via streaming API
656- stream = [response1 , response2 , response3 , response4 ]
706+ # Load via streaming API with watermarks
707+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 , response4 , watermark4 ]
657708 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_multi' ))
658- assert len (results ) == 4
709+ # Expect 8 results: 4 data batches + 4 watermarks
710+ assert len (results ) == 8
659711 assert all (r .success for r in results )
660712
661713 # Reorg only ethereum from block 150
@@ -684,26 +736,47 @@ def test_handle_reorg_overlapping_ranges(self, delta_temp_config):
684736 batch2 = pa .RecordBatch .from_pydict ({'id' : [2 ], 'year' : [2024 ], 'month' : [1 ]})
685737 batch3 = pa .RecordBatch .from_pydict ({'id' : [3 ], 'year' : [2024 ], 'month' : [1 ]})
686738
739+ # Create empty batch for watermarks
740+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'year' : [], 'month' : []})
741+
687742 # Batch 1: 90-110 (ends before reorg start of 150)
688743 # Batch 2: 140-160 (overlaps with reorg)
689744 # Batch 3: 170-190 (lies entirely past the reorg start of 150, so it must also be deleted: 170 >= 150)
690745 response1 = ResponseBatch .data_batch (
691746 data = batch1 ,
692747 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 90 , end = 110 , hash = '0xaaa' )]),
693748 )
749+ watermark1 = ResponseBatch .data_batch (
750+ data = empty ,
751+ metadata = BatchMetadata (
752+ ranges = [BlockRange (network = 'ethereum' , start = 90 , end = 110 , hash = '0xaaa' )], ranges_complete = True
753+ ),
754+ )
694755 response2 = ResponseBatch .data_batch (
695756 data = batch2 ,
696757 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 140 , end = 160 , hash = '0xbbb' )]),
697758 )
759+ watermark2 = ResponseBatch .data_batch (
760+ data = empty ,
761+ metadata = BatchMetadata (
762+ ranges = [BlockRange (network = 'ethereum' , start = 140 , end = 160 , hash = '0xbbb' )], ranges_complete = True
763+ ),
764+ )
698765 response3 = ResponseBatch .data_batch (
699766 data = batch3 ,
700767 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 170 , end = 190 , hash = '0xccc' )]),
701768 )
769+ watermark3 = ResponseBatch .data_batch (
770+ data = empty ,
771+ metadata = BatchMetadata (
772+ ranges = [BlockRange (network = 'ethereum' , start = 170 , end = 190 , hash = '0xccc' )], ranges_complete = True
773+ ),
774+ )
702775
703- # Load via streaming API
704- stream = [response1 , response2 , response3 ]
776+ # Load via streaming API with watermarks (following server protocol)
777+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 ]
705778 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_overlap' ))
706- assert len (results ) == 3
779+ assert len (results ) == 6 # 3 data batches + 3 watermarks
707780 assert all (r .success for r in results )
708781
709782 # Reorg from block 150 - should delete batches 2 and 3
@@ -731,23 +804,44 @@ def test_handle_reorg_version_history(self, delta_temp_config):
731804 batch2 = pa .RecordBatch .from_pydict ({'id' : [2 ], 'year' : [2024 ], 'month' : [1 ]})
732805 batch3 = pa .RecordBatch .from_pydict ({'id' : [3 ], 'year' : [2024 ], 'month' : [1 ]})
733806
807+ # Create empty batch for watermarks
808+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'year' : [], 'month' : []})
809+
734810 response1 = ResponseBatch .data_batch (
735811 data = batch1 ,
736812 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 0 , end = 10 , hash = '0xaaa' )]),
737813 )
814+ watermark1 = ResponseBatch .data_batch (
815+ data = empty ,
816+ metadata = BatchMetadata (
817+ ranges = [BlockRange (network = 'ethereum' , start = 0 , end = 10 , hash = '0xaaa' )], ranges_complete = True
818+ ),
819+ )
738820 response2 = ResponseBatch .data_batch (
739821 data = batch2 ,
740822 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 50 , end = 60 , hash = '0xbbb' )]),
741823 )
824+ watermark2 = ResponseBatch .data_batch (
825+ data = empty ,
826+ metadata = BatchMetadata (
827+ ranges = [BlockRange (network = 'ethereum' , start = 50 , end = 60 , hash = '0xbbb' )], ranges_complete = True
828+ ),
829+ )
742830 response3 = ResponseBatch .data_batch (
743831 data = batch3 ,
744832 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xccc' )]),
745833 )
834+ watermark3 = ResponseBatch .data_batch (
835+ data = empty ,
836+ metadata = BatchMetadata (
837+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xccc' )], ranges_complete = True
838+ ),
839+ )
746840
747- # Load via streaming API
748- stream = [response1 , response2 , response3 ]
841+ # Load via streaming API with watermarks (following server protocol)
842+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 ]
749843 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_history' ))
750- assert len (results ) == 3
844+ assert len (results ) == 6 # 3 data batches + 3 watermarks
751845
752846 initial_version = loader ._delta_table .version ()
753847
@@ -789,34 +883,53 @@ def test_streaming_with_reorg(self, delta_temp_config):
789883 {'id' : [3 , 4 ], 'value' : [300 , 400 ], 'year' : [2024 , 2024 ], 'month' : [1 , 1 ]}
790884 )
791885
886+ # Create empty batch for watermarks
887+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'value' : [], 'year' : [], 'month' : []})
888+
792889 # Create response batches using factory methods (with hashes for proper state management)
793890 response1 = ResponseBatch .data_batch (
794891 data = data1 ,
795892 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xabc123' )]),
796893 )
894+ watermark1 = ResponseBatch .data_batch (
895+ data = empty ,
896+ metadata = BatchMetadata (
897+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xabc123' )], ranges_complete = True
898+ ),
899+ )
797900
798901 response2 = ResponseBatch .data_batch (
799902 data = data2 ,
800903 metadata = BatchMetadata (ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef456' )]),
801904 )
905+ watermark2 = ResponseBatch .data_batch (
906+ data = empty ,
907+ metadata = BatchMetadata (
908+ ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef456' )], ranges_complete = True
909+ ),
910+ )
802911
803912 # Simulate reorg event using factory method
804913 reorg_response = ResponseBatch .reorg_batch (
805914 invalidation_ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 200 )]
806915 )
807916
808- # Process streaming data
809- stream = [response1 , response2 , reorg_response ]
917+ # Process streaming data with watermarks (following server protocol)
918+ stream = [response1 , watermark1 , response2 , watermark2 , reorg_response ]
810919 results = list (loader .load_stream_continuous (iter (stream ), 'test_streaming_reorg' ))
811920
812- # Verify results
813- assert len (results ) == 3
921+ # Verify results: 2 data batches + 2 watermarks + 1 reorg = 5 results
922+ assert len (results ) == 5
814923 assert results [0 ].success
815924 assert results [0 ].rows_loaded == 2
816925 assert results [1 ].success
817- assert results [1 ].rows_loaded == 2
926+ assert results [1 ].rows_loaded == 0 # Watermark
818927 assert results [2 ].success
819- assert results [2 ].is_reorg
928+ assert results [2 ].rows_loaded == 2
929+ assert results [3 ].success
930+ assert results [3 ].rows_loaded == 0 # Watermark
931+ assert results [4 ].success
932+ assert results [4 ].is_reorg
820933
821934 # Verify reorg deleted the second batch
822935 final_data = loader .query_table ()
0 commit comments