From a0ece6b89dfebd36220e156e7a368c05ff03f1d0 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Mon, 8 Sep 2025 18:45:59 -0700 Subject: [PATCH 1/2] emit incomplete status for missing streams --- .../concurrent_declarative_source.py | 22 +++++++- .../test_concurrent_declarative_source.py | 55 +++++++++++++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 56a59e072..2c975ea59 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -20,7 +20,7 @@ import orjson import yaml -from airbyte_protocol_dataclasses.models import Level +from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor from jsonschema.exceptions import ValidationError from jsonschema.validators import validate @@ -88,6 +88,7 @@ DebugSliceLogger, SliceLogger, ) +from airbyte_cdk.utils.stream_status_utils import as_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -630,9 +631,8 @@ def _dynamic_stream_configs( return dynamic_stream_configs - @staticmethod def _select_streams( - streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog + self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog ) -> List[AbstractStream]: stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams} abstract_streams: List[AbstractStream] = [] @@ -640,5 +640,21 @@ def _select_streams( stream_instance = stream_name_to_instance.get(configured_stream.stream.name) if stream_instance: abstract_streams.append(stream_instance) + else: + self._message_repository.emit_message( + as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE) + ) + + missing_stream_exception = AirbyteTracedException( + message="A stream listed in your configuration was not found in the source. Please check the logs for more " + "details.", + internal_message=( + f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. " + f"Refresh the schema in your replication settings and remove this stream from future sync attempts." + ), + failure_type=FailureType.config_error, + stream_descriptor=StreamDescriptor(name=configured_stream.stream.name), + ) + self._message_repository.emit_message(missing_stream_exception.as_airbyte_message()) return abstract_streams diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 1d62ba33f..90da66e43 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -19,6 +19,12 @@ import pytest import requests import yaml +from airbyte_protocol_dataclasses.models import ( + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, + TraceType, +) from jsonschema.exceptions import ValidationError from typing_extensions import deprecated @@ -1898,6 +1904,55 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str], } +@freezegun.freeze_time("2025-01-01T00:00:00") +def test_catalog_contains_missing_stream_in_source(): + expected_messages = [ + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name="missing"), + status=AirbyteStreamStatus.INCOMPLETE, + ), + emitted_at=1735689600000.0, + ), + ), + AirbyteTracedException( + message="A stream listed in your configuration was not found in the source. Please check the logs for more " + "details.", + internal_message=( + "The stream 'missing' in your connection configuration was not found in the source. Refresh the schema in your replication settings and remove this stream from future sync attempts." + ), + failure_type=FailureType.config_error, + stream_descriptor=StreamDescriptor(name="missing"), + ).as_airbyte_message(stream_descriptor=StreamDescriptor(name="missing")), + ] + + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh] + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + ), + ] + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] + ) + + list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[])) + queue = source._concurrent_source._queue + + for expected_message in expected_messages: + queue_message = queue.get() + assert queue_message == expected_message + + def get_records_for_stream( stream_name: str, messages: List[AirbyteMessage] ) -> List[AirbyteRecordMessage]: From fe550759a2607bbf24445f86fb8d8b9455aa80de Mon Sep 17 00:00:00 2001 From: brianjlai Date: Fri, 12 Sep 2025 16:36:12 -0700 Subject: [PATCH 2/2] don't emit error trace and added comment --- .../concurrent_declarative_source.py | 18 +++++------------- .../test_concurrent_declarative_source.py | 9 --------- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 2c975ea59..ef666dc51 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -641,20 +641,12 @@ def _select_streams( if stream_instance: abstract_streams.append(stream_instance) else: + # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if + # the source was configured with raise_exception_on_missing_stream=True. This was used on very + # few sources like facebook-marketing and google-ads. We decided not to port this feature over, + # but we can do so if we feel it necessary. With the current behavior,we should still result + # in a partial failure since missing streams will be marked as INCOMPLETE. self._message_repository.emit_message( as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE) ) - - missing_stream_exception = AirbyteTracedException( - message="A stream listed in your configuration was not found in the source. Please check the logs for more " - "details.", - internal_message=( - f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. " - f"Refresh the schema in your replication settings and remove this stream from future sync attempts." - ), - failure_type=FailureType.config_error, - stream_descriptor=StreamDescriptor(name=configured_stream.stream.name), - ) - self._message_repository.emit_message(missing_stream_exception.as_airbyte_message()) - return abstract_streams diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 90da66e43..7661c7fc4 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1918,15 +1918,6 @@ def test_catalog_contains_missing_stream_in_source(): emitted_at=1735689600000.0, ), ), - AirbyteTracedException( - message="A stream listed in your configuration was not found in the source. Please check the logs for more " - "details.", - internal_message=( - "The stream 'missing' in your connection configuration was not found in the source. Refresh the schema in your replication settings and remove this stream from future sync attempts." - ), - failure_type=FailureType.config_error, - stream_descriptor=StreamDescriptor(name="missing"), - ).as_airbyte_message(stream_descriptor=StreamDescriptor(name="missing")), ] catalog = ConfiguredAirbyteCatalog(