diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b0eec7d3d..665b4025a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2259,6 +2259,10 @@ definitions: title: Delete Origin Value description: Whether to delete the origin value or keep it. Default is False. type: boolean + replace_record: + title: Replace Origin Record + description: Whether to replace the origin record or not. Default is False. + type: boolean $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a49b66c03..4c60f4291 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel): description="Whether to delete the origin value or keep it. Default is False.", title="Delete Origin Value", ) + replace_record: Optional[bool] = Field( + None, + description="Whether to replace the origin record or not. Default is False.", + title="Replace Origin Record", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c46c76cef..76144370a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -706,7 +706,10 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any ) for added_field_definition_model in model.fields ] - return AddFields(fields=added_field_definitions, parameters=model.parameters or {}) + return AddFields( + fields=added_field_definitions, + parameters=model.parameters or {}, + ) def create_keys_to_lower_transformation( self, model: KeysToLowerModel, config: Config, **kwargs: Any @@ -742,6 +745,7 @@ def create_dpath_flatten_fields( delete_origin_value=model.delete_origin_value if model.delete_origin_value is not None else False, + replace_record=model.replace_record if model.replace_record is not None else False, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte_cdk/sources/declarative/transformations/add_fields.py index 59e0c2aeb..d64d86a97 100644 --- a/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # from dataclasses import InitVar, dataclass, field diff --git a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py index 73162d848..1486f7667 100644 --- a/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py @@ -15,6 +15,7 @@ class DpathFlattenFields(RecordTransformation): field_path: List[Union[InterpolatedString, str]] path to the field to flatten. delete_origin_value: bool = False whether to delete origin field or keep it. Default is False. + replace_record: bool = False whether to replace origin record or not. Default is False. """ @@ -22,6 +23,7 @@ class DpathFlattenFields(RecordTransformation): field_path: List[Union[InterpolatedString, str]] parameters: InitVar[Mapping[str, Any]] delete_origin_value: bool = False + replace_record: bool = False def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -48,8 +50,12 @@ def transform( extracted = dpath.get(record, path, default=[]) if isinstance(extracted, dict): - conflicts = set(extracted.keys()) & set(record.keys()) - if not conflicts: - if self.delete_origin_value: - dpath.delete(record, path) + if self.replace_record and extracted: + dpath.delete(record, "**") record.update(extracted) + else: + conflicts = set(extracted.keys()) & set(record.keys()) + if not conflicts: + if self.delete_origin_value: + dpath.delete(record, path) + record.update(extracted) diff --git a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py index 578999636..06842bc7d 100644 --- a/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py +++ b/unit_tests/sources/declarative/transformations/test_dpath_flatten_fields.py @@ -4,7 +4,9 @@ _ANY_VALUE = -1 _DELETE_ORIGIN_VALUE = True +_REPLACE_WITH_VALUE = True _DO_NOT_DELETE_ORIGIN_VALUE = False +_DO_NOT_REPLACE_WITH_VALUE = False @pytest.mark.parametrize( @@ -13,6 +15,7 @@ "config", "field_path", "delete_origin_value", + "replace_record", "expected_record", ], [ @@ -21,6 +24,7 @@ {}, ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, don't delete origin value", ), @@ -29,6 +33,7 @@ {}, ["field2"], _DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field3": _ANY_VALUE}, id="flatten by dpath, delete origin value", ), @@ -40,6 +45,7 @@ {}, ["field2", "*", "field4"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, { "field1": _ANY_VALUE, "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}}, @@ -55,6 +61,7 @@ {}, ["field2", "*", "field4"], _DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE}, id="flatten by dpath with *, delete origin value", ), @@ -63,6 +70,7 @@ {"field_path": "field2"}, ["{{ config['field_path'] }}"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath from config, don't delete origin value", ), @@ -71,6 +79,7 @@ {}, ["non-existing-field"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath, don't delete origin value", ), @@ -79,6 +88,7 @@ {}, ["*", "non-existing-field"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, id="flatten by non-existing dpath with *, don't delete origin value", ), @@ -87,6 +97,7 @@ {}, ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, not to update when record has field conflicts, don't delete origin value", ), @@ -95,16 +106,39 @@ {}, ["field2"], _DO_NOT_DELETE_ORIGIN_VALUE, + _DO_NOT_REPLACE_WITH_VALUE, {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE}, id="flatten by dpath, not to update when record has field conflicts, delete origin value", ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DO_NOT_DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + {"field3": _ANY_VALUE}, + id="flatten by dpath, replace with value", + ), + pytest.param( + {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}}, + {}, + ["field2"], + _DELETE_ORIGIN_VALUE, + _REPLACE_WITH_VALUE, + {"field3": _ANY_VALUE}, + id="flatten by dpath, delete_origin_value do not affect to replace_record", + ), ], ) def test_dpath_flatten_lists( - input_record, config, field_path, delete_origin_value, expected_record + input_record, config, field_path, delete_origin_value, replace_record, expected_record ): flattener = DpathFlattenFields( - field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value + field_path=field_path, + parameters={}, + config=config, + delete_origin_value=delete_origin_value, + replace_record=replace_record, ) flattener.transform(input_record) assert input_record == expected_record