diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b7c0d84a0..e4e56e423 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1427,6 +1427,13 @@ definitions: - "$ref": "#/definitions/LegacyToPerPartitionStateMigration" - "$ref": "#/definitions/CustomStateMigration" default: [] + dynamic_stream_name: + title: Dynamic Stream Name + description: The dynamic stream name that create current stream, if None is static stream. + type: string + default: None + example: + - "Tables" $parameters: type: object additional_properties: true @@ -3766,6 +3773,13 @@ definitions: type: type: string enum: [DynamicDeclarativeStream] + name: + title: Name + description: The dynamic stream name. + type: string + default: "" + example: + - "Tables" stream_template: title: Stream Template description: Reference to the stream template. diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 23d41b174..a69d8c6f4 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -343,6 +343,7 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]: # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config stream_configs: List[Dict[str, Any]] = manifest.get("streams", []) for s in stream_configs: + s["dynamic_stream_name"] = None if "type" not in s: s["type"] = "DeclarativeStream" return stream_configs @@ -354,7 +355,7 @@ def _dynamic_stream_configs( dynamic_stream_configs: List[Dict[str, Any]] = [] seen_dynamic_streams: Set[str] = set() - for dynamic_definition in dynamic_stream_definitions: + for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions): components_resolver_config = dynamic_definition["components_resolver"] if not components_resolver_config: @@ -393,6 +394,10 @@ def _dynamic_stream_configs( # Ensure that each stream is created with a unique name name = dynamic_stream.get("name") + dynamic_stream["dynamic_stream_name"] = dynamic_definition.get( + "name", f"dynamic_stream_{dynamic_definition_index}" + ) + if not isinstance(name, str): raise ValueError( f"Expected stream name {name} to be a string, got {type(name)}." diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index c43f550db..36f68999a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2049,6 +2049,12 @@ class Config: description="Array of state migrations to be applied on the input state", title="State Migrations", ) + dynamic_stream_name: Optional[str] = Field( + "None", + description="The dynamic stream name that create current stream, if None is static stream.", + example=["Tables"], + title="Dynamic Stream Name", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2489,6 +2495,9 @@ class HttpComponentsResolver(BaseModel): class DynamicDeclarativeStream(BaseModel): type: Literal["DynamicDeclarativeStream"] + name: Optional[str] = Field( + "", description="The dynamic stream name.", example=["Tables"], title="Name" + ) stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 09d069bff..6585cd556 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -57,6 +57,7 @@ def to_configured_catalog( "dynamic_streams": [ { "type": "DynamicDeclarativeStream", + "name": "TestDynamicStream", "stream_template": { "type": "DeclarativeStream", "name": "", @@ -487,6 +488,9 @@ def test_dynamic_streams_read_with_http_components_resolver(): ] configured_catalog = to_configured_catalog(configured_streams) + for stream in source._dynamic_stream_configs(source.resolved_manifest, _CONFIG): + assert stream["dynamic_stream_name"] == "TestDynamicStream" + records = [ message.record for message in source.read(MagicMock(), _CONFIG, configured_catalog) @@ -572,6 +576,9 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str ] configured_catalog = to_configured_catalog(configured_streams) + for stream in source._dynamic_stream_configs(source.resolved_manifest, _CONFIG): + assert stream["dynamic_stream_name"] == "dynamic_stream_0" + records = [ message.record for message in source.read(MagicMock(), _CONFIG, configured_catalog) diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 4a043ac82..a7d51e7df 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -709,6 +709,9 @@ def test_group_streams(): ) concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) + for stream in source._stream_configs(source.resolved_manifest): + assert stream["dynamic_stream_name"] is None + # 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental # 1 async job stream, 1 substream w/ incremental assert len(concurrent_streams) == 8