Skip to content

Commit b0842e5

Browse files
committed
add condition to component mapping definition
1 parent 410571c commit b0842e5

File tree

8 files changed

+229
-2
lines changed

8 files changed

+229
-2
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4173,6 +4173,15 @@ definitions:
41734173
description: Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.
41744174
type: boolean
41754175
default: false
4176+
condition:
4177+
title: Condition
4178+
description: A condition that must be met for the mapping to be applied.
4179+
type: string
4180+
interpolation_context:
4181+
- config
4182+
- stream_template_config
4183+
- components_values
4184+
- stream_slice
41764185
$parameters:
41774186
type: object
41784187
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -1492,6 +1494,11 @@ class ComponentMappingDefinition(BaseModel):
14921494
description="Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.",
14931495
title="Create or Update",
14941496
)
1497+
condition: Optional[str] = Field(
1498+
None,
1499+
description="A condition that must be met for the mapping to be applied.",
1500+
title="Condition",
1501+
)
14951502
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14961503

14971504

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3800,6 +3800,7 @@ def create_components_mapping_definition(
38003800
value=interpolated_value,
38013801
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
38023802
create_or_update=model.create_or_update,
3803+
condition=model.condition,
38033804
parameters=model.parameters or {},
38043805
)
38053806

@@ -3851,7 +3852,9 @@ def create_stream_config(
38513852
)
38523853

38533854
def create_config_components_resolver(
3854-
self, model: ConfigComponentsResolverModel, config: Config
3855+
self,
3856+
model: ConfigComponentsResolverModel,
3857+
config: Config,
38553858
) -> Any:
38563859
model_stream_configs = (
38573860
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
@@ -3871,6 +3874,7 @@ def create_config_components_resolver(
38713874
components_mapping_definition_model.value_type
38723875
),
38733876
config=config,
3877+
parameters=model.parameters,
38743878
)
38753879
for components_mapping_definition_model in model.components_mapping
38763880
]

airbyte_cdk/sources/declarative/resolvers/components_resolver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing_extensions import deprecated
1010

1111
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
12+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1213
from airbyte_cdk.sources.source import ExperimentalClassWarning
1314

1415

@@ -22,6 +23,7 @@ class ComponentMappingDefinition:
2223
value: Union["InterpolatedString", str]
2324
value_type: Optional[Type[Any]]
2425
parameters: InitVar[Mapping[str, Any]]
26+
condition: Optional[str] = None
2527
create_or_update: Optional[bool] = False
2628

2729

@@ -35,6 +37,7 @@ class ResolvedComponentMappingDefinition:
3537
value: "InterpolatedString"
3638
value_type: Optional[Type[Any]]
3739
parameters: InitVar[Mapping[str, Any]]
40+
condition: Optional[InterpolatedBoolean] = None
3841
create_or_update: Optional[bool] = False
3942

4043

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from yaml.scanner import ScannerError
1515

1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
17+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1718
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
1819
ComponentMappingDefinition,
1920
ComponentsResolver,
@@ -70,6 +71,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
7071
"""
7172

7273
for component_mapping in self.components_mapping:
74+
interpolated_condition = (
75+
InterpolatedBoolean(condition=component_mapping.condition, parameters=parameters)
76+
if component_mapping.condition
77+
else None
78+
)
79+
7380
if isinstance(component_mapping.value, (str, InterpolatedString)):
7481
interpolated_value = (
7582
InterpolatedString.create(component_mapping.value, parameters=parameters)
@@ -89,6 +96,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
8996
value_type=component_mapping.value_type,
9097
create_or_update=component_mapping.create_or_update,
9198
parameters=parameters,
99+
condition=interpolated_condition,
92100
)
93101
)
94102
else:
@@ -155,6 +163,12 @@ def resolve_components(
155163
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
156164

157165
for resolved_component in self._resolved_components:
166+
if (
167+
resolved_component.condition is not None
168+
and not resolved_component.condition.eval(self.config, **kwargs)
169+
):
170+
continue
171+
158172
valid_types = (
159173
(resolved_component.value_type,) if resolved_component.value_type else None
160174
)

airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing_extensions import deprecated
1111

1212
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
13+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1314
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
1415
ComponentMappingDefinition,
1516
ComponentsResolver,
@@ -49,6 +50,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4950
parameters (Mapping[str, Any]): Parameters for interpolation.
5051
"""
5152
for component_mapping in self.components_mapping:
53+
interpolated_condition = (
54+
InterpolatedBoolean(condition=component_mapping.condition, parameters=parameters)
55+
if component_mapping.condition
56+
else None
57+
)
58+
5259
if isinstance(component_mapping.value, (str, InterpolatedString)):
5360
interpolated_value = (
5461
InterpolatedString.create(component_mapping.value, parameters=parameters)
@@ -67,6 +74,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
6774
value=interpolated_value,
6875
value_type=component_mapping.value_type,
6976
parameters=parameters,
77+
condition=interpolated_condition,
7078
)
7179
)
7280
else:
@@ -97,6 +105,12 @@ def resolve_components(
97105
kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
98106

99107
for resolved_component in self._resolved_components:
108+
if (
109+
resolved_component.condition is not None
110+
and not resolved_component.condition.eval(self.config, **kwargs)
111+
):
112+
continue
113+
100114
valid_types = (
101115
(resolved_component.value_type,) if resolved_component.value_type else None
102116
)

airbyte_cdk/sources/declarative/resolvers/parametrized_components_resolver.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from yaml.parser import ParserError
1313

1414
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
15+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1516
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
1617
ComponentMappingDefinition,
1718
ComponentsResolver,
@@ -55,6 +56,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
5556
"""
5657

5758
for component_mapping in self.components_mapping:
59+
interpolated_condition = (
60+
InterpolatedBoolean(condition=component_mapping.condition, parameters=parameters)
61+
if component_mapping.condition
62+
else None
63+
)
64+
5865
if isinstance(component_mapping.value, (str, InterpolatedString)):
5966
interpolated_value = (
6067
InterpolatedString.create(component_mapping.value, parameters=parameters)
@@ -74,6 +81,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
7481
value_type=component_mapping.value_type,
7582
create_or_update=component_mapping.create_or_update,
7683
parameters=parameters,
84+
condition=interpolated_condition,
7785
)
7886
)
7987
else:
@@ -90,6 +98,12 @@ def resolve_components(
9098
updated_config = deepcopy(stream_template_config)
9199
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
92100
for resolved_component in self._resolved_components:
101+
if (
102+
resolved_component.condition is not None
103+
and not resolved_component.condition.eval(self.config, **kwargs)
104+
):
105+
continue
106+
93107
valid_types = (
94108
(resolved_component.value_type,) if resolved_component.value_type else None
95109
)

unit_tests/sources/declarative/resolvers/test_config_components_resolver.py

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def to_configured_catalog(
145145
STREAM_CONFIG
146146
]
147147

148-
# Manifest with component definition with value that is fails when trying
148+
# Manifest with component definition with value that fails when trying
149149
# to parse yaml in _parse_yaml_if_possible but generally contains valid string
150150
_MANIFEST_WITH_SCANNER_ERROR = deepcopy(_MANIFEST)
151151
_MANIFEST_WITH_SCANNER_ERROR["dynamic_streams"][0]["components_resolver"][
@@ -173,6 +173,7 @@ def to_configured_catalog(
173173
(_MANIFEST_WITH_STREAM_CONFIGS_LIST, _CONFIG, None, ["item_1", "item_2", "default_item"]),
174174
(_MANIFEST_WITH_SCANNER_ERROR, _CONFIG, None, ["item_1", "item_2", "default_item"]),
175175
],
176+
ids=["no_duplicates", "duplicates", "stream_configs_list", "scanner_error"],
176177
)
177178
def test_dynamic_streams_read_with_config_components_resolver(
178179
manifest, config, expected_exception, expected_stream_names
@@ -223,3 +224,164 @@ def test_dynamic_streams_read_with_config_components_resolver(
223224
assert len(records) == len(expected_stream_names)
224225
# Use set comparison to avoid relying on deterministic ordering
225226
assert set(record.stream for record in records) == set(expected_stream_names)
227+
228+
229+
# Manifest with condition that always evaluates to true
230+
_MANIFEST_WITH_TRUE_CONDITION = deepcopy(_MANIFEST)
231+
_MANIFEST_WITH_TRUE_CONDITION["dynamic_streams"][0]["components_resolver"][
232+
"components_mapping"
233+
].append(
234+
{
235+
"type": "ComponentMappingDefinition",
236+
"field_path": ["retriever", "requester", "$parameters", "always_included"],
237+
"value": "true_condition_value",
238+
"create_or_update": True,
239+
"condition": "{{ True }}",
240+
}
241+
)
242+
243+
# Manifest with condition that always evaluates to false
244+
_MANIFEST_WITH_FALSE_CONDITION = deepcopy(_MANIFEST)
245+
_MANIFEST_WITH_FALSE_CONDITION["dynamic_streams"][0]["components_resolver"][
246+
"components_mapping"
247+
].append(
248+
{
249+
"type": "ComponentMappingDefinition",
250+
"field_path": ["retriever", "requester", "$parameters", "never_included"],
251+
"value": "false_condition_value",
252+
"create_or_update": True,
253+
"condition": "{{ False }}",
254+
}
255+
)
256+
257+
# Manifest with condition using components_values that evaluates to true for some items
258+
_MANIFEST_WITH_COMPONENTS_VALUES_TRUE_CONDITION = deepcopy(_MANIFEST)
259+
_MANIFEST_WITH_COMPONENTS_VALUES_TRUE_CONDITION["dynamic_streams"][0]["components_resolver"][
260+
"components_mapping"
261+
].append(
262+
{
263+
"type": "ComponentMappingDefinition",
264+
"field_path": ["retriever", "requester", "$parameters", "conditional_param"],
265+
"value": "item_1_special_value",
266+
"create_or_update": True,
267+
"condition": "{{ components_values['name'] == 'item_1' }}",
268+
}
269+
)
270+
271+
# Manifest with condition using components_values that evaluates to false for all items
272+
_MANIFEST_WITH_COMPONENTS_VALUES_FALSE_CONDITION = deepcopy(_MANIFEST)
273+
_MANIFEST_WITH_COMPONENTS_VALUES_FALSE_CONDITION["dynamic_streams"][0]["components_resolver"][
274+
"components_mapping"
275+
].append(
276+
{
277+
"type": "ComponentMappingDefinition",
278+
"field_path": ["retriever", "requester", "$parameters", "never_matching"],
279+
"value": "never_applied_value",
280+
"create_or_update": True,
281+
"condition": "{{ components_values['name'] == 'non_existent_item' }}",
282+
}
283+
)
284+
285+
# Manifest with multiple conditions - some true, some false
286+
_MANIFEST_WITH_MIXED_CONDITIONS = deepcopy(_MANIFEST)
287+
_MANIFEST_WITH_MIXED_CONDITIONS["dynamic_streams"][0]["components_resolver"][
288+
"components_mapping"
289+
].extend(
290+
[
291+
{
292+
"type": "ComponentMappingDefinition",
293+
"field_path": ["retriever", "requester", "$parameters", "always_true"],
294+
"value": "always_applied",
295+
"create_or_update": True,
296+
"condition": "{{ True }}",
297+
},
298+
{
299+
"type": "ComponentMappingDefinition",
300+
"field_path": ["retriever", "requester", "$parameters", "always_false"],
301+
"value": "never_applied",
302+
"create_or_update": True,
303+
"condition": "{{ False }}",
304+
},
305+
{
306+
"type": "ComponentMappingDefinition",
307+
"field_path": ["retriever", "requester", "$parameters", "item_specific"],
308+
"value": "applied_to_item_2",
309+
"create_or_update": True,
310+
"condition": "{{ components_values['id'] == 2 }}",
311+
},
312+
]
313+
)
314+
315+
316+
@pytest.mark.parametrize(
317+
"manifest, config, expected_conditional_params",
318+
[
319+
(
320+
_MANIFEST_WITH_TRUE_CONDITION,
321+
_CONFIG,
322+
{
323+
"item_1": {"always_included": "true_condition_value", "item_id": 1},
324+
"item_2": {"always_included": "true_condition_value", "item_id": 2},
325+
"default_item": {"always_included": "true_condition_value", "item_id": 4},
326+
},
327+
),
328+
(
329+
_MANIFEST_WITH_FALSE_CONDITION,
330+
_CONFIG,
331+
{
332+
"item_1": {"item_id": 1}, # never_included should not be present
333+
"item_2": {"item_id": 2},
334+
"default_item": {"item_id": 4},
335+
},
336+
),
337+
(
338+
_MANIFEST_WITH_COMPONENTS_VALUES_TRUE_CONDITION,
339+
_CONFIG,
340+
{
341+
"item_1": {"conditional_param": "item_1_special_value", "item_id": 1},
342+
"item_2": {"item_id": 2}, # condition false for item_2
343+
"default_item": {"item_id": 4}, # condition false for default_item
344+
},
345+
),
346+
(
347+
_MANIFEST_WITH_COMPONENTS_VALUES_FALSE_CONDITION,
348+
_CONFIG,
349+
{
350+
"item_1": {"item_id": 1}, # never_matching should not be present
351+
"item_2": {"item_id": 2},
352+
"default_item": {"item_id": 4},
353+
},
354+
),
355+
(
356+
_MANIFEST_WITH_MIXED_CONDITIONS,
357+
_CONFIG,
358+
{
359+
"item_1": {"always_true": "always_applied", "item_id": 1},
360+
"item_2": {
361+
"always_true": "always_applied",
362+
"item_specific": "applied_to_item_2",
363+
"item_id": 2,
364+
},
365+
"default_item": {"always_true": "always_applied", "item_id": 4},
366+
},
367+
),
368+
],
369+
ids=[
370+
"true_condition",
371+
"false_condition",
372+
"components_values_true_condition",
373+
"components_values_false_condition",
374+
"mixed_conditions",
375+
],
376+
)
377+
def test_component_mapping_conditions(manifest, config, expected_conditional_params):
378+
"""Test that ComponentMappingDefinition conditions work correctly for various scenarios."""
379+
source = ConcurrentDeclarativeSource(
380+
source_config=manifest, config=config, catalog=None, state=None
381+
)
382+
383+
for stream in source.streams(config):
384+
if stream.name in expected_conditional_params:
385+
assert (
386+
stream.retriever.requester._parameters == expected_conditional_params[stream.name]
387+
)

0 commit comments

Comments
 (0)