Skip to content

Commit 3630289

Browse files
author
Oleksandr Bazarnov
committed
Merge remote-tracking branch 'origin/main' into baz/cdk/manifest-migrations
2 parents b2a49d3 + 9de6cef commit 3630289

File tree

5 files changed

+163
-4
lines changed

5 files changed

+163
-4
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2330,6 +2330,27 @@ definitions:
23302330
$parameters:
23312331
type: object
23322332
additionalProperties: true
2333+
KeyTransformation:
2334+
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
2335+
type: object
2336+
required:
2337+
- type
2338+
properties:
2339+
type:
2340+
type: string
2341+
enum: [ KeyTransformation ]
2342+
prefix:
2343+
title: Key Prefix
2344+
description: Prefix to add for object keys. If not provided original keys remain unchanged.
2345+
type: string
2346+
examples:
2347+
- flattened_
2348+
suffix:
2349+
title: Key Suffix
2350+
description: Suffix to add for object keys. If not provided original keys remain unchanged.
2351+
type: string
2352+
examples:
2353+
- _flattened
23332354
DpathFlattenFields:
23342355
title: Dpath Flatten Fields
23352356
description: A transformation that flattens field values to the top of the record.
@@ -2358,6 +2379,11 @@ definitions:
23582379
title: Replace Origin Record
23592380
description: Whether to replace the origin record or not. Default is False.
23602381
type: boolean
2382+
key_transformation:
2383+
title: Key transformation
2384+
description: Transformation for object keys. If not provided, original key will be used.
2385+
type: object
2386+
"$ref": "#/definitions/KeyTransformation"
23612387
$parameters:
23622388
type: object
23632389
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,25 @@ class FlattenFields(BaseModel):
881881
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
882882

883883

884+
class KeyTransformation(BaseModel):
    """Declarative model describing how flattened object keys are renamed.

    Mirrors the `KeyTransformation` definition of the declarative component
    schema: an optional prefix and an optional suffix for extracted keys.
    """

    # The YAML schema lists `type` as required with the single allowed value
    # "KeyTransformation". It is defaulted here so that code constructing the
    # model without `type` keeps working, while a wrong value is still rejected.
    type: Literal["KeyTransformation"] = "KeyTransformation"
    prefix: Optional[str] = Field(
        None,
        description="Prefix to add for object keys. If not provided original keys remain unchanged.",
        examples=[
            "flattened_",
        ],
        title="Key Prefix",
    )
    suffix: Optional[str] = Field(
        None,
        description="Suffix to add for object keys. If not provided original keys remain unchanged.",
        examples=[
            "_flattened",
        ],
        title="Key Suffix",
    )
901+
902+
884903
class DpathFlattenFields(BaseModel):
885904
type: Literal["DpathFlattenFields"]
886905
field_path: List[str] = Field(
@@ -899,6 +918,11 @@ class DpathFlattenFields(BaseModel):
899918
description="Whether to replace the origin record or not. Default is False.",
900919
title="Replace Origin Record",
901920
)
921+
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
922+
None,
923+
description="Transformation for object keys. If not provided, original key will be used.",
924+
title="Key transformation",
925+
)
902926
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
903927

904928

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@
502502
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
503503
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
504504
DpathFlattenFields,
505+
KeyTransformation,
505506
)
506507
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
507508
FlattenFields,
@@ -814,13 +815,24 @@ def create_dpath_flatten_fields(
814815
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
815816
) -> DpathFlattenFields:
816817
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
818+
key_transformation = (
819+
KeyTransformation(
820+
config=config,
821+
prefix=model.key_transformation.prefix,
822+
suffix=model.key_transformation.suffix,
823+
parameters=model.parameters or {},
824+
)
825+
if model.key_transformation is not None
826+
else None
827+
)
817828
return DpathFlattenFields(
818829
config=config,
819830
field_path=model_field_path,
820831
delete_origin_value=model.delete_origin_value
821832
if model.delete_origin_value is not None
822833
else False,
823834
replace_record=model.replace_record if model.replace_record is not None else False,
835+
key_transformation=key_transformation,
824836
parameters=model.parameters or {},
825837
)
826838

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,24 @@
88
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
99

1010

11+
@dataclass
class KeyTransformation:
    """Prefix and/or suffix to apply to keys, resolved once at construction.

    Attributes:
        config: connector configuration used to evaluate the interpolated values.
        parameters: init-only mapping passed to the interpolation of both values.
        prefix: optional key prefix; replaced by its evaluated value after init.
        suffix: optional key suffix; replaced by its evaluated value after init.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    prefix: Optional[str] = None
    suffix: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Evaluate the prefix first, then the suffix; a value left as None is
        # not interpolated and stays None.
        for attribute in ("prefix", "suffix"):
            template = getattr(self, attribute)
            if template is None:
                continue
            resolved = InterpolatedString.create(template, parameters=parameters).eval(
                self.config
            )
            setattr(self, attribute, resolved)
27+
28+
1129
@dataclass
1230
class DpathFlattenFields(RecordTransformation):
1331
"""
@@ -16,6 +34,7 @@ class DpathFlattenFields(RecordTransformation):
1634
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1735
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
1836
replace_record: bool = False whether to replace origin record or not. Default is False.
37+
key_transformation: KeyTransformation = None how to transform extracted object keys
1938
2039
"""
2140

@@ -24,17 +43,35 @@ class DpathFlattenFields(RecordTransformation):
2443
parameters: InitVar[Mapping[str, Any]]
2544
delete_origin_value: bool = False
2645
replace_record: bool = False
46+
key_transformation: Optional[KeyTransformation] = None
2747

2848
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
49+
self._parameters = parameters
2950
self._field_path = [
30-
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
51+
InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
3152
]
3253
for path_index in range(len(self.field_path)):
3354
if isinstance(self.field_path[path_index], str):
3455
self._field_path[path_index] = InterpolatedString.create(
35-
self.field_path[path_index], parameters=parameters
56+
self.field_path[path_index], parameters=self._parameters
3657
)
3758

59+
def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return `extracted` with the configured prefix/suffix applied to every key.

    Args:
        extracted: mapping whose keys should be renamed.

    Returns:
        `extracted` itself when no key transformation is configured or when both
        the prefix and the suffix are empty; otherwise a new dict whose keys are
        `<prefix><key><suffix>` and whose values are unchanged.
    """
    transformation = self.key_transformation
    if not transformation:
        return extracted

    # A missing (None) or empty prefix/suffix contributes nothing to the key.
    prefix = transformation.prefix or ""
    suffix = transformation.suffix or ""
    if not prefix and not suffix:
        return extracted

    # Rename in a single pass instead of rebuilding the mapping once for the
    # prefix and once more for the suffix.
    return {f"{prefix}{key}{suffix}": value for key, value in extracted.items()}
74+
3875
def transform(
3976
self,
4077
record: Dict[str, Any],
@@ -50,6 +87,8 @@ def transform(
5087
extracted = dpath.get(record, path, default=[])
5188

5289
if isinstance(extracted, dict):
90+
extracted = self._apply_key_transformation(extracted)
91+
5392
if self.replace_record and extracted:
5493
dpath.delete(record, "**")
5594
record.update(extracted)

unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
import pytest
22

3-
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields
3+
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
4+
DpathFlattenFields,
5+
KeyTransformation,
6+
)
47

58
_ANY_VALUE = -1
69
_DELETE_ORIGIN_VALUE = True
710
_REPLACE_WITH_VALUE = True
811
_DO_NOT_DELETE_ORIGIN_VALUE = False
912
_DO_NOT_REPLACE_WITH_VALUE = False
13+
_NO_KEY_PREFIX = None
14+
_NO_KEY_SUFFIX = None
15+
_NO_KEY_TRANSFORMATIONS = None
1016

1117

1218
@pytest.mark.parametrize(
@@ -16,6 +22,7 @@
1622
"field_path",
1723
"delete_origin_value",
1824
"replace_record",
25+
"key_transformation",
1926
"expected_record",
2027
],
2128
[
@@ -25,6 +32,7 @@
2532
["field2"],
2633
_DO_NOT_DELETE_ORIGIN_VALUE,
2734
_DO_NOT_REPLACE_WITH_VALUE,
35+
_NO_KEY_TRANSFORMATIONS,
2836
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
2937
id="flatten by dpath, don't delete origin value",
3038
),
@@ -34,6 +42,7 @@
3442
["field2"],
3543
_DELETE_ORIGIN_VALUE,
3644
_DO_NOT_REPLACE_WITH_VALUE,
45+
_NO_KEY_TRANSFORMATIONS,
3746
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
3847
id="flatten by dpath, delete origin value",
3948
),
@@ -46,6 +55,7 @@
4655
["field2", "*", "field4"],
4756
_DO_NOT_DELETE_ORIGIN_VALUE,
4857
_DO_NOT_REPLACE_WITH_VALUE,
58+
_NO_KEY_TRANSFORMATIONS,
4959
{
5060
"field1": _ANY_VALUE,
5161
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
@@ -62,6 +72,7 @@
6272
["field2", "*", "field4"],
6373
_DELETE_ORIGIN_VALUE,
6474
_DO_NOT_REPLACE_WITH_VALUE,
75+
_NO_KEY_TRANSFORMATIONS,
6576
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
6677
id="flatten by dpath with *, delete origin value",
6778
),
@@ -71,6 +82,7 @@
7182
["{{ config['field_path'] }}"],
7283
_DO_NOT_DELETE_ORIGIN_VALUE,
7384
_DO_NOT_REPLACE_WITH_VALUE,
85+
_NO_KEY_TRANSFORMATIONS,
7486
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
7587
id="flatten by dpath from config, don't delete origin value",
7688
),
@@ -80,6 +92,7 @@
8092
["non-existing-field"],
8193
_DO_NOT_DELETE_ORIGIN_VALUE,
8294
_DO_NOT_REPLACE_WITH_VALUE,
95+
_NO_KEY_TRANSFORMATIONS,
8396
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
8497
id="flatten by non-existing dpath, don't delete origin value",
8598
),
@@ -89,6 +102,7 @@
89102
["*", "non-existing-field"],
90103
_DO_NOT_DELETE_ORIGIN_VALUE,
91104
_DO_NOT_REPLACE_WITH_VALUE,
105+
_NO_KEY_TRANSFORMATIONS,
92106
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
93107
id="flatten by non-existing dpath with *, don't delete origin value",
94108
),
@@ -98,6 +112,7 @@
98112
["field2"],
99113
_DO_NOT_DELETE_ORIGIN_VALUE,
100114
_DO_NOT_REPLACE_WITH_VALUE,
115+
_NO_KEY_TRANSFORMATIONS,
101116
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
102117
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
103118
),
@@ -107,6 +122,7 @@
107122
["field2"],
108123
_DO_NOT_DELETE_ORIGIN_VALUE,
109124
_DO_NOT_REPLACE_WITH_VALUE,
125+
_NO_KEY_TRANSFORMATIONS,
110126
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
111127
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
112128
),
@@ -116,6 +132,7 @@
116132
["field2"],
117133
_DO_NOT_DELETE_ORIGIN_VALUE,
118134
_REPLACE_WITH_VALUE,
135+
_NO_KEY_TRANSFORMATIONS,
119136
{"field3": _ANY_VALUE},
120137
id="flatten by dpath, replace with value",
121138
),
@@ -125,20 +142,61 @@
125142
["field2"],
126143
_DELETE_ORIGIN_VALUE,
127144
_REPLACE_WITH_VALUE,
145+
_NO_KEY_TRANSFORMATIONS,
128146
{"field3": _ANY_VALUE},
129147
id="flatten by dpath, delete_origin_value do not affect to replace_record",
130148
),
149+
pytest.param(
150+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
151+
{},
152+
["field2"],
153+
_DO_NOT_DELETE_ORIGIN_VALUE,
154+
_REPLACE_WITH_VALUE,
155+
("prefix_", _NO_KEY_SUFFIX),
156+
{"prefix_field3": _ANY_VALUE},
157+
id="flatten by dpath, not delete origin value, replace record, add keys prefix",
158+
),
159+
pytest.param(
160+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
161+
{},
162+
["field2"],
163+
_DO_NOT_DELETE_ORIGIN_VALUE,
164+
_REPLACE_WITH_VALUE,
165+
(_NO_KEY_PREFIX, "_suffix"),
166+
{"field3_suffix": _ANY_VALUE},
167+
id="flatten by dpath, not delete origin value, replace record, add keys suffix",
168+
),
169+
pytest.param(
170+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
171+
{},
172+
["field2"],
173+
_DO_NOT_DELETE_ORIGIN_VALUE,
174+
_REPLACE_WITH_VALUE,
175+
("prefix_", "_suffix"),
176+
{"prefix_field3_suffix": _ANY_VALUE},
177+
id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix",
178+
),
131179
],
132180
)
133181
def test_dpath_flatten_lists(
134-
input_record, config, field_path, delete_origin_value, replace_record, expected_record
182+
input_record,
183+
config,
184+
field_path,
185+
delete_origin_value,
186+
replace_record,
187+
key_transformation,
188+
expected_record,
135189
):
190+
if key_transformation:
191+
key_transformation = KeyTransformation(config, {}, *key_transformation)
192+
136193
flattener = DpathFlattenFields(
137194
field_path=field_path,
138195
parameters={},
139196
config=config,
140197
delete_origin_value=delete_origin_value,
141198
replace_record=replace_record,
199+
key_transformation=key_transformation,
142200
)
143201
flattener.transform(input_record)
144202
assert input_record == expected_record

0 commit comments

Comments
 (0)