diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 56a59e072..ef666dc51 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -20,7 +20,7 @@ import orjson import yaml -from airbyte_protocol_dataclasses.models import Level +from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor from jsonschema.exceptions import ValidationError from jsonschema.validators import validate @@ -88,6 +88,7 @@ DebugSliceLogger, SliceLogger, ) +from airbyte_cdk.utils.stream_status_utils import as_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -630,9 +631,8 @@ def _dynamic_stream_configs( return dynamic_stream_configs - @staticmethod def _select_streams( - streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog + self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog ) -> List[AbstractStream]: stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams} abstract_streams: List[AbstractStream] = [] @@ -640,5 +640,13 @@ def _select_streams( stream_instance = stream_name_to_instance.get(configured_stream.stream.name) if stream_instance: abstract_streams.append(stream_instance) - + else: + # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if + # the source was configured with raise_exception_on_missing_stream=True. This was used on very + # few sources like facebook-marketing and google-ads. We decided not to port this feature over, + # but we can do so if we feel it necessary. With the current behavior, we should still result + # in a partial failure since missing streams will be marked as INCOMPLETE. 
+ self._message_repository.emit_message( + as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE) + ) return abstract_streams diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 1d62ba33f..7661c7fc4 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -19,6 +19,12 @@ import pytest import requests import yaml +from airbyte_protocol_dataclasses.models import ( + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, + TraceType, +) from jsonschema.exceptions import ValidationError from typing_extensions import deprecated @@ -1898,6 +1904,46 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str], } +@freezegun.freeze_time("2025-01-01T00:00:00") +def test_catalog_contains_missing_stream_in_source(): + expected_messages = [ + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="missing"), + status=AirbyteStreamStatus.INCOMPLETE, + ), + emitted_at=1735689600000.0, + ), + ), + ] + + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh] + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + ), + ] + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] + ) + + list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[])) + queue = source._concurrent_source._queue + + for expected_message in expected_messages: + queue_message = queue.get() + assert queue_message == expected_message + + def get_records_for_stream( 
stream_name: str, messages: List[AirbyteMessage] ) -> List[AirbyteRecordMessage]: