Skip to content

Commit 3210e7a

Browse files
committed
Add type and parameters resolving for dynamic streams
1 parent b79177b commit 3210e7a

File tree

3 files changed

+27
-11
lines changed

3 files changed

+27
-11
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ def __init__(
8787
self.components_module: ModuleType | None = get_registered_components_module(config=config)
8888

8989
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
90-
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
91-
"", resolved_source_config, {}
92-
)
90+
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters("",
91+
resolved_source_config,
92+
{})
9393
self._source_config = propagated_source_config
9494
self._debug = debug
9595
self._emit_connector_builder_messages = emit_connector_builder_messages
@@ -387,6 +387,8 @@ def _dynamic_stream_configs(
387387
for dynamic_stream in components_resolver.resolve_components(
388388
stream_template_config=stream_template_config
389389
):
390+
dynamic_stream = {**ManifestComponentTransformer().propagate_types_and_parameters("", dynamic_stream, {}, use_parent_parameters=True)}
391+
390392
if "type" not in dynamic_stream:
391393
dynamic_stream["type"] = "DeclarativeStream"
392394

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import copy
66
import typing
7-
from typing import Any, Mapping
7+
from typing import Any, Mapping, Optional
88

99
PARAMETERS_STR = "$parameters"
1010

@@ -94,6 +94,7 @@ def propagate_types_and_parameters(
9494
parent_field_identifier: str,
9595
declarative_component: Mapping[str, Any],
9696
parent_parameters: Mapping[str, Any],
97+
use_parent_parameters: Optional[bool] = None,
9798
) -> Mapping[str, Any]:
9899
"""
99100
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
@@ -103,6 +104,7 @@ def propagate_types_and_parameters(
103104
:param declarative_component: The current component that is having type and parameters added
104105
:param parent_field_identifier: The name of the field of the current component coming from the parent component
105106
:param parent_parameters: The parameters set on parent components defined before the current component
107+
:param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
106108
:return: A deep copy of the transformed component with types and parameters persisted to it
107109
"""
108110
propagated_component = dict(copy.deepcopy(declarative_component))
@@ -130,7 +132,7 @@ def propagate_types_and_parameters(
130132
# level take precedence
131133
current_parameters = dict(copy.deepcopy(parent_parameters))
132134
component_parameters = propagated_component.pop(PARAMETERS_STR, {})
133-
current_parameters = {**current_parameters, **component_parameters}
135+
current_parameters = {**component_parameters, **current_parameters} if use_parent_parameters else {**current_parameters, **component_parameters}
134136

135137
# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
136138
# both exist
@@ -145,7 +147,7 @@ def propagate_types_and_parameters(
145147
excluded_parameter = current_parameters.pop(field_name, None)
146148
parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
147149
propagated_component[field_name] = self.propagate_types_and_parameters(
148-
parent_type_field_identifier, field_value, current_parameters
150+
parent_type_field_identifier, field_value, current_parameters, use_parent_parameters=use_parent_parameters
149151
)
150152
if excluded_parameter:
151153
current_parameters[field_name] = excluded_parameter
@@ -158,7 +160,7 @@ def propagate_types_and_parameters(
158160
f"{propagated_component.get('type')}.{field_name}"
159161
)
160162
field_value[i] = self.propagate_types_and_parameters(
161-
parent_type_field_identifier, element, current_parameters
163+
parent_type_field_identifier, element, current_parameters, use_parent_parameters=use_parent_parameters
162164
)
163165
if excluded_parameter:
164166
current_parameters[field_name] = excluded_parameter

unit_tests/sources/declarative/resolvers/test_http_components_resolver.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ def to_configured_catalog(
5959
"type": "DynamicDeclarativeStream",
6060
"stream_template": {
6161
"type": "DeclarativeStream",
62-
"name": "",
62+
"$parameters": {
63+
"name": "",
64+
},
6365
"primary_key": [],
6466
"schema_loader": {
6567
"type": "InlineSchemaLoader",
@@ -145,7 +147,9 @@ def to_configured_catalog(
145147
"type": "DynamicDeclarativeStream",
146148
"stream_template": {
147149
"type": "DeclarativeStream",
148-
"name": "",
150+
"$parameters": {
151+
"name": "",
152+
},
149153
"primary_key": [],
150154
"schema_loader": {
151155
"type": "InlineSchemaLoader",
@@ -231,7 +235,9 @@ def to_configured_catalog(
231235
"type": "DynamicDeclarativeStream",
232236
"stream_template": {
233237
"type": "DeclarativeStream",
234-
"name": "",
238+
"$parameters": {
239+
"name": "",
240+
},
235241
"primary_key": [],
236242
"schema_loader": {
237243
"type": "InlineSchemaLoader",
@@ -331,7 +337,7 @@ def to_configured_catalog(
331337
"components_mapping": [
332338
{
333339
"type": "ComponentMappingDefinition",
334-
"field_path": ["name"],
340+
"field_path": ["$parameters", "name"],
335341
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
336342
},
337343
{
@@ -563,6 +569,10 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str
563569
catalog=None,
564570
state=None,
565571
)
572+
dynamic_streams = source._dynamic_stream_configs(source.resolved_manifest, _CONFIG)
573+
574+
assert len(dynamic_streams) == 4
575+
assert dynamic_streams[0]["retriever"]["name"] == "parent_1_item_1"
566576

567577
actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
568578

@@ -586,3 +596,5 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str
586596
actual_record_stream_names.sort()
587597

588598
assert actual_record_stream_names == expected_stream_names
599+
600+

0 commit comments

Comments
 (0)