Skip to content

Commit 39ba730

Browse files
committed
more detailed logging
1 parent 8a971f3 commit 39ba730

File tree

1 file changed

+6
-7
lines changed

1 file changed

+6
-7
lines changed

airbyte_cdk/sources/message/concurrent_repository.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,17 @@ def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepositor
3535
)
3636

3737
def emit_message(self, message: AirbyteMessage) -> None:
38-
if self._log_messages_for_testing:
39-
self._log_message(message)
4038
self._decorated_message_repository.emit_message(message)
4139
for message in self._decorated_message_repository.consume_queue():
4240
if self._log_messages_for_testing:
43-
self._log_message(message)
41+
self._log_message(message, "emit_message()")
4442
self._queue.put(message)
4543

4644
def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
4745
self._decorated_message_repository.log_message(level, message_provider)
4846
for message in self._decorated_message_repository.consume_queue():
4947
if self._log_messages_for_testing:
50-
self._log_message(message)
48+
self._log_message(message, "log_message()")
5149
self._queue.put(message)
5250

5351
def consume_queue(self) -> Iterable[AirbyteMessage]:
@@ -58,14 +56,15 @@ def consume_queue(self) -> Iterable[AirbyteMessage]:
5856
yield from []
5957

6058
@staticmethod
61-
def _log_message(message: AirbyteMessage) -> None:
59+
def _log_message(message: AirbyteMessage, calling_method: str) -> None:
6260
if message.type == MessageType.STATE:
6361
if message.state and message.state.stream:
62+
stream_name = message.state.stream.stream_descriptor.name
6463
state = message.state.stream.stream_state.__dict__
6564
logger.info(
66-
f"Processing and emitting message of type {message.type} with contents: {message.state.stream.stream_state.__dict__}"
65+
f"From {calling_method} -- emitting message of type {message.type} for stream {stream_name} with contents: {state}"
6766
)
6867
else:
6968
logger.info(
70-
f"Processing and emitting message of type {message.type} with contents: {message.__dict__}"
69+
f"From {calling_method} -- emitting message of type {message.type} with contents: {message.__dict__}"
7170
)

0 commit comments

Comments
 (0)