Skip to content

Commit f624c99

Browse files
fix(smoke-test-source): emit stream status TRACE messages for JDK destinations
Co-Authored-By: AJ Steers <aj@airbyte.io>
1 parent c725831 commit f624c99

File tree

1 file changed

+31
-2
lines changed

1 file changed

+31
-2
lines changed

airbyte/cli/smoke_test_source/source.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,15 @@
2525
AirbyteMessage,
2626
AirbyteRecordMessage,
2727
AirbyteStream,
28+
AirbyteStreamStatus,
29+
AirbyteStreamStatusTraceMessage,
30+
AirbyteTraceMessage,
2831
ConfiguredAirbyteCatalog,
2932
ConnectorSpecification,
3033
Status,
34+
StreamDescriptor,
3135
SyncMode,
36+
TraceType,
3237
Type,
3338
)
3439
from airbyte_cdk.sources.source import Source
@@ -262,8 +267,7 @@ def _validate_custom_scenarios(
262267
return f"Custom scenario at index {i} is missing 'name'."
263268
if not isinstance(scenario.get("json_schema"), dict):
264269
return (
265-
f"Custom scenario '{scenario['name']}' must provide "
266-
"'json_schema' as an object."
270+
f"Custom scenario '{scenario['name']}' must provide 'json_schema' as an object."
267271
)
268272
if "records" in scenario:
269273
if not isinstance(scenario["records"], list):
@@ -320,6 +324,24 @@ def discover(
320324
logger.info(f"Discovered {len(streams)} smoke test streams.")
321325
return AirbyteCatalog(streams=streams)
322326

327+
def _stream_status_message(
328+
self,
329+
stream_name: str,
330+
status: AirbyteStreamStatus,
331+
) -> AirbyteMessage:
332+
"""Build an AirbyteMessage containing a stream status trace."""
333+
return AirbyteMessage(
334+
type=Type.TRACE,
335+
trace=AirbyteTraceMessage(
336+
type=TraceType.STREAM_STATUS,
337+
emitted_at=time.time() * 1000,
338+
stream_status=AirbyteStreamStatusTraceMessage(
339+
stream_descriptor=StreamDescriptor(name=stream_name),
340+
status=status,
341+
),
342+
),
343+
)
344+
323345
def read(
324346
self,
325347
logger: logging.Logger,
@@ -339,6 +361,10 @@ def read(
339361
logger.warning(f"Stream '{stream_name}' not found in scenarios, skipping.")
340362
continue
341363

364+
# Emit STARTED status
365+
yield self._stream_status_message(stream_name, AirbyteStreamStatus.STARTED)
366+
yield self._stream_status_message(stream_name, AirbyteStreamStatus.RUNNING)
367+
342368
records = get_scenario_records(scenario)
343369
logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.")
344370

@@ -351,3 +377,6 @@ def read(
351377
emitted_at=now_ms,
352378
),
353379
)
380+
381+
# Emit COMPLETE status
382+
yield self._stream_status_message(stream_name, AirbyteStreamStatus.COMPLETE)

0 commit comments

Comments
 (0)