Skip to content

Commit c7562b8

Browse files
committed
add code to consume what is in the queue to avoid a deadlock when the number of records exceed the queue size
1 parent 15c8cde commit c7562b8

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

airbyte_cdk/connector_builder/test_reader/message_grouper.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,17 @@ def get_message_groups(
9595
latest_state_message: Optional[Dict[str, Any]] = None
9696
slice_auxiliary_requests: List[AuxiliaryRequest] = []
9797

98-
while records_count < limit and (message := next(messages, None)):
98+
while message := next(messages, None):
99+
# Even though we do not emit records beyond the limit in the message group response, we still
100+
# need to process messages off the queue in order to avoid a deadlock occurs if the amount
101+
# of extracted records exceeds the size of the queue (which has a default of 10,000)
102+
#
103+
# A few other options considered was killing the thread pool, but that doesn't kill in-progress
104+
# in-progress threads. We also considered adding another event to the main queue, but this is
105+
# the simplest solution for the time being.
106+
if records_count >= limit:
107+
continue
108+
99109
json_message = airbyte_message_to_json(message)
100110

101111
if is_page_http_request_for_different_stream(json_message, stream_name):

0 commit comments

Comments
 (0)