diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 14df7f3aa..24b62a852 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -116,6 +116,19 @@ definitions: type: array items: "$ref": "#/definitions/AddedFieldDefinition" + condition: + description: Fields will be added if expression is evaluated to True. + type: string + default: "" + interpolation_context: + - config + - property + - parameters + examples: + - "{{ property|string == '' }}" + - "{{ property is integer }}" + - "{{ property|length > 5 }}" + - "{{ property == 'some_string_to_match' }}" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 660202e42..3d612b928 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1465,6 +1465,16 @@ class AddFields(BaseModel): description="List of transformations (path and corresponding value) that will be added to the record.", title="Fields", ) + condition: Optional[str] = Field( + "", + description="Fields will be added if expression is evaluated to True.", + examples=[ + "{{ property|string == '' }}", + "{{ property is integer }}", + "{{ property|length > 5 }}", + "{{ property == 'some_string_to_match' }}", + ], + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 011f1d1a3..686cbb210 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ 
b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -714,6 +714,7 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any ] return AddFields( fields=added_field_definitions, + condition=model.condition or "", parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte_cdk/sources/declarative/transformations/add_fields.py index d64d86a97..f18b1b70a 100644 --- a/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -7,6 +7,7 @@ import dpath +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState @@ -86,11 +87,16 @@ class AddFields(RecordTransformation): fields: List[AddedFieldDefinition] parameters: InitVar[Mapping[str, Any]] + condition: str = "" _parsed_fields: List[ParsedAddFieldDefinition] = field( init=False, repr=False, default_factory=list ) def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._filter_interpolator = InterpolatedBoolean( + condition=self.condition, parameters=parameters + ) + for add_field in self.fields: if len(add_field.path) < 1: raise ValueError( @@ -132,7 +138,9 @@ def transform( for parsed_field in self._parsed_fields: valid_types = (parsed_field.value_type,) if parsed_field.value_type else None value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs) - dpath.new(record, parsed_field.path, value) + is_empty_condition = not self.condition + if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs): + dpath.new(record, parsed_field.path, value) def __eq__(self, other: Any) -> bool: 
return bool(self.__dict__ == other.__dict__) diff --git a/unit_tests/sources/declarative/transformations/test_add_fields.py b/unit_tests/sources/declarative/transformations/test_add_fields.py index 929af28a6..2d8650291 100644 --- a/unit_tests/sources/declarative/transformations/test_add_fields.py +++ b/unit_tests/sources/declarative/transformations/test_add_fields.py @@ -12,12 +12,13 @@ @pytest.mark.parametrize( - ["input_record", "field", "field_type", "kwargs", "expected"], + ["input_record", "field", "field_type", "condition", "kwargs", "expected"], [ pytest.param( {"k": "v"}, [(["path"], "static_value")], None, + "", {}, {"k": "v", "path": "static_value"}, id="add new static value", @@ -26,6 +27,7 @@ {"k": "v"}, [(["path"], "{{ 1 }}")], None, + "", {}, {"k": "v", "path": 1}, id="add an expression evaluated as a number", @@ -34,6 +36,7 @@ {"k": "v"}, [(["path"], "{{ 1 }}")], str, + "", {}, {"k": "v", "path": "1"}, id="add an expression evaluated as a string using the value_type field", @@ -42,6 +45,7 @@ {"k": "v"}, [(["path"], "static_value"), (["path2"], "static_value2")], None, + "", {}, {"k": "v", "path": "static_value", "path2": "static_value2"}, id="add new multiple static values", @@ -50,6 +54,7 @@ {"k": "v"}, [(["nested", "path"], "static_value")], None, + "", {}, {"k": "v", "nested": {"path": "static_value"}}, id="set static value at nested path", @@ -58,6 +63,7 @@ {"k": "v"}, [(["k"], "new_value")], None, + "", {}, {"k": "new_value"}, id="update value which already exists", @@ -66,6 +72,7 @@ {"k": [0, 1]}, [(["k", 3], "v")], None, + "", {}, {"k": [0, 1, None, "v"]}, id="Set element inside array", @@ -74,6 +81,7 @@ {"k": "v"}, [(["k2"], '{{ config["shop"] }}')], None, + "", {"config": {"shop": "in-n-out"}}, {"k": "v", "k2": "in-n-out"}, id="set a value from the config using bracket notation", @@ -82,6 +90,7 @@ {"k": "v"}, [(["k2"], "{{ config.shop }}")], None, + "", {"config": {"shop": "in-n-out"}}, {"k": "v", "k2": "in-n-out"}, id="set a value from the 
config using dot notation", @@ -90,6 +99,7 @@ {"k": "v"}, [(["k2"], '{{ stream_slice["start_date"] }}')], None, + "", {"stream_slice": {"start_date": "oct1"}}, {"k": "v", "k2": "oct1"}, id="set a value from the stream slice using bracket notation", @@ -98,6 +108,7 @@ {"k": "v"}, [(["k2"], "{{ stream_slice.start_date }}")], None, + "", {"stream_slice": {"start_date": "oct1"}}, {"k": "v", "k2": "oct1"}, id="set a value from the stream slice using dot notation", @@ -106,6 +117,7 @@ {"k": "v"}, [(["k2"], "{{ record.k }}")], None, + "", {}, {"k": "v", "k2": "v"}, id="set a value from a field in the record using dot notation", @@ -114,6 +126,7 @@ {"k": "v"}, [(["k2"], '{{ record["k"] }}')], None, + "", {}, {"k": "v", "k2": "v"}, id="set a value from a field in the record using bracket notation", @@ -122,6 +135,7 @@ {"k": {"nested": "v"}}, [(["k2"], "{{ record.k.nested }}")], None, + "", {}, {"k": {"nested": "v"}, "k2": "v"}, id="set a value from a nested field in the record using bracket notation", @@ -130,6 +144,7 @@ {"k": {"nested": "v"}}, [(["k2"], '{{ record["k"]["nested"] }}')], None, + "", {}, {"k": {"nested": "v"}, "k2": "v"}, id="set a value from a nested field in the record using bracket notation", @@ -138,16 +153,36 @@ {"k": "v"}, [(["k2"], "{{ 2 + 2 }}")], None, + "", {}, {"k": "v", "k2": 4}, id="set a value from a jinja expression", ), + pytest.param( + {"k": "v"}, + [(["path"], "static_value")], + None, + "{{ False }}", + {}, + {"k": "v"}, + id="do not add any field if condition is boolean False", + ), + pytest.param( + {"k": "v"}, + [(["path"], "static_value")], + None, + "{{ True }}", + {}, + {"k": "v", "path": "static_value"}, + id="add all fields if condition is boolean True", + ), ], ) def test_add_fields( input_record: Mapping[str, Any], field: List[Tuple[FieldPointer, str]], field_type: Optional[str], + condition: Optional[str], kwargs: Mapping[str, Any], expected: Mapping[str, Any], ): @@ -155,5 +190,7 @@ def test_add_fields( 
AddedFieldDefinition(path=v[0], value=v[1], value_type=field_type, parameters={}) for v in field ] - AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs) + AddFields(fields=inputs, condition=condition, parameters={"alas": "i live"}).transform( + input_record, **kwargs + ) assert input_record == expected