11# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
2+ import logging
3+ import os
34from queue import Queue
45from typing import Callable , Iterable
56
67from airbyte_cdk .models import AirbyteMessage , Level
8+ from airbyte_cdk .models import Type as MessageType
79from airbyte_cdk .sources .message .repository import LogMessage , MessageRepository
810from airbyte_cdk .sources .streams .concurrent .partitions .types import QueueItem
911
12+ logger = logging .getLogger ("airbyte" )
13+
1014
1115class ConcurrentMessageRepository (MessageRepository ):
1216 """
@@ -25,14 +29,25 @@ def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepositor
2529 self ._queue = queue
2630 self ._decorated_message_repository = message_repository
2731
32+ test_env = os .getenv ("PYTEST_CURRENT_TEST" )
33+ self ._log_messages_for_testing = (
34+ test_env and "test_concurrent_declarative_source.py" in test_env
35+ )
36+
2837 def emit_message (self , message : AirbyteMessage ) -> None :
38+ if self ._log_messages_for_testing :
39+ self ._log_message (message )
2940 self ._decorated_message_repository .emit_message (message )
3041 for message in self ._decorated_message_repository .consume_queue ():
42+ if self ._log_messages_for_testing :
43+ self ._log_message (message )
3144 self ._queue .put (message )
3245
3346 def log_message (self , level : Level , message_provider : Callable [[], LogMessage ]) -> None :
3447 self ._decorated_message_repository .log_message (level , message_provider )
3548 for message in self ._decorated_message_repository .consume_queue ():
49+ if self ._log_messages_for_testing :
50+ self ._log_message (message )
3651 self ._queue .put (message )
3752
3853 def consume_queue (self ) -> Iterable [AirbyteMessage ]:
@@ -41,3 +56,16 @@ def consume_queue(self) -> Iterable[AirbyteMessage]:
4156 loading messages onto the queue processed on the main thread.
4257 """
4358 yield from []
59+
60+ @staticmethod
61+ def _log_message (message : AirbyteMessage ) -> None :
62+ if message .type == MessageType .STATE :
63+ if message .state and message .state .stream :
64+ state = message .state .stream .stream_state .__dict__
65+ logger .info (
66+ f"Processing and emitting message of type { message .type } with contents: { message .state .stream .stream_state .__dict__ } "
67+ )
68+ else :
69+ logger .info (
70+ f"Processing and emitting message of type { message .type } with contents: { message .__dict__ } "
71+ )
0 commit comments