Skip to content

Commit f5a08df

Browse files
add ParametrizedComponentsResolver for dynamic streams
1 parent 5797a2f commit f5a08df

File tree

5 files changed

+240
-1
lines changed

5 files changed

+240
-1
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4186,6 +4186,46 @@ definitions:
41864186
- type
41874187
- stream_config
41884188
- components_mapping
4189+
# Schema for the manifest object that lists, per dynamic stream, the values
# that will be substituted into the stream template.
StreamParametersDefinition:
  title: "Stream Parameters Definition"
  description: "(This component is experimental. Use at your own risk.) Represents a stream parameters definition to set up dynamic streams from defined values in manifest."
  type: object
  required: [type, lisf_of_parameters_for_stream]
  properties:
    # Discriminator: must be exactly the component name.
    type:
      type: string
      enum:
        - StreamParametersDefinition
    # One object per stream to generate.
    lisf_of_parameters_for_stream:
      title: "Stream Parameters"
      description: "A list of object of parameters for stream, each object in the list represents params for one stream."
      type: array
      items:
        type: object
      examples:
        - - name: "test stream"
            $parameters:
              entity: "test entity"
            primary_key: "test key"
4208+
# Schema for the components resolver that builds dynamic streams from values
# written directly in the manifest.
ParametrizedComponentsResolver:
  type: object
  title: "Parametrized Components Resolver"
  description: "(This component is experimental. Use at your own risk.) Resolves and populates dynamic streams from defined parametrized values in manifest."
  required: [type, stream_parameters, components_mapping]
  properties:
    # Discriminator: must be exactly the component name.
    type:
      type: string
      enum:
        - ParametrizedComponentsResolver
    # A single StreamParametersDefinition object (it carries the per-stream list).
    stream_parameters:
      $ref: "#/definitions/StreamParametersDefinition"
    # Each mapping describes one field of the stream template to set.
    components_mapping:
      type: array
      items:
        $ref: "#/definitions/ComponentMappingDefinition"
    $parameters:
      type: object
      additionalProperties: true
41894229
DynamicDeclarativeStream:
41904230
type: object
41914231
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
@@ -4212,6 +4252,7 @@ definitions:
42124252
anyOf:
42134253
- "$ref": "#/definitions/HttpComponentsResolver"
42144254
- "$ref": "#/definitions/ConfigComponentsResolver"
4255+
- "$ref": "#/definitions/ParametrizedComponentsResolver"
42154256
required:
42164257
- type
42174258
- stream_template

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,31 @@ class ConfigComponentsResolver(BaseModel):
15121512
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15131513

15141514

1515+
class StreamParametersDefinition(BaseModel):
    # NOTE: documented with comments on purpose; a class docstring would be
    # picked up as the model's schema description.

    # Discriminator: the manifest must set `type` to exactly this literal.
    type: Literal["StreamParametersDefinition"]

    # Required (`...` default). Each dict holds the values for one stream.
    lisf_of_parameters_for_stream: List[Dict[str, Any]] = Field(
        ...,
        title="Stream Parameters",
        description="A list of object of parameters for stream, each object in the list represents params for one stream.",
        examples=[
            [
                {
                    "name": "test stream",
                    "$parameters": {"entity": "test entity"},
                    "primary_key": "test key",
                }
            ]
        ],
    )
1531+
1532+
1533+
class ParametrizedComponentsResolver(BaseModel):
    # NOTE: documented with comments on purpose; a class docstring would be
    # picked up as the model's schema description.

    # Discriminator: the manifest must set `type` to exactly this literal.
    type: Literal["ParametrizedComponentsResolver"]

    # Exactly one StreamParametersDefinition, as declared by the YAML schema
    # (`"$ref": "#/definitions/StreamParametersDefinition"`). The previous
    # `Union[List[...], ...]` also accepted a list, which passed validation but
    # then failed in the component factory, which reads
    # `stream_parameters.lisf_of_parameters_for_stream` directly.
    stream_parameters: StreamParametersDefinition

    # One entry per field of the stream template that should be set.
    components_mapping: List[ComponentMappingDefinition]

    # Optional free-form parameters; exposed in the manifest as `$parameters`.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1538+
1539+
15151540
class RequestBodyPlainText(BaseModel):
15161541
type: Literal["RequestBodyPlainText"]
15171542
value: str
@@ -2943,7 +2968,9 @@ class DynamicDeclarativeStream(BaseModel):
29432968
stream_template: Union[DeclarativeStream, StateDelegatingStream] = Field(
29442969
..., description="Reference to the stream template.", title="Stream Template"
29452970
)
2946-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2971+
components_resolver: Union[
2972+
HttpComponentsResolver, ConfigComponentsResolver, ParametrizedComponentsResolver
2973+
] = Field(
29472974
...,
29482975
description="Component resolve and populates stream templates with components values.",
29492976
title="Components Resolver",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@
162162
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
163163
ConfigComponentsResolver as ConfigComponentsResolverModel,
164164
)
165+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
166+
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
167+
)
165168
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
166169
ConfigMigration as ConfigMigrationModel,
167170
)
@@ -505,6 +508,8 @@
505508
ConfigComponentsResolver,
506509
HttpComponentsResolver,
507510
StreamConfig,
511+
StreamParametersDefinition,
512+
ParametrizedComponentsResolver,
508513
)
509514
from airbyte_cdk.sources.declarative.retrievers import (
510515
AsyncRetriever,
@@ -738,6 +743,7 @@ def _init_mappings(self) -> None:
738743
AsyncRetrieverModel: self.create_async_retriever,
739744
HttpComponentsResolverModel: self.create_http_components_resolver,
740745
ConfigComponentsResolverModel: self.create_config_components_resolver,
746+
ParametrizedComponentsResolverModel: self.create_parametrized_components_resolver,
741747
StreamConfigModel: self.create_stream_config,
742748
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
743749
ZipfileDecoderModel: self.create_zipfile_decoder,
@@ -3861,6 +3867,37 @@ def create_config_components_resolver(
38613867
parameters=model.parameters or {},
38623868
)
38633869

3870+
def create_parametrized_components_resolver(
    self, model: ParametrizedComponentsResolverModel, config: Config
) -> ParametrizedComponentsResolver:
    """
    Build a `ParametrizedComponentsResolver` from its parsed manifest model.

    Each entry of `model.stream_parameters.lisf_of_parameters_for_stream` is
    passed through `_create_component_from_model`; entries for which that call
    raises `ValueError` are kept unchanged. The resulting list is wrapped in a
    new runtime `StreamParametersDefinition`, so the input `model` is not
    modified.

    Args:
        model: Parsed `ParametrizedComponentsResolver` manifest model.
        config: Connector configuration forwarded to every created component.

    Returns:
        The resolver, holding the processed stream parameters and the
        component mappings created from `model.components_mapping`.
    """
    updated_stream_parameters = []
    for stream_parameters in model.stream_parameters.lisf_of_parameters_for_stream:
        try:
            resolved_stream_parameters = self._create_component_from_model(
                stream_parameters, config=config, parameters=model.parameters or {}
            )
        except ValueError:
            # when an object doesn't have a type, we leave it as is
            resolved_stream_parameters = stream_parameters
        updated_stream_parameters.append(resolved_stream_parameters)

    components_mapping = [
        self._create_component_from_model(
            model=components_mapping_definition_model,
            value_type=ModelToComponentFactory._json_schema_type_name_to_type(
                components_mapping_definition_model.value_type
            ),
            config=config,
        )
        for components_mapping_definition_model in model.components_mapping
    ]

    return ParametrizedComponentsResolver(
        # Hand the resolver the runtime dataclass it is annotated with instead
        # of writing the processed list back onto the parsed manifest model.
        stream_parameters=StreamParametersDefinition(
            lisf_of_parameters_for_stream=updated_stream_parameters
        ),
        config=config,
        components_mapping=components_mapping,
        parameters=model.parameters or {},
    )
3900+
38643901
_UNSUPPORTED_DECODER_ERROR = (
38653902
"Specified decoder of {decoder_type} is not supported for pagination."
38663903
"Please set as `JsonDecoder`, `XmlDecoder`, or a `CompositeRawDecoder` with an inner_parser of `JsonParser` or `GzipParser` instead."

airbyte_cdk/sources/declarative/resolvers/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
from airbyte_cdk.sources.declarative.models import (
1313
HttpComponentsResolver as HttpComponentsResolverModel,
1414
)
15+
from airbyte_cdk.sources.declarative.models import (
16+
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
17+
)
1518
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
1619
ComponentMappingDefinition,
1720
ComponentsResolver,
@@ -24,10 +27,15 @@
2427
from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
2528
HttpComponentsResolver,
2629
)
30+
from airbyte_cdk.sources.declarative.resolvers.paramertized_components_resolver import (
31+
ParametrizedComponentsResolver,
32+
StreamParametersDefinition,
33+
)
2734

2835
COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
2936
"HttpComponentsResolver": HttpComponentsResolverModel,
3037
"ConfigComponentsResolver": ConfigComponentsResolverModel,
38+
"ParametrizedComponentsResolver": ParametrizedComponentsResolverModel,
3139
}
3240

3341
__all__ = [
@@ -38,4 +46,6 @@
3846
"StreamConfig",
3947
"ConfigComponentsResolver",
4048
"COMPONENTS_RESOLVER_TYPE_MAPPING",
49+
"ParametrizedComponentsResolver",
50+
"StreamParametersDefinition",
4151
]
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from copy import deepcopy
6+
from dataclasses import InitVar, dataclass, field, Field
7+
from itertools import product
8+
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
9+
10+
import dpath
11+
import yaml
12+
from typing_extensions import deprecated
13+
from yaml.parser import ParserError
14+
15+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
16+
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
17+
ComponentMappingDefinition,
18+
ComponentsResolver,
19+
ResolvedComponentMappingDefinition,
20+
)
21+
from airbyte_cdk.sources.source import ExperimentalClassWarning
22+
from airbyte_cdk.sources.types import Config
23+
24+
25+
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamParametersDefinition:
    """
    Holds the manifest-defined values from which dynamic streams are set up.
    """

    # One dict per stream; the resolver in this module yields one stream
    # configuration for each entry.
    lisf_of_parameters_for_stream: List[Dict[str, Any]]
33+
34+
35+
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ParametrizedComponentsResolver(ComponentsResolver):
    """
    Resolves and populates dynamic streams from defined parametrized values in manifest.

    For every entry of `stream_parameters.lisf_of_parameters_for_stream`, a deep
    copy of the stream template is produced and each component mapping is
    evaluated against that entry and written into the copy.
    """

    # Source of the per-stream value dicts.
    stream_parameters: StreamParametersDefinition
    # Connector configuration, passed to every interpolation.
    config: Config
    # Raw mappings; converted to resolved definitions in `__post_init__`.
    components_mapping: List[ComponentMappingDefinition]
    # Init-only: used to build the interpolated strings, not stored on the instance.
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a `str` nor an `InterpolatedString`.
        """
        for component_mapping in self.components_mapping:
            mapping_value = component_mapping.value
            if not isinstance(mapping_value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

            # Plain strings are wrapped; existing InterpolatedString values are reused.
            interpolated_value = (
                InterpolatedString.create(mapping_value, parameters=parameters)
                if isinstance(mapping_value, str)
                else mapping_value
            )

            field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in component_mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=field_path,
                    value=interpolated_value,
                    value_type=component_mapping.value_type,
                    create_or_update=component_mapping.create_or_update,
                    parameters=parameters,
                )
            )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Yield one stream configuration per entry of the stream parameters list.

        Args:
            stream_template_config: Template configuration. It is deep-copied for
                every yielded result, so the template itself is not modified.

        Yields:
            A copy of the template with every resolved component mapping applied.
        """
        # Interpolation context: the template plus, per iteration, the current entry.
        kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

        for components_values in self.stream_parameters.lisf_of_parameters_for_stream:
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values

            for resolved_component in self._resolved_components:
                valid_types = (
                    (resolved_component.value_type,) if resolved_component.value_type else None
                )
                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )
                path = [
                    segment.eval(self.config, **kwargs)
                    for segment in resolved_component.field_path
                ]
                parsed_value = self._parse_yaml_if_possible(value)
                updated = dpath.set(updated_config, path, parsed_value)

                # Nothing was updated at `path`: create it, but only when the
                # mapping opts in via `create_or_update`.
                # NOTE(review): falsy parsed values (0, False, "", empty
                # containers) are never created here - confirm this is intended.
                if parsed_value and not updated and resolved_component.create_or_update:
                    dpath.new(updated_config, path, parsed_value)

            yield updated_config

    @staticmethod
    def _parse_yaml_if_possible(value: Any) -> Any:
        """
        Try to turn value into a Python object by YAML-parsing it.

        * If value is a `str` and can be parsed by `yaml.safe_load`,
          return the parsed result.
        * If parsing fails with any `yaml.YAMLError` (parser, scanner,
          constructor, ... errors), or value is not a string at all,
          return the original value unchanged.
        """
        if isinstance(value, str):
            try:
                return yaml.safe_load(value)
            except yaml.YAMLError:
                # Not valid YAML, e.g. "{{ record[0] in ['cohortActiveUsers'] }}".
                # `YAMLError` is the base of `ParserError`, so the previously
                # handled case is still covered; the other subclasses no longer
                # escape and abort resolution.
                return value
        return value

0 commit comments

Comments
 (0)