Skip to content

Commit a0ece6b

Browse files
committed
emit incomplete status for missing streams
1 parent ae0e8aa commit a0ece6b

File tree

2 files changed

+74
-3
lines changed

2 files changed

+74
-3
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import orjson
2222
import yaml
23-
from airbyte_protocol_dataclasses.models import Level
23+
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
2424
from jsonschema.exceptions import ValidationError
2525
from jsonschema.validators import validate
2626

@@ -88,6 +88,7 @@
8888
DebugSliceLogger,
8989
SliceLogger,
9090
)
91+
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
9192
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
9293

9394

@@ -630,15 +631,30 @@ def _dynamic_stream_configs(
630631

631632
return dynamic_stream_configs
632633

633-
@staticmethod
634634
def _select_streams(
635-
streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
635+
self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
636636
) -> List[AbstractStream]:
637637
stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
638638
abstract_streams: List[AbstractStream] = []
639639
for configured_stream in configured_catalog.streams:
640640
stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
641641
if stream_instance:
642642
abstract_streams.append(stream_instance)
643+
else:
644+
self._message_repository.emit_message(
645+
as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
646+
)
647+
648+
missing_stream_exception = AirbyteTracedException(
649+
message="A stream listed in your configuration was not found in the source. Please check the logs for more "
650+
"details.",
651+
internal_message=(
652+
f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
653+
f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
654+
),
655+
failure_type=FailureType.config_error,
656+
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name),
657+
)
658+
self._message_repository.emit_message(missing_stream_exception.as_airbyte_message())
643659

644660
return abstract_streams

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
import pytest
2020
import requests
2121
import yaml
22+
from airbyte_protocol_dataclasses.models import (
23+
AirbyteStreamStatus,
24+
AirbyteStreamStatusTraceMessage,
25+
AirbyteTraceMessage,
26+
TraceType,
27+
)
2228
from jsonschema.exceptions import ValidationError
2329
from typing_extensions import deprecated
2430

@@ -1898,6 +1904,55 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str],
18981904
}
18991905

19001906

1907+
@freezegun.freeze_time("2025-01-01T00:00:00")
1908+
def test_catalog_contains_missing_stream_in_source():
1909+
expected_messages = [
1910+
AirbyteMessage(
1911+
type=Type.TRACE,
1912+
trace=AirbyteTraceMessage(
1913+
type=TraceType.STREAM_STATUS,
1914+
stream_status=AirbyteStreamStatusTraceMessage(
1915+
stream_descriptor=StreamDescriptor(name="missing"),
1916+
status=AirbyteStreamStatus.INCOMPLETE,
1917+
),
1918+
emitted_at=1735689600000.0,
1919+
),
1920+
),
1921+
AirbyteTracedException(
1922+
message="A stream listed in your configuration was not found in the source. Please check the logs for more "
1923+
"details.",
1924+
internal_message=(
1925+
"The stream 'missing' in your connection configuration was not found in the source. Refresh the schema in your replication settings and remove this stream from future sync attempts."
1926+
),
1927+
failure_type=FailureType.config_error,
1928+
stream_descriptor=StreamDescriptor(name="missing"),
1929+
).as_airbyte_message(stream_descriptor=StreamDescriptor(name="missing")),
1930+
]
1931+
1932+
catalog = ConfiguredAirbyteCatalog(
1933+
streams=[
1934+
ConfiguredAirbyteStream(
1935+
stream=AirbyteStream(
1936+
name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
1937+
),
1938+
sync_mode=SyncMode.full_refresh,
1939+
destination_sync_mode=DestinationSyncMode.append,
1940+
),
1941+
]
1942+
)
1943+
1944+
source = ConcurrentDeclarativeSource(
1945+
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
1946+
)
1947+
1948+
list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))
1949+
queue = source._concurrent_source._queue
1950+
1951+
for expected_message in expected_messages:
1952+
queue_message = queue.get()
1953+
assert queue_message == expected_message
1954+
1955+
19011956
def get_records_for_stream(
19021957
stream_name: str, messages: List[AirbyteMessage]
19031958
) -> List[AirbyteRecordMessage]:

0 commit comments

Comments
 (0)