Skip to content

Commit ef09cfb

Browse files
darynaishchenkooctavia-squidington-iii
andauthored
feat(low-code): add ParametrizedComponentsResolver (#596)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent dcdd18b commit ef09cfb

File tree

6 files changed

+413
-1
lines changed

6 files changed

+413
-1
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4186,6 +4186,46 @@ definitions:
41864186
- type
41874187
- stream_config
41884188
- components_mapping
4189+
StreamParametersDefinition:
4190+
title: Stream Parameters Definition
4191+
description: (This component is experimental. Use at your own risk.) Represents a stream parameters definition to set up dynamic streams from defined values in manifest.
4192+
type: object
4193+
required:
4194+
- type
4195+
- list_of_parameters_for_stream
4196+
properties:
4197+
type:
4198+
type: string
4199+
enum: [StreamParametersDefinition]
4200+
list_of_parameters_for_stream:
4201+
title: Stream Parameters
4202+
description: A list of object of parameters for stream, each object in the list represents params for one stream.
4203+
type: array
4204+
items:
4205+
type: object
4206+
examples:
4207+
- [{"name": "test stream", "$parameters": {"entity": "test entity"}, "primary_key": "test key"}]
4208+
ParametrizedComponentsResolver:
4209+
type: object
4210+
title: Parametrized Components Resolver
4211+
description: (This component is experimental. Use at your own risk.) Resolves and populates dynamic streams from defined parametrized values in manifest.
4212+
properties:
4213+
type:
4214+
type: string
4215+
enum: [ParametrizedComponentsResolver]
4216+
stream_parameters:
4217+
"$ref": "#/definitions/StreamParametersDefinition"
4218+
components_mapping:
4219+
type: array
4220+
items:
4221+
"$ref": "#/definitions/ComponentMappingDefinition"
4222+
$parameters:
4223+
type: object
4224+
additionalProperties: true
4225+
required:
4226+
- type
4227+
- stream_parameters
4228+
- components_mapping
41894229
DynamicDeclarativeStream:
41904230
type: object
41914231
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
@@ -4212,6 +4252,7 @@ definitions:
42124252
anyOf:
42134253
- "$ref": "#/definitions/HttpComponentsResolver"
42144254
- "$ref": "#/definitions/ConfigComponentsResolver"
4255+
- "$ref": "#/definitions/ParametrizedComponentsResolver"
42154256
required:
42164257
- type
42174258
- stream_template

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,31 @@ class ConfigComponentsResolver(BaseModel):
15121512
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15131513

15141514

1515+
class StreamParametersDefinition(BaseModel):
1516+
type: Literal["StreamParametersDefinition"]
1517+
list_of_parameters_for_stream: List[Dict[str, Any]] = Field(
1518+
...,
1519+
description="A list of object of parameters for stream, each object in the list represents params for one stream.",
1520+
examples=[
1521+
[
1522+
{
1523+
"name": "test stream",
1524+
"$parameters": {"entity": "test entity"},
1525+
"primary_key": "test key",
1526+
}
1527+
]
1528+
],
1529+
title="Stream Parameters",
1530+
)
1531+
1532+
1533+
class ParametrizedComponentsResolver(BaseModel):
1534+
type: Literal["ParametrizedComponentsResolver"]
1535+
stream_parameters: StreamParametersDefinition
1536+
components_mapping: List[ComponentMappingDefinition]
1537+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1538+
1539+
15151540
class RequestBodyPlainText(BaseModel):
15161541
type: Literal["RequestBodyPlainText"]
15171542
value: str
@@ -2943,7 +2968,9 @@ class DynamicDeclarativeStream(BaseModel):
29432968
stream_template: Union[DeclarativeStream, StateDelegatingStream] = Field(
29442969
..., description="Reference to the stream template.", title="Stream Template"
29452970
)
2946-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2971+
components_resolver: Union[
2972+
HttpComponentsResolver, ConfigComponentsResolver, ParametrizedComponentsResolver
2973+
] = Field(
29472974
...,
29482975
description="Component resolve and populates stream templates with components values.",
29492976
title="Components Resolver",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,9 @@
351351
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
352352
PageIncrement as PageIncrementModel,
353353
)
354+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
355+
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
356+
)
354357
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
355358
ParentStreamConfig as ParentStreamConfigModel,
356359
)
@@ -504,7 +507,9 @@
504507
ComponentMappingDefinition,
505508
ConfigComponentsResolver,
506509
HttpComponentsResolver,
510+
ParametrizedComponentsResolver,
507511
StreamConfig,
512+
StreamParametersDefinition,
508513
)
509514
from airbyte_cdk.sources.declarative.retrievers import (
510515
AsyncRetriever,
@@ -738,6 +743,7 @@ def _init_mappings(self) -> None:
738743
AsyncRetrieverModel: self.create_async_retriever,
739744
HttpComponentsResolverModel: self.create_http_components_resolver,
740745
ConfigComponentsResolverModel: self.create_config_components_resolver,
746+
ParametrizedComponentsResolverModel: self.create_parametrized_components_resolver,
741747
StreamConfigModel: self.create_stream_config,
742748
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
743749
ZipfileDecoderModel: self.create_zipfile_decoder,
@@ -3861,6 +3867,29 @@ def create_config_components_resolver(
38613867
parameters=model.parameters or {},
38623868
)
38633869

3870+
def create_parametrized_components_resolver(
3871+
self, model: ParametrizedComponentsResolverModel, config: Config
3872+
) -> ParametrizedComponentsResolver:
3873+
stream_parameters = StreamParametersDefinition(
3874+
list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream
3875+
)
3876+
components_mapping = [
3877+
self._create_component_from_model(
3878+
model=components_mapping_definition_model,
3879+
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
3880+
components_mapping_definition_model.value_type
3881+
),
3882+
config=config,
3883+
)
3884+
for components_mapping_definition_model in model.components_mapping
3885+
]
3886+
return ParametrizedComponentsResolver(
3887+
stream_parameters=stream_parameters,
3888+
config=config,
3889+
components_mapping=components_mapping,
3890+
parameters=model.parameters or {},
3891+
)
3892+
38643893
_UNSUPPORTED_DECODER_ERROR = (
38653894
"Specified decoder of {decoder_type} is not supported for pagination."
38663895
"Please set as `JsonDecoder`, `XmlDecoder`, or a `CompositeRawDecoder` with an inner_parser of `JsonParser` or `GzipParser` instead."

airbyte_cdk/sources/declarative/resolvers/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
from airbyte_cdk.sources.declarative.models import (
1313
HttpComponentsResolver as HttpComponentsResolverModel,
1414
)
15+
from airbyte_cdk.sources.declarative.models import (
16+
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
17+
)
1518
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
1619
ComponentMappingDefinition,
1720
ComponentsResolver,
@@ -24,10 +27,15 @@
2427
from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
2528
HttpComponentsResolver,
2629
)
30+
from airbyte_cdk.sources.declarative.resolvers.parametrized_components_resolver import (
31+
ParametrizedComponentsResolver,
32+
StreamParametersDefinition,
33+
)
2734

2835
COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
2936
"HttpComponentsResolver": HttpComponentsResolverModel,
3037
"ConfigComponentsResolver": ConfigComponentsResolverModel,
38+
"ParametrizedComponentsResolver": ParametrizedComponentsResolverModel,
3139
}
3240

3341
__all__ = [
@@ -38,4 +46,6 @@
3846
"StreamConfig",
3947
"ConfigComponentsResolver",
4048
"COMPONENTS_RESOLVER_TYPE_MAPPING",
49+
"ParametrizedComponentsResolver",
50+
"StreamParametersDefinition",
4151
]
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from copy import deepcopy
6+
from dataclasses import InitVar, dataclass, field
7+
from typing import Any, Dict, Iterable, List, Mapping
8+
9+
import dpath
10+
import yaml
11+
from typing_extensions import deprecated
12+
from yaml.parser import ParserError
13+
14+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
15+
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
16+
ComponentMappingDefinition,
17+
ComponentsResolver,
18+
ResolvedComponentMappingDefinition,
19+
)
20+
from airbyte_cdk.sources.source import ExperimentalClassWarning
21+
from airbyte_cdk.sources.types import Config
22+
23+
24+
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
25+
@dataclass
26+
class StreamParametersDefinition:
27+
"""
28+
Represents a stream parameters definition to set up dynamic streams from defined values in manifest.
29+
"""
30+
31+
list_of_parameters_for_stream: List[Dict[str, Any]]
32+
33+
34+
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
35+
@dataclass
36+
class ParametrizedComponentsResolver(ComponentsResolver):
37+
"""
38+
Resolves and populates dynamic streams from defined parametrized values in manifest.
39+
"""
40+
41+
stream_parameters: StreamParametersDefinition
42+
config: Config
43+
components_mapping: List[ComponentMappingDefinition]
44+
parameters: InitVar[Mapping[str, Any]]
45+
_resolved_components: List[ResolvedComponentMappingDefinition] = field(
46+
init=False, repr=False, default_factory=list
47+
)
48+
49+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
50+
"""
51+
Initializes and parses component mappings, converting them to resolved definitions.
52+
53+
Args:
54+
parameters (Mapping[str, Any]): Parameters for interpolation.
55+
"""
56+
57+
for component_mapping in self.components_mapping:
58+
if isinstance(component_mapping.value, (str, InterpolatedString)):
59+
interpolated_value = (
60+
InterpolatedString.create(component_mapping.value, parameters=parameters)
61+
if isinstance(component_mapping.value, str)
62+
else component_mapping.value
63+
)
64+
65+
field_path = [
66+
InterpolatedString.create(path, parameters=parameters)
67+
for path in component_mapping.field_path
68+
]
69+
70+
self._resolved_components.append(
71+
ResolvedComponentMappingDefinition(
72+
field_path=field_path,
73+
value=interpolated_value,
74+
value_type=component_mapping.value_type,
75+
create_or_update=component_mapping.create_or_update,
76+
parameters=parameters,
77+
)
78+
)
79+
else:
80+
raise ValueError(
81+
f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
82+
)
83+
84+
def resolve_components(
85+
self, stream_template_config: Dict[str, Any]
86+
) -> Iterable[Dict[str, Any]]:
87+
kwargs = {"stream_template_config": stream_template_config}
88+
89+
for components_values in self.stream_parameters.list_of_parameters_for_stream:
90+
updated_config = deepcopy(stream_template_config)
91+
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
92+
for resolved_component in self._resolved_components:
93+
valid_types = (
94+
(resolved_component.value_type,) if resolved_component.value_type else None
95+
)
96+
value = resolved_component.value.eval(
97+
self.config, valid_types=valid_types, **kwargs
98+
)
99+
path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
100+
parsed_value = self._parse_yaml_if_possible(value)
101+
# https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
102+
# dpath.set returns the number of changed elements, 0 when no elements changed
103+
updated = dpath.set(updated_config, path, parsed_value)
104+
105+
if parsed_value and not updated and resolved_component.create_or_update:
106+
dpath.new(updated_config, path, parsed_value)
107+
108+
yield updated_config
109+
110+
@staticmethod
111+
def _parse_yaml_if_possible(value: Any) -> Any:
112+
"""
113+
Try to turn value into a Python object by YAML-parsing it.
114+
115+
* If value is a `str` and can be parsed by `yaml.safe_load`,
116+
return the parsed result.
117+
* If parsing fails (`yaml.parser.ParserError`) – or value is not
118+
a string at all – return the original value unchanged.
119+
"""
120+
if isinstance(value, str):
121+
try:
122+
return yaml.safe_load(value)
123+
except ParserError: # "{{ record[0] in ['cohortActiveUsers'] }}" # not valid YAML
124+
return value
125+
return value

0 commit comments

Comments
 (0)