@@ -591,28 +591,50 @@ def test_handle_reorg_single_network(self, delta_temp_config):
591591 ranges_complete = True , # Mark as complete so it gets tracked in state store
592592 ),
593593 )
594+ watermark1 = ResponseBatch .data_batch (
595+ data = pa .RecordBatch .from_pydict ({'id' : [], 'block_num' : [], 'year' : [], 'month' : []}),
596+ metadata = BatchMetadata (
597+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xabc' )], ranges_complete = True
598+ ),
599+ )
594600 response2 = ResponseBatch .data_batch (
595601 data = batch2 ,
596602 metadata = BatchMetadata (
597603 ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef' )],
598604 ranges_complete = True , # Mark as complete so it gets tracked in state store
599605 ),
600606 )
607+ watermark2 = ResponseBatch .data_batch (
608+ data = pa .RecordBatch .from_pydict ({'id' : [], 'block_num' : [], 'year' : [], 'month' : []}),
609+ metadata = BatchMetadata (
610+ ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef' )], ranges_complete = True
611+ ),
612+ )
601613 response3 = ResponseBatch .data_batch (
602614 data = batch3 ,
603615 metadata = BatchMetadata (
604616 ranges = [BlockRange (network = 'ethereum' , start = 200 , end = 210 , hash = '0x123' )],
605617 ranges_complete = True , # Mark as complete so it gets tracked in state store
606618 ),
607619 )
620+ watermark3 = ResponseBatch .data_batch (
621+ data = pa .RecordBatch .from_pydict ({'id' : [], 'block_num' : [], 'year' : [], 'month' : []}),
622+ metadata = BatchMetadata (
623+ ranges = [BlockRange (network = 'ethereum' , start = 200 , end = 210 , hash = '0x123' )], ranges_complete = True
624+ ),
625+ )
608626
609- # Load via streaming API
610- stream = [response1 , response2 , response3 ]
627+ # Load via streaming API with watermarks (following server protocol)
628+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 ]
611629 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_single' ))
612- assert len (results ) == 3
630+ # Expect 6 results: 3 data batches + 3 watermarks
631+ assert len (results ) == 6
613632 assert all (r .success for r in results )
633+ # Verify only data batches loaded rows (watermarks have 0 rows)
634+ data_results = [r for r in results if r .rows_loaded > 0 ]
635+ assert len (data_results ) == 3
614636
615- # Verify all data exists
637+ # Verify all data exists in table
616638 initial_data = loader .query_table ()
617639 assert initial_data .num_rows == 3
618640
@@ -643,40 +665,70 @@ def test_handle_reorg_multi_network(self, delta_temp_config):
643665 batch3 = pa .RecordBatch .from_pydict ({'id' : [3 ], 'network' : ['ethereum' ], 'year' : [2024 ], 'month' : [1 ]})
644666 batch4 = pa .RecordBatch .from_pydict ({'id' : [4 ], 'network' : ['polygon' ], 'year' : [2024 ], 'month' : [1 ]})
645667
646- # Create response batches with network-specific ranges
668+ # Create response batches with network-specific ranges and watermarks
669+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'network' : [], 'year' : [], 'month' : []})
670+
647671 response1 = ResponseBatch .data_batch (
648672 data = batch1 ,
649673 metadata = BatchMetadata (
650674 ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xaaa' )],
651675 ranges_complete = True , # Mark as complete so it gets tracked in state store
652676 ),
653677 )
678+ watermark1 = ResponseBatch .data_batch (
679+ data = empty ,
680+ metadata = BatchMetadata (
681+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xaaa' )], ranges_complete = True
682+ ),
683+ )
684+
654685 response2 = ResponseBatch .data_batch (
655686 data = batch2 ,
656687 metadata = BatchMetadata (
657688 ranges = [BlockRange (network = 'polygon' , start = 100 , end = 110 , hash = '0xbbb' )],
658689 ranges_complete = True , # Mark as complete so it gets tracked in state store
659690 ),
660691 )
692+ watermark2 = ResponseBatch .data_batch (
693+ data = empty ,
694+ metadata = BatchMetadata (
695+ ranges = [BlockRange (network = 'polygon' , start = 100 , end = 110 , hash = '0xbbb' )], ranges_complete = True
696+ ),
697+ )
698+
661699 response3 = ResponseBatch .data_batch (
662700 data = batch3 ,
663701 metadata = BatchMetadata (
664702 ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xccc' )],
665703 ranges_complete = True , # Mark as complete so it gets tracked in state store
666704 ),
667705 )
706+ watermark3 = ResponseBatch .data_batch (
707+ data = empty ,
708+ metadata = BatchMetadata (
709+ ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xccc' )], ranges_complete = True
710+ ),
711+ )
712+
668713 response4 = ResponseBatch .data_batch (
669714 data = batch4 ,
670715 metadata = BatchMetadata (
671716 ranges = [BlockRange (network = 'polygon' , start = 150 , end = 160 , hash = '0xddd' )],
672717 ranges_complete = True , # Mark as complete so it gets tracked in state store
673718 ),
674719 )
720+ watermark4 = ResponseBatch .data_batch (
721+ data = empty ,
722+ metadata = BatchMetadata (
723+ ranges = [BlockRange (network = 'polygon' , start = 150 , end = 160 , hash = '0xddd' )], ranges_complete = True
724+ ),
725+ )
675726
676- # Load via streaming API
677- stream = [response1 , response2 , response3 , response4 ]
727+ # Load via streaming API with watermarks
728+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 , response4 , watermark4 ]
678729 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_multi' ))
679- assert len (results ) == 4
730+ # Expect 8 results: 4 data batches + 4 watermarks
731+ assert len (results ) == 8
680732 assert all (r .success for r in results )
681733
682734 # Reorg only ethereum from block 150
@@ -705,6 +757,9 @@ def test_handle_reorg_overlapping_ranges(self, delta_temp_config):
705757 batch2 = pa .RecordBatch .from_pydict ({'id' : [2 ], 'year' : [2024 ], 'month' : [1 ]})
706758 batch3 = pa .RecordBatch .from_pydict ({'id' : [3 ], 'year' : [2024 ], 'month' : [1 ]})
707759
760+ # Create empty batch for watermarks
761+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'year' : [], 'month' : []})
762+
708763 # Batch 1: 90-110 (ends before reorg start of 150)
709764 # Batch 2: 140-160 (overlaps with reorg)
710765 # Batch 3: 170-190 (after reorg, but should be deleted as 170 >= 150)
@@ -715,25 +770,43 @@ def test_handle_reorg_overlapping_ranges(self, delta_temp_config):
715770 ranges_complete = True , # Mark as complete so it gets tracked in state store
716771 ),
717772 )
773+ watermark1 = ResponseBatch .data_batch (
774+ data = empty ,
775+ metadata = BatchMetadata (
776+ ranges = [BlockRange (network = 'ethereum' , start = 90 , end = 110 , hash = '0xaaa' )], ranges_complete = True
777+ ),
778+ )
718779 response2 = ResponseBatch .data_batch (
719780 data = batch2 ,
720781 metadata = BatchMetadata (
721782 ranges = [BlockRange (network = 'ethereum' , start = 140 , end = 160 , hash = '0xbbb' )],
722783 ranges_complete = True , # Mark as complete so it gets tracked in state store
723784 ),
724785 )
786+ watermark2 = ResponseBatch .data_batch (
787+ data = empty ,
788+ metadata = BatchMetadata (
789+ ranges = [BlockRange (network = 'ethereum' , start = 140 , end = 160 , hash = '0xbbb' )], ranges_complete = True
790+ ),
791+ )
725792 response3 = ResponseBatch .data_batch (
726793 data = batch3 ,
727794 metadata = BatchMetadata (
728795 ranges = [BlockRange (network = 'ethereum' , start = 170 , end = 190 , hash = '0xccc' )],
729796 ranges_complete = True , # Mark as complete so it gets tracked in state store
730797 ),
731798 )
799+ watermark3 = ResponseBatch .data_batch (
800+ data = empty ,
801+ metadata = BatchMetadata (
802+ ranges = [BlockRange (network = 'ethereum' , start = 170 , end = 190 , hash = '0xccc' )], ranges_complete = True
803+ ),
804+ )
732805
733- # Load via streaming API
734- stream = [response1 , response2 , response3 ]
806+ # Load via streaming API with watermarks (following server protocol)
807+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 ]
735808 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_overlap' ))
736- assert len (results ) == 3
809+ assert len (results ) == 6 # 3 data batches + 3 watermarks
737810 assert all (r .success for r in results )
738811
739812 # Reorg from block 150 - should delete batches 2 and 3
@@ -761,32 +834,53 @@ def test_handle_reorg_version_history(self, delta_temp_config):
761834 batch2 = pa .RecordBatch .from_pydict ({'id' : [2 ], 'year' : [2024 ], 'month' : [1 ]})
762835 batch3 = pa .RecordBatch .from_pydict ({'id' : [3 ], 'year' : [2024 ], 'month' : [1 ]})
763836
837+ # Create empty batch for watermarks
838+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'year' : [], 'month' : []})
839+
764840 response1 = ResponseBatch .data_batch (
765841 data = batch1 ,
766842 metadata = BatchMetadata (
767843 ranges = [BlockRange (network = 'ethereum' , start = 0 , end = 10 , hash = '0xaaa' )],
768844 ranges_complete = True , # Mark as complete so it gets tracked in state store
769845 ),
770846 )
847+ watermark1 = ResponseBatch .data_batch (
848+ data = empty ,
849+ metadata = BatchMetadata (
850+ ranges = [BlockRange (network = 'ethereum' , start = 0 , end = 10 , hash = '0xaaa' )], ranges_complete = True
851+ ),
852+ )
771853 response2 = ResponseBatch .data_batch (
772854 data = batch2 ,
773855 metadata = BatchMetadata (
774856 ranges = [BlockRange (network = 'ethereum' , start = 50 , end = 60 , hash = '0xbbb' )],
775857 ranges_complete = True , # Mark as complete so it gets tracked in state store
776858 ),
777859 )
860+ watermark2 = ResponseBatch .data_batch (
861+ data = empty ,
862+ metadata = BatchMetadata (
863+ ranges = [BlockRange (network = 'ethereum' , start = 50 , end = 60 , hash = '0xbbb' )], ranges_complete = True
864+ ),
865+ )
778866 response3 = ResponseBatch .data_batch (
779867 data = batch3 ,
780868 metadata = BatchMetadata (
781869 ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xccc' )],
782870 ranges_complete = True , # Mark as complete so it gets tracked in state store
783871 ),
784872 )
873+ watermark3 = ResponseBatch .data_batch (
874+ data = empty ,
875+ metadata = BatchMetadata (
876+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xccc' )], ranges_complete = True
877+ ),
878+ )
785879
786- # Load via streaming API
787- stream = [response1 , response2 , response3 ]
880+ # Load via streaming API with watermarks (following server protocol)
881+ stream = [response1 , watermark1 , response2 , watermark2 , response3 , watermark3 ]
788882 results = list (loader .load_stream_continuous (iter (stream ), 'test_reorg_history' ))
789- assert len (results ) == 3
883+ assert len (results ) == 6 # 3 data batches + 3 watermarks
790884
791885 initial_version = loader ._delta_table .version ()
792886
@@ -828,6 +922,9 @@ def test_streaming_with_reorg(self, delta_temp_config):
828922 {'id' : [3 , 4 ], 'value' : [300 , 400 ], 'year' : [2024 , 2024 ], 'month' : [1 , 1 ]}
829923 )
830924
925+ # Create empty batch for watermarks
926+ empty = pa .RecordBatch .from_pydict ({'id' : [], 'value' : [], 'year' : [], 'month' : []})
927+
831928 # Create response batches using factory methods (with hashes for proper state management)
832929 response1 = ResponseBatch .data_batch (
833930 data = data1 ,
@@ -836,6 +933,12 @@ def test_streaming_with_reorg(self, delta_temp_config):
836933 ranges_complete = True , # Mark as complete so it gets tracked in state store
837934 ),
838935 )
936+ watermark1 = ResponseBatch .data_batch (
937+ data = empty ,
938+ metadata = BatchMetadata (
939+ ranges = [BlockRange (network = 'ethereum' , start = 100 , end = 110 , hash = '0xabc123' )], ranges_complete = True
940+ ),
941+ )
839942
840943 response2 = ResponseBatch .data_batch (
841944 data = data2 ,
@@ -844,24 +947,34 @@ def test_streaming_with_reorg(self, delta_temp_config):
844947 ranges_complete = True , # Mark as complete so it gets tracked in state store
845948 ),
846949 )
950+ watermark2 = ResponseBatch .data_batch (
951+ data = empty ,
952+ metadata = BatchMetadata (
953+ ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 160 , hash = '0xdef456' )], ranges_complete = True
954+ ),
955+ )
847956
848957 # Simulate reorg event using factory method
849958 reorg_response = ResponseBatch .reorg_batch (
850959 invalidation_ranges = [BlockRange (network = 'ethereum' , start = 150 , end = 200 )]
851960 )
852961
853- # Process streaming data
854- stream = [response1 , response2 , reorg_response ]
962+ # Process streaming data with watermarks (following server protocol)
963+ stream = [response1 , watermark1 , response2 , watermark2 , reorg_response ]
855964 results = list (loader .load_stream_continuous (iter (stream ), 'test_streaming_reorg' ))
856965
857- # Verify results
858- assert len (results ) == 3
966+ # Verify results: 2 data batches + 2 watermarks + 1 reorg = 5 results
967+ assert len (results ) == 5
859968 assert results [0 ].success
860969 assert results [0 ].rows_loaded == 2
861970 assert results [1 ].success
862- assert results [1 ].rows_loaded == 2
971+ assert results [1 ].rows_loaded == 0 # Watermark
863972 assert results [2 ].success
864- assert results [2 ].is_reorg
973+ assert results [2 ].rows_loaded == 2
974+ assert results [3 ].success
975+ assert results [3 ].rows_loaded == 0 # Watermark
976+ assert results [4 ].success
977+ assert results [4 ].is_reorg
865978
866979 # Verify reorg deleted the second batch
867980 final_data = loader .query_table ()
0 commit comments