Skip to content
17 changes: 17 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ definitions:
type: array
items:
"$ref": "#/definitions/AddedFieldDefinition"
condition:
description: Fields will be added if expression is evaluated to True.,
type: string
default: ""
interpolation_context:
- config
- property
- parameters
examples:
- "{{ property|string == '' }}"
- "{{ property is integer }}"
- "{{ property|length > 5 }}"
- "{{ property == 'some_string_to_match' }}"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2259,6 +2272,10 @@ definitions:
title: Delete Origin Value
description: Whether to delete the origin value or keep it. Default is False.
type: boolean
replace_record:
title: Replace Origin Record
description: Whether to replace the origin record or not. Default is False.
type: boolean
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
replace_record: Optional[bool] = Field(
None,
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -1460,6 +1465,16 @@ class AddFields(BaseModel):
description="List of transformations (path and corresponding value) that will be added to the record.",
title="Fields",
)
condition: Optional[str] = Field(
"",
description="Fields will be added if expression is evaluated to True.,",
examples=[
"{{ property|string == '' }}",
"{{ property is integer }}",
"{{ property|length > 5 }}",
"{{ property == 'some_string_to_match' }}",
],
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,11 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
return AddFields(
fields=added_field_definitions,
condition=model.condition,
parameters=model.parameters or {},
)

def create_keys_to_lower_transformation(
self, model: KeysToLowerModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -742,6 +746,7 @@ def create_dpath_flatten_fields(
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
replace_record=model.replace_record if model.replace_record is not None else False,
parameters=model.parameters or {},
)

Expand Down
12 changes: 10 additions & 2 deletions airbyte_cdk/sources/declarative/transformations/add_fields.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Mapping, Optional, Type, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState
Expand Down Expand Up @@ -86,11 +87,16 @@ class AddFields(RecordTransformation):

fields: List[AddedFieldDefinition]
parameters: InitVar[Mapping[str, Any]]
condition: str = ""
_parsed_fields: List[ParsedAddFieldDefinition] = field(
init=False, repr=False, default_factory=list
)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._filter_interpolator = InterpolatedBoolean(
condition=self.condition, parameters=parameters
)

for add_field in self.fields:
if len(add_field.path) < 1:
raise ValueError(
Expand Down Expand Up @@ -132,7 +138,9 @@ def transform(
for parsed_field in self._parsed_fields:
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
dpath.new(record, parsed_field.path, value)
is_empty_condition = not self.condition
if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
dpath.new(record, parsed_field.path, value)

def __eq__(self, other: Any) -> bool:
return bool(self.__dict__ == other.__dict__)
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class DpathFlattenFields(RecordTransformation):

field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
replace_record: bool = False whether to replace origin record or not. Default is False.

"""

config: Config
field_path: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False
replace_record: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -48,8 +50,12 @@ def transform(
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
if self.replace_record and extracted:
dpath.delete(record, "**")
record.update(extracted)
else:
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
record.update(extracted)
41 changes: 39 additions & 2 deletions unit_tests/sources/declarative/transformations/test_add_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@


@pytest.mark.parametrize(
["input_record", "field", "field_type", "kwargs", "expected"],
["input_record", "field", "field_type", "condition", "kwargs", "expected"],
[
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"",
{},
{"k": "v", "path": "static_value"},
id="add new static value",
Expand All @@ -26,6 +27,7 @@
{"k": "v"},
[(["path"], "{{ 1 }}")],
None,
"",
{},
{"k": "v", "path": 1},
id="add an expression evaluated as a number",
Expand All @@ -34,6 +36,7 @@
{"k": "v"},
[(["path"], "{{ 1 }}")],
str,
"",
{},
{"k": "v", "path": "1"},
id="add an expression evaluated as a string using the value_type field",
Expand All @@ -42,6 +45,7 @@
{"k": "v"},
[(["path"], "static_value"), (["path2"], "static_value2")],
None,
"",
{},
{"k": "v", "path": "static_value", "path2": "static_value2"},
id="add new multiple static values",
Expand All @@ -50,6 +54,7 @@
{"k": "v"},
[(["nested", "path"], "static_value")],
None,
"",
{},
{"k": "v", "nested": {"path": "static_value"}},
id="set static value at nested path",
Expand All @@ -58,6 +63,7 @@
{"k": "v"},
[(["k"], "new_value")],
None,
"",
{},
{"k": "new_value"},
id="update value which already exists",
Expand All @@ -66,6 +72,7 @@
{"k": [0, 1]},
[(["k", 3], "v")],
None,
"",
{},
{"k": [0, 1, None, "v"]},
id="Set element inside array",
Expand All @@ -74,6 +81,7 @@
{"k": "v"},
[(["k2"], '{{ config["shop"] }}')],
None,
"",
{"config": {"shop": "in-n-out"}},
{"k": "v", "k2": "in-n-out"},
id="set a value from the config using bracket notation",
Expand All @@ -82,6 +90,7 @@
{"k": "v"},
[(["k2"], "{{ config.shop }}")],
None,
"",
{"config": {"shop": "in-n-out"}},
{"k": "v", "k2": "in-n-out"},
id="set a value from the config using dot notation",
Expand All @@ -90,6 +99,7 @@
{"k": "v"},
[(["k2"], '{{ stream_slice["start_date"] }}')],
None,
"",
{"stream_slice": {"start_date": "oct1"}},
{"k": "v", "k2": "oct1"},
id="set a value from the stream slice using bracket notation",
Expand All @@ -98,6 +108,7 @@
{"k": "v"},
[(["k2"], "{{ stream_slice.start_date }}")],
None,
"",
{"stream_slice": {"start_date": "oct1"}},
{"k": "v", "k2": "oct1"},
id="set a value from the stream slice using dot notation",
Expand All @@ -106,6 +117,7 @@
{"k": "v"},
[(["k2"], "{{ record.k }}")],
None,
"",
{},
{"k": "v", "k2": "v"},
id="set a value from a field in the record using dot notation",
Expand All @@ -114,6 +126,7 @@
{"k": "v"},
[(["k2"], '{{ record["k"] }}')],
None,
"",
{},
{"k": "v", "k2": "v"},
id="set a value from a field in the record using bracket notation",
Expand All @@ -122,6 +135,7 @@
{"k": {"nested": "v"}},
[(["k2"], "{{ record.k.nested }}")],
None,
"",
{},
{"k": {"nested": "v"}, "k2": "v"},
id="set a value from a nested field in the record using bracket notation",
Expand All @@ -130,6 +144,7 @@
{"k": {"nested": "v"}},
[(["k2"], '{{ record["k"]["nested"] }}')],
None,
"",
{},
{"k": {"nested": "v"}, "k2": "v"},
id="set a value from a nested field in the record using bracket notation",
Expand All @@ -138,22 +153,44 @@
{"k": "v"},
[(["k2"], "{{ 2 + 2 }}")],
None,
"",
{},
{"k": "v", "k2": 4},
id="set a value from a jinja expression",
),
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"{{ False }}",
{},
{"k": "v"},
id="do not add any field if condition is boolean False",
),
pytest.param(
{"k": "v"},
[(["path"], "static_value")],
None,
"{{ True }}",
{},
{"k": "v", "path": "static_value"},
id="add all fields if condition is boolean True",
),
],
)
def test_add_fields(
input_record: Mapping[str, Any],
field: List[Tuple[FieldPointer, str]],
field_type: Optional[str],
condition: Optional[str],
kwargs: Mapping[str, Any],
expected: Mapping[str, Any],
):
inputs = [
AddedFieldDefinition(path=v[0], value=v[1], value_type=field_type, parameters={})
for v in field
]
AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs)
AddFields(fields=inputs, condition=condition, parameters={"alas": "i live"}).transform(
input_record, **kwargs
)
assert input_record == expected
Loading
Loading