Skip to content

Commit 711384c

Browse files
committed
implement config transformations: AddFields and RemoveFields`
1 parent 4727b28 commit 711384c

File tree

5 files changed

+392
-1
lines changed

5 files changed

+392
-1
lines changed

airbyte_cdk/sources/declarative/transformations/config_transformations/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

5+
from .add_fields import AddFields
56
from .remap_field import RemapField
7+
from .remove_fields import RemoveFields
68

7-
__all__ = ["RemapField"]
9+
__all__ = ["RemapField", "AddFields", "RemoveFields"]
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass, field
6+
from typing import Any, List, MutableMapping, Optional, Type, Union
7+
8+
import dpath
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
11+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
12+
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
13+
ConfigTransformation,
14+
)
15+
from airbyte_cdk.sources.types import FieldPointer
16+
17+
18+
@dataclass(frozen=True)
19+
class AddedFieldDefinition:
20+
"""Defines the field to add on a config"""
21+
22+
path: FieldPointer
23+
value: Union[InterpolatedString, str]
24+
value_type: Optional[Type[Any]] = None
25+
26+
27+
@dataclass(frozen=True)
28+
class ParsedAddFieldDefinition:
29+
"""Defines the field to add on a config"""
30+
31+
path: FieldPointer
32+
value: InterpolatedString
33+
value_type: Optional[Type[Any]] = None
34+
35+
36+
@dataclass
37+
class AddFields(ConfigTransformation):
38+
"""
39+
Transformation which adds fields to a config. The path of the added field can be nested. Adding nested fields will create all
40+
necessary parent objects (like mkdir -p).
41+
42+
This transformation has access to the config being transformed.
43+
44+
Examples of instantiating this transformation via YAML:
45+
- type: AddFields
46+
fields:
47+
# hardcoded constant
48+
- path: ["path"]
49+
value: "static_value"
50+
51+
# nested path
52+
- path: ["path", "to", "field"]
53+
value: "static"
54+
55+
# from config
56+
- path: ["derived_field"]
57+
value: "{{ config.original_field }}"
58+
59+
# by supplying any valid Jinja template directive or expression
60+
- path: ["two_times_two"]
61+
value: "{{ 2 * 2 }}"
62+
63+
Attributes:
64+
fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the config
65+
"""
66+
67+
fields: List[AddedFieldDefinition]
68+
condition: str = ""
69+
_parsed_fields: List[ParsedAddFieldDefinition] = field(
70+
init=False, repr=False, default_factory=list
71+
)
72+
73+
def __post_init__(self) -> None:
74+
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})
75+
76+
for add_field in self.fields:
77+
if len(add_field.path) < 1:
78+
raise ValueError(
79+
f"Expected a non-zero-length path for the AddFields transformation {add_field}"
80+
)
81+
82+
if not isinstance(add_field.value, InterpolatedString):
83+
if not isinstance(add_field.value, str):
84+
raise ValueError(
85+
f"Expected a string value for the AddFields transformation: {add_field}"
86+
)
87+
else:
88+
self._parsed_fields.append(
89+
ParsedAddFieldDefinition(
90+
add_field.path,
91+
InterpolatedString.create(add_field.value, parameters={}),
92+
value_type=add_field.value_type,
93+
)
94+
)
95+
else:
96+
self._parsed_fields.append(
97+
ParsedAddFieldDefinition(
98+
add_field.path,
99+
add_field.value,
100+
value_type=add_field.value_type,
101+
)
102+
)
103+
104+
def transform(
105+
self,
106+
config: MutableMapping[str, Any],
107+
) -> None:
108+
"""
109+
Transforms a config by adding fields based on the provided field definitions.
110+
111+
:param config: The user-provided configuration to be transformed
112+
"""
113+
for parsed_field in self._parsed_fields:
114+
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
115+
value = parsed_field.value.eval(config, valid_types=valid_types)
116+
is_empty_condition = not self.condition
117+
if is_empty_condition or self._filter_interpolator.eval(
118+
config, value=value, path=parsed_field.path
119+
):
120+
dpath.new(config, parsed_field.path, value)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
import logging
5+
from dataclasses import dataclass
6+
from typing import Any, List, MutableMapping
7+
8+
import dpath
9+
import dpath.exceptions
10+
11+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
12+
from airbyte_cdk.sources.declarative.transformations.config_transformations.config_transformation import (
13+
ConfigTransformation,
14+
)
15+
from airbyte_cdk.sources.types import FieldPointer
16+
17+
logger = logging.getLogger("airbyte")
18+
19+
20+
@dataclass
21+
class RemoveFields(ConfigTransformation):
22+
"""
23+
A transformation which removes fields from a config. The fields removed are designated using FieldPointers.
24+
During transformation, if a field or any of its parents does not exist in the config, no error is thrown.
25+
26+
If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then
27+
the object at that index is set to None rather than being entirely removed from the list.
28+
29+
It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}
30+
31+
Usage syntax:
32+
33+
```yaml
34+
config_transformations:
35+
- type: RemoveFields
36+
field_pointers:
37+
- ["path", "to", "field1"]
38+
- ["path2"]
39+
condition: "{{ config.some_flag }}" # Optional condition
40+
```
41+
42+
Attributes:
43+
field_pointers (List[FieldPointer]): pointers to the fields that should be removed
44+
condition (str): Optional condition that determines if the fields should be removed
45+
"""
46+
47+
field_pointers: List[FieldPointer]
48+
condition: str = ""
49+
50+
def __post_init__(self) -> None:
51+
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})
52+
53+
def transform(
54+
self,
55+
config: MutableMapping[str, Any],
56+
) -> None:
57+
"""
58+
Transforms a config by removing fields based on the provided field pointers.
59+
60+
:param config: The user-provided configuration to be transformed
61+
"""
62+
if self.condition and not self._filter_interpolator.eval(config):
63+
return
64+
65+
for pointer in self.field_pointers:
66+
try:
67+
dpath.delete(config, pointer)
68+
except dpath.exceptions.PathNotFound:
69+
pass
70+
except Exception as e:
71+
logger.error(f"Error removing field {pointer}: {e}")
72+
raise e
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import pytest
6+
7+
from airbyte_cdk.sources.declarative.transformations.config_transformations.add_fields import (
8+
AddedFieldDefinition,
9+
AddFields,
10+
)
11+
12+
13+
def test_given_valid_static_value_fields_added():
14+
transformation = AddFields(
15+
fields=[
16+
AddedFieldDefinition(path=["new_field"], value="static_value"),
17+
AddedFieldDefinition(path=["another_field"], value="another_value"),
18+
]
19+
)
20+
config = {}
21+
22+
transformation.transform(config)
23+
24+
assert config == {
25+
"new_field": "static_value",
26+
"another_field": "another_value",
27+
}
28+
29+
30+
def test_given_valid_nested_fields_static_value_added():
31+
transformation = AddFields(
32+
fields=[
33+
AddedFieldDefinition(path=["parent", "child", "grandchild"], value="nested_value"),
34+
]
35+
)
36+
config = {}
37+
38+
transformation.transform(config)
39+
40+
assert config == {"parent": {"child": {"grandchild": "nested_value"}}}
41+
42+
43+
def test_given_valid_interpolated_input_field_added():
44+
transformation = AddFields(
45+
fields=[
46+
AddedFieldDefinition(path=["derived_field"], value="{{ config.original_field }}"),
47+
AddedFieldDefinition(path=["expression_result"], value="{{ 2 * 3 }}"),
48+
]
49+
)
50+
config = {"original_field": "original_value"}
51+
52+
transformation.transform(config)
53+
54+
assert config == {
55+
"original_field": "original_value",
56+
"derived_field": "original_value",
57+
"expression_result": 6,
58+
}
59+
60+
61+
def test_given_invalid_field_raises_exception():
62+
with pytest.raises(ValueError):
63+
AddFields(fields=[AddedFieldDefinition(path=[], value="value")])
64+
65+
with pytest.raises(ValueError):
66+
AddFields(fields=[AddedFieldDefinition(path=["valid_path"], value=123)])
67+
68+
69+
def test_given_field_already_exists_value_is_overwritten():
70+
transformation = AddFields(
71+
fields=[
72+
AddedFieldDefinition(path=["existing_field"], value="new_value"),
73+
]
74+
)
75+
config = {"existing_field": "existing_value"}
76+
77+
transformation.transform(config)
78+
79+
assert config["existing_field"] == "new_value"
80+
81+
82+
def test_with_condition_only_adds_fields_when_condition_is_met():
83+
transformation = AddFields(
84+
fields=[
85+
AddedFieldDefinition(path=["conditional_field"], value="added_value"),
86+
],
87+
condition="{{ config.flag == true }}",
88+
)
89+
90+
config_true = {"flag": True}
91+
transformation.transform(config_true)
92+
93+
config_false = {"flag": False}
94+
transformation.transform(config_false)
95+
96+
assert "conditional_field" in config_true
97+
assert "conditional_field" not in config_false

0 commit comments

Comments
 (0)