Skip to content

Commit 7818b33

Browse files
lazebnyi and octavia-squidington-iii authored
feat(cdk): update ConfigComponentsResolver to support list of stream_config (#553)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent: 155cdc8 · commit: 7818b33

File tree

7 files changed: 190 additions and 83 deletions.

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4055,6 +4055,11 @@ definitions:
40554055
title: Value Type
40564056
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
40574057
"$ref": "#/definitions/ValueType"
4058+
create_or_update:
4059+
title: Create or Update
4060+
description: Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.
4061+
type: boolean
4062+
default: false
40584063
$parameters:
40594064
type: object
40604065
additionalProperties: true
@@ -4106,6 +4111,12 @@ definitions:
41064111
- ["data"]
41074112
- ["data", "streams"]
41084113
- ["data", "{{ parameters.name }}"]
4114+
default_values:
4115+
title: Default Values
4116+
description: A list of default values, each matching the structure expected from the parsed component value.
4117+
type: array
4118+
items:
4119+
type: object
41094120
$parameters:
41104121
type: object
41114122
additionalProperties: true
@@ -4117,7 +4128,11 @@ definitions:
41174128
type: string
41184129
enum: [ConfigComponentsResolver]
41194130
stream_config:
4120-
"$ref": "#/definitions/StreamConfig"
4131+
anyOf:
4132+
- type: array
4133+
items:
4134+
"$ref": "#/definitions/StreamConfig"
4135+
- "$ref": "#/definitions/StreamConfig"
41214136
components_mapping:
41224137
type: array
41234138
items:

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
300300
}
301301
)
302302

303-
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
304-
self._source_config, config
305-
)
303+
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
306304

307305
api_budget_model = self._source_config.get("api_budget")
308306
if api_budget_model:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1478,6 +1478,11 @@ class ComponentMappingDefinition(BaseModel):
14781478
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
14791479
title="Value Type",
14801480
)
1481+
create_or_update: Optional[bool] = Field(
1482+
False,
1483+
description="Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.",
1484+
title="Create or Update",
1485+
)
14811486
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14821487

14831488

@@ -1489,12 +1494,17 @@ class StreamConfig(BaseModel):
14891494
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
14901495
title="Configs Pointer",
14911496
)
1497+
default_values: Optional[List[Dict[str, Any]]] = Field(
1498+
None,
1499+
description="A list of default values, each matching the structure expected from the parsed component value.",
1500+
title="Default Values",
1501+
)
14921502
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14931503

14941504

14951505
class ConfigComponentsResolver(BaseModel):
14961506
type: Literal["ConfigComponentsResolver"]
1497-
stream_config: StreamConfig
1507+
stream_config: Union[List[StreamConfig], StreamConfig]
14981508
components_mapping: List[ComponentMappingDefinition]
14991509
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15001510

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3754,6 +3754,7 @@ def create_components_mapping_definition(
37543754
field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString
37553755
value=interpolated_value,
37563756
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
3757+
create_or_update=model.create_or_update,
37573758
parameters=model.parameters or {},
37583759
)
37593760

@@ -3800,16 +3801,24 @@ def create_stream_config(
38003801

38013802
return StreamConfig(
38023803
configs_pointer=model_configs_pointer,
3804+
default_values=model.default_values,
38033805
parameters=model.parameters or {},
38043806
)
38053807

38063808
def create_config_components_resolver(
38073809
self, model: ConfigComponentsResolverModel, config: Config
38083810
) -> Any:
3809-
stream_config = self._create_component_from_model(
3810-
model.stream_config, config=config, parameters=model.parameters or {}
3811+
model_stream_configs = (
3812+
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
38113813
)
38123814

3815+
stream_configs = [
3816+
self._create_component_from_model(
3817+
stream_config, config=config, parameters=model.parameters or {}
3818+
)
3819+
for stream_config in model_stream_configs
3820+
]
3821+
38133822
components_mapping = [
38143823
self._create_component_from_model(
38153824
model=components_mapping_definition_model,
@@ -3822,7 +3831,7 @@ def create_config_components_resolver(
38223831
]
38233832

38243833
return ConfigComponentsResolver(
3825-
stream_config=stream_config,
3834+
stream_configs=stream_configs,
38263835
config=config,
38273836
components_mapping=components_mapping,
38283837
parameters=model.parameters or {},

airbyte_cdk/sources/declarative/resolvers/components_resolver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class ComponentMappingDefinition:
2222
value: Union["InterpolatedString", str]
2323
value_type: Optional[Type[Any]]
2424
parameters: InitVar[Mapping[str, Any]]
25+
create_or_update: Optional[bool] = False
2526

2627

2728
@dataclass(frozen=True)
@@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition:
3435
value: "InterpolatedString"
3536
value_type: Optional[Type[Any]]
3637
parameters: InitVar[Mapping[str, Any]]
38+
create_or_update: Optional[bool] = False
3739

3840

3941
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44

55
from copy import deepcopy
66
from dataclasses import InitVar, dataclass, field
7-
from typing import Any, Dict, Iterable, List, Mapping, Union
7+
from itertools import product
8+
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
89

910
import dpath
11+
import yaml
1012
from typing_extensions import deprecated
13+
from yaml.parser import ParserError
1114

1215
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1316
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
@@ -28,6 +31,7 @@ class StreamConfig:
2831

2932
configs_pointer: List[Union[InterpolatedString, str]]
3033
parameters: InitVar[Mapping[str, Any]]
34+
default_values: Optional[List[Any]] = None
3135

3236
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3337
self.configs_pointer = [
@@ -48,7 +52,7 @@ class ConfigComponentsResolver(ComponentsResolver):
4852
parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
4953
"""
5054

51-
stream_config: StreamConfig
55+
stream_configs: List[StreamConfig]
5256
config: Config
5357
components_mapping: List[ComponentMappingDefinition]
5458
parameters: InitVar[Mapping[str, Any]]
@@ -82,6 +86,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
8286
field_path=field_path,
8387
value=interpolated_value,
8488
value_type=component_mapping.value_type,
89+
create_or_update=component_mapping.create_or_update,
8590
parameters=parameters,
8691
)
8792
)
@@ -90,18 +95,45 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9095
f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
9196
)
9297

98+
@staticmethod
99+
def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
100+
"""Collapse a combination of ``(idx, elem)`` into one config dict."""
101+
result: Dict[str, Any] = {}
102+
for config_index, (elem_index, elem) in enumerate(combo):
103+
if isinstance(elem, dict):
104+
result.update(elem)
105+
else:
106+
# keep non-dict values under an artificial name
107+
result.setdefault(f"source_config_{config_index}", (elem_index, elem))
108+
return result
109+
93110
@property
94-
def _stream_config(self) -> Iterable[Mapping[str, Any]]:
95-
path = [
96-
node.eval(self.config) if not isinstance(node, str) else node
97-
for node in self.stream_config.configs_pointer
111+
def _stream_config(self) -> List[Dict[str, Any]]:
112+
"""
113+
Build every unique stream-configuration combination defined by
114+
each ``StreamConfig`` and any ``default_values``.
115+
"""
116+
all_indexed_streams = []
117+
for stream_config in self.stream_configs:
118+
path = [
119+
node.eval(self.config) if not isinstance(node, str) else node
120+
for node in stream_config.configs_pointer
121+
]
122+
stream_configs_raw = dpath.get(dict(self.config), path, default=[])
123+
stream_configs = (
124+
list(stream_configs_raw)
125+
if isinstance(stream_configs_raw, list)
126+
else [stream_configs_raw]
127+
)
128+
129+
if stream_config.default_values:
130+
stream_configs.extend(stream_config.default_values)
131+
132+
all_indexed_streams.append([(i, item) for i, item in enumerate(stream_configs)])
133+
return [
134+
self._merge_combination(combo) # type: ignore[arg-type]
135+
for combo in product(*all_indexed_streams)
98136
]
99-
stream_config = dpath.get(dict(self.config), path, default=[])
100-
101-
if not isinstance(stream_config, list):
102-
stream_config = [stream_config]
103-
104-
return stream_config
105137

106138
def resolve_components(
107139
self, stream_template_config: Dict[str, Any]
@@ -130,7 +162,27 @@ def resolve_components(
130162
)
131163

132164
path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
165+
parsed_value = self._parse_yaml_if_possible(value)
166+
updated = dpath.set(updated_config, path, parsed_value)
133167

134-
dpath.set(updated_config, path, value)
168+
if parsed_value and not updated and resolved_component.create_or_update:
169+
dpath.new(updated_config, path, parsed_value)
135170

136171
yield updated_config
172+
173+
@staticmethod
174+
def _parse_yaml_if_possible(value: Any) -> Any:
175+
"""
176+
Try to turn value into a Python object by YAML-parsing it.
177+
178+
* If value is a `str` and can be parsed by `yaml.safe_load`,
179+
return the parsed result.
180+
* If parsing fails (`yaml.parser.ParserError`) – or value is not
181+
a string at all – return the original value unchanged.
182+
"""
183+
if isinstance(value, str):
184+
try:
185+
return yaml.safe_load(value)
186+
except ParserError: # "{{ record[0] in ['cohortActiveUsers'] }}" # not valid YAML
187+
return value
188+
return value

0 commit comments

Comments
 (0)