Skip to content

Commit 58be2b9

Browse files
lazebnyioctavia-squidington-iii
andauthored
fix(low-code cdk): add type and parameters resolving for dynamic streams (#439)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 81fd920 commit 58be2b9

File tree

3 files changed

+36
-8
lines changed

3 files changed

+36
-8
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,12 @@ def _dynamic_stream_configs(
397397
for dynamic_stream in components_resolver.resolve_components(
398398
stream_template_config=stream_template_config
399399
):
400+
dynamic_stream = {
401+
**ManifestComponentTransformer().propagate_types_and_parameters(
402+
"", dynamic_stream, {}, use_parent_parameters=True
403+
)
404+
}
405+
400406
if "type" not in dynamic_stream:
401407
dynamic_stream["type"] = "DeclarativeStream"
402408

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import copy
66
import typing
7-
from typing import Any, Mapping
7+
from typing import Any, Mapping, Optional
88

99
PARAMETERS_STR = "$parameters"
1010

@@ -94,6 +94,7 @@ def propagate_types_and_parameters(
9494
parent_field_identifier: str,
9595
declarative_component: Mapping[str, Any],
9696
parent_parameters: Mapping[str, Any],
97+
use_parent_parameters: Optional[bool] = None,
9798
) -> Mapping[str, Any]:
9899
"""
99100
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
@@ -103,6 +104,7 @@ def propagate_types_and_parameters(
103104
:param declarative_component: The current component that is having type and parameters added
104105
:param parent_field_identifier: The name of the field of the current component coming from the parent component
105106
:param parent_parameters: The parameters set on parent components defined before the current component
107+
:param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
106108
:return: A deep copy of the transformed component with types and parameters persisted to it
107109
"""
108110
propagated_component = dict(copy.deepcopy(declarative_component))
@@ -130,7 +132,11 @@ def propagate_types_and_parameters(
130132
# level take precedence
131133
current_parameters = dict(copy.deepcopy(parent_parameters))
132134
component_parameters = propagated_component.pop(PARAMETERS_STR, {})
133-
current_parameters = {**current_parameters, **component_parameters}
135+
current_parameters = (
136+
{**component_parameters, **current_parameters}
137+
if use_parent_parameters
138+
else {**current_parameters, **component_parameters}
139+
)
134140

135141
# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
136142
# both exist
@@ -145,7 +151,10 @@ def propagate_types_and_parameters(
145151
excluded_parameter = current_parameters.pop(field_name, None)
146152
parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
147153
propagated_component[field_name] = self.propagate_types_and_parameters(
148-
parent_type_field_identifier, field_value, current_parameters
154+
parent_type_field_identifier,
155+
field_value,
156+
current_parameters,
157+
use_parent_parameters=use_parent_parameters,
149158
)
150159
if excluded_parameter:
151160
current_parameters[field_name] = excluded_parameter
@@ -158,7 +167,10 @@ def propagate_types_and_parameters(
158167
f"{propagated_component.get('type')}.{field_name}"
159168
)
160169
field_value[i] = self.propagate_types_and_parameters(
161-
parent_type_field_identifier, element, current_parameters
170+
parent_type_field_identifier,
171+
element,
172+
current_parameters,
173+
use_parent_parameters=use_parent_parameters,
162174
)
163175
if excluded_parameter:
164176
current_parameters[field_name] = excluded_parameter

unit_tests/sources/declarative/resolvers/test_http_components_resolver.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ def to_configured_catalog(
6060
"name": "TestDynamicStream",
6161
"stream_template": {
6262
"type": "DeclarativeStream",
63-
"name": "",
63+
"$parameters": {
64+
"name": "",
65+
},
6466
"primary_key": [],
6567
"schema_loader": {
6668
"type": "InlineSchemaLoader",
@@ -146,7 +148,9 @@ def to_configured_catalog(
146148
"type": "DynamicDeclarativeStream",
147149
"stream_template": {
148150
"type": "DeclarativeStream",
149-
"name": "",
151+
"$parameters": {
152+
"name": "",
153+
},
150154
"primary_key": [],
151155
"schema_loader": {
152156
"type": "InlineSchemaLoader",
@@ -232,7 +236,9 @@ def to_configured_catalog(
232236
"type": "DynamicDeclarativeStream",
233237
"stream_template": {
234238
"type": "DeclarativeStream",
235-
"name": "",
239+
"$parameters": {
240+
"name": "",
241+
},
236242
"primary_key": [],
237243
"schema_loader": {
238244
"type": "InlineSchemaLoader",
@@ -344,7 +350,7 @@ def to_configured_catalog(
344350
"components_mapping": [
345351
{
346352
"type": "ComponentMappingDefinition",
347-
"field_path": ["name"],
353+
"field_path": ["$parameters", "name"],
348354
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
349355
},
350356
{
@@ -592,6 +598,10 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str
592598
catalog=None,
593599
state=None,
594600
)
601+
dynamic_streams = source._dynamic_stream_configs(source.resolved_manifest, _CONFIG)
602+
603+
assert len(dynamic_streams) == 4
604+
assert dynamic_streams[0]["retriever"]["name"] == "parent_1_item_1"
595605

596606
actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
597607

0 commit comments

Comments
 (0)