Skip to content

Commit e7b2cd7

Browse files
author
maxime.c
committed
log every item in the queue
1 parent 39ba730 commit e7b2cd7

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
155155
)
156156
self._record_counter[stream.name] += 1
157157
stream.cursor.observe(record)
158-
test_env = os.getenv("PYTEST_CURRENT_TEST")
159-
if test_env and "test_concurrent_declarative_source.py" in test_env:
160-
self._logger.info(f"Processing and emitting: {message.__dict__}")
161158
yield message
162159
yield from self._message_repository.consume_queue()
163160

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44
import concurrent
55
import logging
6+
import os
67
from queue import Queue
78
from typing import Iterable, Iterator, List, Optional
89

@@ -143,6 +144,9 @@ def _consume_from_queue(
143144
concurrent_stream_processor: ConcurrentReadProcessor,
144145
) -> Iterable[AirbyteMessage]:
145146
while airbyte_message_or_record_or_exception := queue.get():
147+
test_env = os.getenv("PYTEST_CURRENT_TEST")
148+
if test_env and "test_concurrent_declarative_source.py" in test_env:
149+
self._logger.info(f"Processing and emitting: {airbyte_message_or_record_or_exception.__dict__}")
146150
yield from self._handle_item(
147151
airbyte_message_or_record_or_exception,
148152
concurrent_stream_processor,

0 commit comments

Comments
 (0)