Skip to content

Commit d077d99

Browse files
feat: Add expand_records_from_field and remain_original_record to DpathExtractor
- Add optional expand_records_from_field parameter to extract items from nested arrays
- Add optional remain_original_record parameter to preserve parent record context
- Implement _expand_record method to handle array expansion logic
- Add comprehensive unit tests covering all edge cases
- Maintain backward compatibility with existing functionality

Co-Authored-By: unknown <>
1 parent 80b7668 commit d077d99

File tree

2 files changed

+198
-3
lines changed

2 files changed

+198
-3
lines changed

airbyte_cdk/sources/declarative/extractors/dpath_extractor.py

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6-
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
6+
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
77

88
import dpath
99
import requests
@@ -24,6 +24,12 @@ class DpathExtractor(RecordExtractor):
2424
If the field path points to an empty object, an empty array is returned.
2525
If the field path points to a non-existing path, an empty array is returned.
2626
27+
Optionally, records can be expanded by extracting items from a nested array field.
28+
When expand_records_from_field is configured, each extracted record is expanded by
29+
extracting items from the specified nested array path and emitting each item as a
30+
separate record. If remain_original_record is True, each expanded record will include
31+
the original parent record in an "original_record" field.
32+
2733
Examples of instantiating this transform:
2834
```
2935
extractor:
@@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor):
4753
field_path: []
4854
```
4955
56+
```
57+
extractor:
58+
type: DpathExtractor
59+
field_path:
60+
- "data"
61+
- "object"
62+
expand_records_from_field:
63+
- "lines"
64+
- "data"
65+
remain_original_record: true
66+
```
67+
5068
Attributes:
5169
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
5270
config (Config): The user-provided configuration as specified by the source's spec
5371
decoder (Decoder): The decoder responsible for transforming the response into a Mapping
72+
expand_records_from_field (Optional[List[Union[InterpolatedString, str]]]): Path to a nested array field within each extracted record. If provided, items from this array will be extracted and emitted as separate records.
73+
remain_original_record (bool): If True and expand_records_from_field is set, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
5474
"""
5575

5676
field_path: List[Union[InterpolatedString, str]]
5777
config: Config
5878
parameters: InitVar[Mapping[str, Any]]
5979
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
80+
expand_records_from_field: Optional[List[Union[InterpolatedString, str]]] = None
81+
remain_original_record: bool = False
6082

6183
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
6284
self._field_path = [
@@ -67,6 +89,44 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
6789
self._field_path[path_index] = InterpolatedString.create(
6890
self.field_path[path_index], parameters=parameters
6991
)
92+
93+
if self.expand_records_from_field:
94+
self._expand_path = [
95+
InterpolatedString.create(path, parameters=parameters)
96+
for path in self.expand_records_from_field
97+
]
98+
else:
99+
self._expand_path = None
100+
101+
def _expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
102+
"""Expand a record by extracting items from a nested array field."""
103+
if not self._expand_path:
104+
yield record
105+
return
106+
107+
expand_path = [path.eval(self.config) for path in self._expand_path]
108+
109+
try:
110+
nested_array = dpath.get(record, expand_path)
111+
except (KeyError, TypeError):
112+
yield record
113+
return
114+
115+
if not isinstance(nested_array, list):
116+
yield record
117+
return
118+
119+
if len(nested_array) == 0:
120+
return
121+
122+
for item in nested_array:
123+
if isinstance(item, dict):
124+
expanded_record = dict(item)
125+
if self.remain_original_record:
126+
expanded_record["original_record"] = record
127+
yield expanded_record
128+
else:
129+
yield item
70130

71131
def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
72132
for body in self.decoder.decode(response):
@@ -79,8 +139,9 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
79139
else:
80140
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
81141
if isinstance(extracted, list):
82-
yield from extracted
142+
for record in extracted:
143+
yield from self._expand_record(record)
83144
elif extracted:
84-
yield extracted
145+
yield from self._expand_record(extracted)
85146
else:
86147
yield from []

unit_tests/sources/declarative/extractors/test_dpath_extractor.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,137 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco
121121
actual_records = list(extractor.extract_records(response))
122122

123123
assert actual_records == expected_records
124+
125+
126+
@pytest.mark.parametrize(
    "field_path, expand_records_from_field, remain_original_record, body, expected_records",
    [
        pytest.param(
            ["data", "object"],
            ["lines", "data"],
            False,
            {
                "data": {
                    "object": {
                        "id": "in_123",
                        "created": 1234567890,
                        "lines": {
                            "data": [
                                {"id": "il_1", "amount": 100},
                                {"id": "il_2", "amount": 200},
                            ]
                        },
                    }
                }
            },
            [
                {"id": "il_1", "amount": 100},
                {"id": "il_2", "amount": 200},
            ],
            id="test_expand_nested_array",
        ),
        pytest.param(
            ["data", "object"],
            ["lines", "data"],
            True,
            {
                "data": {
                    "object": {
                        "id": "in_123",
                        "created": 1234567890,
                        "lines": {
                            "data": [
                                {"id": "il_1", "amount": 100},
                            ]
                        },
                    }
                }
            },
            [
                {
                    "id": "il_1",
                    "amount": 100,
                    "original_record": {
                        "id": "in_123",
                        "created": 1234567890,
                        "lines": {"data": [{"id": "il_1", "amount": 100}]},
                    },
                },
            ],
            id="test_expand_with_original_record",
        ),
        pytest.param(
            ["data"],
            ["items"],
            False,
            {"data": {"id": "parent_1", "items": []}},
            [],
            id="test_expand_empty_array_yields_nothing",
        ),
        pytest.param(
            ["data"],
            ["items"],
            False,
            {"data": {"id": "parent_1"}},
            [{"id": "parent_1"}],
            id="test_expand_missing_path_yields_original",
        ),
        pytest.param(
            ["data"],
            ["items"],
            False,
            {"data": {"id": "parent_1", "items": "not_an_array"}},
            [{"id": "parent_1", "items": "not_an_array"}],
            id="test_expand_non_array_yields_original",
        ),
        pytest.param(
            ["data"],
            ["nested", "array"],
            False,
            {"data": {"id": "parent_1", "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}}},
            [{"id": "child_1"}, {"id": "child_2"}],
            id="test_expand_deeply_nested_path",
        ),
        pytest.param(
            ["data"],
            ["items"],
            False,
            {"data": {"id": "parent_1", "items": [1, 2, "string", {"id": "dict_item"}]}},
            [1, 2, "string", {"id": "dict_item"}],
            id="test_expand_mixed_types_in_array",
        ),
        pytest.param(
            [],
            ["items"],
            False,
            [
                {"id": "parent_1", "items": [{"id": "child_1"}]},
                {"id": "parent_2", "items": [{"id": "child_2"}, {"id": "child_3"}]},
            ],
            [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}],
            id="test_expand_multiple_parent_records",
        ),
    ],
)
def test_dpath_extractor_with_expansion(
    field_path: List,
    expand_records_from_field: List,
    remain_original_record: bool,
    body,
    expected_records: List,
):
    """Check that DpathExtractor emits the expected records when expansion is configured.

    Each case builds an extractor with the given ``field_path`` and expansion
    settings, runs it against a response wrapping ``body``, and compares the
    records it emits with ``expected_records``.
    """
    expanding_extractor = DpathExtractor(
        field_path=field_path,
        config=config,
        decoder=decoder_json,
        parameters=parameters,
        expand_records_from_field=expand_records_from_field,
        remain_original_record=remain_original_record,
    )

    emitted = list(expanding_extractor.extract_records(create_response(body)))

    assert emitted == expected_records

0 commit comments

Comments
 (0)