Skip to content

Commit 2fae5a3

Browse files
authored
Fix: More accurate perf metrics for slow-starting streams (#385)
1 parent f4a3170 commit 2fae5a3

File tree

1 file changed

+21
-18
lines changed

1 file changed

+21
-18
lines changed

airbyte/progress.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
from airbyte_protocol.models import (
3737
AirbyteMessage,
3838
AirbyteStreamStatus,
39-
Type,
4039
)
4140

4241
from airbyte import logs
@@ -275,16 +274,17 @@ def tally_records_read(
275274
self.stream_read_counts[message.record.stream] += 1
276275

277276
if message.record.stream not in self.stream_read_start_times:
278-
self._log_stream_read_start(stream_name=message.record.stream)
277+
self.log_stream_start(stream_name=message.record.stream)
279278

280-
elif (
281-
message.trace
282-
and message.trace.stream_status
283-
and message.trace.stream_status.status is AirbyteStreamStatus.COMPLETE
284-
):
285-
self._log_stream_read_end(
286-
stream_name=message.trace.stream_status.stream_descriptor.name
287-
)
279+
elif message.trace and message.trace.stream_status:
280+
if message.trace.stream_status.status is AirbyteStreamStatus.STARTED:
281+
self.log_stream_start(
282+
stream_name=message.trace.stream_status.stream_descriptor.name
283+
)
284+
if message.trace.stream_status.status is AirbyteStreamStatus.COMPLETE:
285+
self._log_stream_read_end(
286+
stream_name=message.trace.stream_status.stream_descriptor.name
287+
)
288288

289289
# Bail if we're not due for a progress update.
290290
if count % update_period != 0:
@@ -345,12 +345,12 @@ def tally_confirmed_writes(
345345
"""
346346
self._start_rich_view() # Start Rich's live view if not already running.
347347
for message in messages:
348-
if message.type is Type.STATE:
348+
if message.state:
349349
# This is a state message from the destination. Tally the records written.
350350
if message.state.stream and message.state.destinationStats:
351351
stream_name = message.state.stream.stream_descriptor.name
352-
self.destination_stream_records_confirmed[stream_name] += (
353-
message.state.destinationStats.recordCount
352+
self.destination_stream_records_confirmed[stream_name] += int(
353+
message.state.destinationStats.recordCount or 0
354354
)
355355
self._update_display()
356356

@@ -418,11 +418,14 @@ def _log_sync_start(self) -> None:
418418
event_type=EventType.SYNC,
419419
)
420420

421-
def _log_stream_read_start(self, stream_name: str) -> None:
422-
self._print_info_message(
423-
f"Read started on stream `{stream_name}` at `{pendulum.now().format('HH:mm:ss')}`..."
424-
)
425-
self.stream_read_start_times[stream_name] = time.time()
421+
def log_stream_start(self, stream_name: str) -> None:
422+
"""Log that a stream has started reading."""
423+
if stream_name not in self.stream_read_start_times:
424+
self._print_info_message(
425+
f"Read started on stream `{stream_name}` at "
426+
f"`{pendulum.now().format('HH:mm:ss')}`..."
427+
)
428+
self.stream_read_start_times[stream_name] = time.time()
426429

427430
def _log_stream_read_end(self, stream_name: str) -> None:
428431
self._print_info_message(

0 commit comments

Comments
 (0)