Skip to content

Commit 8d45a01

Browse files
committed
Fix incorrect duplicate batch skipping, Fix pending batch being skipped
1 parent 075c017 commit 8d45a01

File tree

1 file changed

+6
-14
lines changed

1 file changed

+6
-14
lines changed

src/amp/streaming/reorg.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,15 @@ def __next__(self) -> ResponseBatch:
4646
KeyboardInterrupt: When user cancels the stream
4747
"""
4848
try:
49+
# Check if we have a pending batch from a previous reorg detection
50+
if hasattr(self, '_pending_batch'):
51+
pending = self._pending_batch
52+
delattr(self, '_pending_batch')
53+
return pending
54+
4955
# Get next batch from underlying stream
5056
batch = next(self.stream_iterator)
5157

52-
# Note: ranges_complete flag is handled by CheckpointStore in load_stream_continuous
53-
# Check if this batch contains only duplicate ranges
54-
if self._is_duplicate_batch(batch.metadata.ranges):
55-
self.logger.debug(f'Skipping duplicate batch with ranges: {batch.metadata.ranges}')
56-
# Recursively call to get the next non-duplicate batch
57-
return self.__next__()
58-
5958
# Detect reorgs by comparing with previous ranges
6059
invalidation_ranges = self._detect_reorg(batch.metadata.ranges)
6160

@@ -70,13 +69,6 @@ def __next__(self) -> ResponseBatch:
7069
self._pending_batch = batch
7170
return ResponseBatch.reorg_batch(invalidation_ranges)
7271

73-
# Check if we have a pending batch from a previous reorg detection
74-
# REVIEW: I think we should remove this
75-
if hasattr(self, '_pending_batch'):
76-
pending = self._pending_batch
77-
delattr(self, '_pending_batch')
78-
return pending
79-
8072
# Normal case - just return the data batch
8173
return batch
8274

0 commit comments

Comments
 (0)