Skip to content

Commit 1b3a595

Browse files
committed
finish emitting any records still in the queue before is_done() or raising exceptions
1 parent 737b22c commit 1b3a595

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,10 @@ def _consume_from_queue(
147147
airbyte_message_or_record_or_exception,
148148
concurrent_stream_processor,
149149
)
150-
if concurrent_stream_processor.is_done() and queue.empty():
150+
# In the event that a partition raises an exception, anything remaining in
151+
# the queue will be missed because is_done() can raise an exception and exit
152+
# out of this loop before remaining items are consumed
153+
if queue.empty() and concurrent_stream_processor.is_done():
151154
# all partitions were generated and processed. we're done here
152155
break
153156

0 commit comments

Comments
 (0)