Skip to content

Commit b5c3fb9

Browse files
committed
Fix pending batch being skipped after reorg detection
Move the pending batch check to BEFORE fetching a new batch from the stream. Previously, after a reorg was detected: 1. The data batch was stored as _pending_batch 2. A reorg batch was returned 3. On next __next__() call, a NEW batch was fetched BEFORE checking _pending_batch 4. This caused the pending batch to be lost or returned out of order Now the pending batch check happens first, ensuring proper ordering: reorg_batch -> pending_data_batch -> next_batch
1 parent 8ce25f1 commit b5c3fb9

File tree

1 file changed

+6
-7
lines changed

1 file changed

+6
-7
lines changed

src/amp/streaming/reorg.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ def __next__(self) -> ResponseBatch:
4646
KeyboardInterrupt: When user cancels the stream
4747
"""
4848
try:
49+
# Check if we have a pending batch from a previous reorg detection
50+
if hasattr(self, '_pending_batch'):
51+
pending = self._pending_batch
52+
delattr(self, '_pending_batch')
53+
return pending
54+
4955
# Get next batch from underlying stream
5056
batch = next(self.stream_iterator)
5157

@@ -70,13 +76,6 @@ def __next__(self) -> ResponseBatch:
7076
self._pending_batch = batch
7177
return ResponseBatch.reorg_batch(invalidation_ranges)
7278

73-
# Check if we have a pending batch from a previous reorg detection
74-
# REVIEW: I think we should remove this
75-
if hasattr(self, '_pending_batch'):
76-
pending = self._pending_batch
77-
delattr(self, '_pending_batch')
78-
return pending
79-
8079
# Normal case - just return the data batch
8180
return batch
8281

0 commit comments

Comments
 (0)