Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2259,6 +2259,10 @@ definitions:
title: Delete Origin Value
description: Whether to delete the origin value or keep it. Default is False.
type: boolean
replace_record:
title: Replace Origin Record
description: Whether to replace the origin record or not. Default is False.
type: boolean
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
replace_record: Optional[bool] = Field(
None,
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,10 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
return AddFields(
fields=added_field_definitions,
parameters=model.parameters or {},
)

def create_keys_to_lower_transformation(
self, model: KeysToLowerModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -742,6 +745,7 @@ def create_dpath_flatten_fields(
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
replace_record=model.replace_record if model.replace_record is not None else False,
parameters=model.parameters or {},
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class DpathFlattenFields(RecordTransformation):

field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
replace_record: bool = False whether to replace origin record or not. Default is False.

"""

config: Config
field_path: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False
replace_record: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -48,8 +50,12 @@ def transform(
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
if self.replace_record and extracted:
dpath.delete(record, "**")
record.update(extracted)
else:
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
record.update(extracted)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

_ANY_VALUE = -1
_DELETE_ORIGIN_VALUE = True
_REPLACE_WITH_VALUE = True
_DO_NOT_DELETE_ORIGIN_VALUE = False
_DO_NOT_REPLACE_WITH_VALUE = False


@pytest.mark.parametrize(
Expand All @@ -13,6 +15,7 @@
"config",
"field_path",
"delete_origin_value",
"replace_record",
"expected_record",
],
[
Expand All @@ -21,6 +24,7 @@
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, don't delete origin value",
),
Expand All @@ -29,6 +33,7 @@
{},
["field2"],
_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
id="flatten by dpath, delete origin value",
),
Expand All @@ -40,6 +45,7 @@
{},
["field2", "*", "field4"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
Expand All @@ -55,6 +61,7 @@
{},
["field2", "*", "field4"],
_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
id="flatten by dpath with *, delete origin value",
),
Expand All @@ -63,6 +70,7 @@
{"field_path": "field2"},
["{{ config['field_path'] }}"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath from config, don't delete origin value",
),
Expand All @@ -71,6 +79,7 @@
{},
["non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath, don't delete origin value",
),
Expand All @@ -79,6 +88,7 @@
{},
["*", "non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath with *, don't delete origin value",
),
Expand All @@ -87,6 +97,7 @@
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
),
Expand All @@ -95,16 +106,39 @@
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
{"field3": _ANY_VALUE},
id="flatten by dpath, replace with value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
{"field3": _ANY_VALUE},
id="flatten by dpath, delete_origin_value do not affect to replace_record",
),
],
)
def test_dpath_flatten_lists(
input_record, config, field_path, delete_origin_value, expected_record
input_record, config, field_path, delete_origin_value, replace_record, expected_record
):
flattener = DpathFlattenFields(
field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
field_path=field_path,
parameters={},
config=config,
delete_origin_value=delete_origin_value,
replace_record=replace_record,
)
flattener.transform(input_record)
assert input_record == expected_record
Loading