Skip to content

Commit aea02e1

Browse files
authored
Feat: Enable logging when new stream starts (#288)
1 parent 8676125 commit aea02e1

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

airbyte/sources/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def __init__(
7575
self.executor = executor
7676
self.name = name
7777
self._processed_records = 0
78+
self._stream_names_observed: set[str] = set()
7879
self._config_dict: dict[str, Any] | None = None
7980
self._last_log_messages: list[str] = []
8081
self._discovered_catalog: AirbyteCatalog | None = None
@@ -582,6 +583,9 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
582583
message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
583584
if message.type is Type.RECORD:
584585
self._processed_records += 1
586+
if message.record.stream not in self._stream_names_observed:
587+
self._stream_names_observed.add(message.record.stream)
588+
self._log_stream_read_start(message.record.stream)
585589
if message.type == Type.LOG:
586590
self._add_to_logs(message.log.message)
587591
if message.type == Type.TRACE and message.trace.type == TraceType.ERROR:
@@ -620,6 +624,9 @@ def _log_sync_start(
620624
event_type=EventType.SYNC,
621625
)
622626

627+
def _log_stream_read_start(self, stream: str) -> None:
628+
print(f"Read started on stream: {stream} at {pendulum.now().format('HH:mm:ss')}...")
629+
623630
def _log_sync_success(
624631
self,
625632
*,

tests/unit_tests/test_lowcode_connectors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ def test_nocode_execution(connector_name: str, config: dict) -> None:
2828
source.read()
2929
for name, records in source.read().streams.items():
3030
assert name
31-
assert len(records) > 0
31+
assert len(records) > 0, f"No records were returned from the '{name}' stream."

0 commit comments

Comments
 (0)