22 changes: 19 additions & 3 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -20,7 +20,7 @@

import orjson
import yaml
from airbyte_protocol_dataclasses.models import Level
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate

@@ -88,6 +88,7 @@
DebugSliceLogger,
SliceLogger,
)
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@@ -630,15 +631,30 @@ def _dynamic_stream_configs(

return dynamic_stream_configs

@staticmethod
def _select_streams(
streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
) -> List[AbstractStream]:
stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
abstract_streams: List[AbstractStream] = []
for configured_stream in configured_catalog.streams:
stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
if stream_instance:
abstract_streams.append(stream_instance)
else:
self._message_repository.emit_message(
as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
)

missing_stream_exception = AirbyteTracedException(
message="A stream listed in your configuration was not found in the source. Please check the logs for more "
"details.",
internal_message=(
f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
),
failure_type=FailureType.config_error,
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name),
)
self._message_repository.emit_message(missing_stream_exception.as_airbyte_message())

return abstract_streams
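
Since the diff view above flattens indentation, here is a sketch of how the updated _select_streams method reads after this change. The added lines are taken from the diff itself; placing the AirbyteTracedException emission inside the else branch (i.e. once per missing stream) is an assumption consistent with the new test below, not something the flattened diff can confirm.

def _select_streams(
    self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
) -> List[AbstractStream]:
    stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
    abstract_streams: List[AbstractStream] = []
    for configured_stream in configured_catalog.streams:
        stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
        if stream_instance:
            abstract_streams.append(stream_instance)
        else:
            # Surface a per-stream INCOMPLETE status for the stream missing from the source.
            self._message_repository.emit_message(
                as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
            )

            # Emit a config_error trace pointing the user at the stale catalog entry.
            missing_stream_exception = AirbyteTracedException(
                message="A stream listed in your configuration was not found in the source. Please check the logs for more "
                "details.",
                internal_message=(
                    f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
                    f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
                ),
                failure_type=FailureType.config_error,
                stream_descriptor=StreamDescriptor(name=configured_stream.stream.name),
            )
            self._message_repository.emit_message(missing_stream_exception.as_airbyte_message())

    return abstract_streams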
@@ -19,6 +19,12 @@
import pytest
import requests
import yaml
from airbyte_protocol_dataclasses.models import (
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
TraceType,
)
from jsonschema.exceptions import ValidationError
from typing_extensions import deprecated

@@ -1898,6 +1904,55 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str],
}


@freezegun.freeze_time("2025-01-01T00:00:00")
def test_catalog_contains_missing_stream_in_source():
expected_messages = [
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name="missing"),
status=AirbyteStreamStatus.INCOMPLETE,
),
emitted_at=1735689600000.0,
),
),
AirbyteTracedException(
message="A stream listed in your configuration was not found in the source. Please check the logs for more "
"details.",
internal_message=(
"The stream 'missing' in your connection configuration was not found in the source. Refresh the schema in your replication settings and remove this stream from future sync attempts."
),
failure_type=FailureType.config_error,
stream_descriptor=StreamDescriptor(name="missing"),
).as_airbyte_message(stream_descriptor=StreamDescriptor(name="missing")),
]

catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.append,
),
]
)

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
)

list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))
queue = source._concurrent_source._queue

for expected_message in expected_messages:
queue_message = queue.get()
assert queue_message == expected_message
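
A minimal, self-contained sketch (not part of this PR) of the two messages the new code path emits for a stream missing from the source. It assumes an InMemoryMessageRepository as the sink and uses an illustrative stream named "missing"; the message text mirrors the change above.

from airbyte_protocol_dataclasses.models import (
    AirbyteStream,
    AirbyteStreamStatus,
    FailureType,
    StreamDescriptor,
    SyncMode,
)

from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

# A configured stream that the source does not expose (illustrative only).
missing_stream = AirbyteStream(
    name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
)
repository = InMemoryMessageRepository()

# 1. Per-stream INCOMPLETE status so the platform marks the stream as failed.
repository.emit_message(as_airbyte_message(missing_stream, AirbyteStreamStatus.INCOMPLETE))

# 2. config_error trace telling the user to refresh the schema and drop the stream.
repository.emit_message(
    AirbyteTracedException(
        message="A stream listed in your configuration was not found in the source. "
        "Please check the logs for more details.",
        internal_message=(
            f"The stream '{missing_stream.name}' in your connection configuration was not found in the source. "
            "Refresh the schema in your replication settings and remove this stream from future sync attempts."
        ),
        failure_type=FailureType.config_error,
        stream_descriptor=StreamDescriptor(name=missing_stream.name),
    ).as_airbyte_message()
)

# Both messages are TRACE messages: a STREAM_STATUS followed by an ERROR.
for message in repository.consume_queue():
    print(message.type, message.trace.type)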


def get_records_for_stream(
stream_name: str, messages: List[AirbyteMessage]
) -> List[AirbyteRecordMessage]: