Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import orjson
import yaml
from airbyte_protocol_dataclasses.models import Level
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate

Expand Down Expand Up @@ -88,6 +88,7 @@
DebugSliceLogger,
SliceLogger,
)
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


Expand Down Expand Up @@ -630,15 +631,22 @@ def _dynamic_stream_configs(

return dynamic_stream_configs

def _select_streams(
    self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
) -> List[AbstractStream]:
    """Pick the stream instances requested by the configured catalog.

    Streams are looked up by name. The returned list follows the order of
    ``configured_catalog.streams`` and only contains streams that exist in
    ``streams``.

    For every configured stream that has no matching instance, an
    ``INCOMPLETE`` stream status message is emitted through
    ``self._message_repository``. This is why the method must be an instance
    method and cannot be a ``@staticmethod``.

    Args:
        streams: All stream instances available from the source.
        configured_catalog: The catalog describing which streams to sync.

    Returns:
        The subset of ``streams`` whose names appear in the configured catalog.
    """
    stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
    abstract_streams: List[AbstractStream] = []
    for configured_stream in configured_catalog.streams:
        stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
        if stream_instance:
            abstract_streams.append(stream_instance)
        else:
            # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE
            # message if the source was configured with raise_exception_on_missing_stream=True.
            # This was used on very few sources like facebook-marketing and google-ads. We
            # decided not to port this feature over, but we can do so if we feel it necessary.
            # With the current behavior, we should still result in a partial failure since
            # missing streams will be marked as INCOMPLETE.
            self._message_repository.emit_message(
                as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
            )
    return abstract_streams
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import pytest
import requests
import yaml
from airbyte_protocol_dataclasses.models import (
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
TraceType,
)
from jsonschema.exceptions import ValidationError
from typing_extensions import deprecated

Expand Down Expand Up @@ -1898,6 +1904,46 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str],
}


@freezegun.freeze_time("2025-01-01T00:00:00")
def test_catalog_contains_missing_stream_in_source():
    """Check that a configured stream unknown to the source is reported as INCOMPLETE.

    The catalog configures a single stream named "missing" (presumably absent
    from ``_MANIFEST`` -- confirm against the manifest definition). After the
    read, the source's internal queue is expected to hold one STREAM_STATUS
    trace message marking that stream as INCOMPLETE.

    Time is frozen so that ``emitted_at`` is deterministic:
    2025-01-01T00:00:00 UTC is 1735689600000 ms since the epoch.
    """
    expected_messages = [
        AirbyteMessage(
            type=Type.TRACE,
            trace=AirbyteTraceMessage(
                type=TraceType.STREAM_STATUS,
                stream_status=AirbyteStreamStatusTraceMessage(
                    stream_descriptor=StreamDescriptor(name="missing"),
                    status=AirbyteStreamStatus.INCOMPLETE,
                ),
                emitted_at=1735689600000.0,
            ),
        ),
    ]

    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
                ),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.append,
            ),
        ]
    )

    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
    )

    # Drain the read generator so that all work has completed before inspecting the queue.
    list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))
    queue = source._concurrent_source._queue

    for expected_message in expected_messages:
        # Bound the wait: an unbounded get() would hang the whole test run if the
        # status message was never enqueued, instead of failing this test.
        # NOTE(review): assumes _queue follows the standard queue.Queue.get(timeout=...) API.
        queue_message = queue.get(timeout=1)
        assert queue_message == expected_message


def get_records_for_stream(
stream_name: str, messages: List[AirbyteMessage]
) -> List[AirbyteRecordMessage]:
Expand Down
Loading