Skip to content

Commit c6a9d05

Browse files
feat: Add wildcard support to RecordExpander and remove TypeError
Changes: 1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs) 2. Add wildcard (*) support to RecordExpander for matching multiple arrays 3. Update docstring and schema to document wildcard support 4. Add 5 new unit tests for wildcard expansion scenarios 5. Regenerate models from updated schema When wildcards are used, RecordExpander: - Uses dpath.values() to find all matches - Filters for list-valued matches only - Expands items from all matched lists - Returns nothing if no list matches found All 29 tests passing. Requested by @DanyloGL. Co-Authored-By: unknown <>
1 parent c8a2643 commit c6a9d05

File tree

4 files changed

+142
-24
lines changed

4 files changed

+142
-24
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1934,7 +1934,7 @@ definitions:
19341934
additionalProperties: true
19351935
RecordExpander:
19361936
title: Record Expander
1937-
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation.
1937+
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
19381938
type: object
19391939
required:
19401940
- type
@@ -1945,7 +1945,7 @@ definitions:
19451945
enum: [RecordExpander]
19461946
expand_records_from_field:
19471947
title: Expand Records From Field
1948-
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.
1948+
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
19491949
type: array
19501950
items:
19511951
type: string
@@ -1955,6 +1955,7 @@ definitions:
19551955
- ["lines", "data"]
19561956
- ["items"]
19571957
- ["nested", "array"]
1958+
- ["sections", "*", "items"]
19581959
remain_original_record:
19591960
title: Remain Original Record
19601961
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.

airbyte_cdk/sources/declarative/expanders/record_expander.py

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ class RecordExpander:
2020
within each record and emits each item as a separate record. Optionally, the original
2121
parent record can be embedded in each expanded item for context preservation.
2222
23+
The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
24+
When wildcards are used, items from all matched arrays are extracted and emitted.
25+
2326
Examples of instantiating this component:
2427
```
2528
record_expander:
@@ -30,8 +33,18 @@ class RecordExpander:
3033
remain_original_record: true
3134
```
3235
36+
```
37+
record_expander:
38+
type: RecordExpander
39+
expand_records_from_field:
40+
- "sections"
41+
- "*"
42+
- "items"
43+
remain_original_record: false
44+
```
45+
3346
Attributes:
34-
expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.
47+
expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*).
3548
remain_original_record (bool): If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
3649
config (Config): The user-provided configuration as specified by the source's spec
3750
"""
@@ -55,22 +68,40 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap
5568

5669
expand_path = [path.eval(self.config) for path in self._expand_path]
5770

58-
try:
59-
nested_array = dpath.get(record, expand_path)
60-
except (KeyError, TypeError):
61-
return
62-
63-
if not isinstance(nested_array, list):
64-
return
65-
66-
if len(nested_array) == 0:
67-
return
68-
69-
for item in nested_array:
70-
if isinstance(item, dict):
71-
expanded_record = dict(item)
72-
if self.remain_original_record:
73-
expanded_record["original_record"] = record
74-
yield expanded_record
75-
else:
76-
yield item
71+
if "*" in expand_path:
72+
matches = dpath.values(record, expand_path)
73+
list_nodes = [m for m in matches if isinstance(m, list)]
74+
if not list_nodes:
75+
return
76+
77+
for nested_array in list_nodes:
78+
if len(nested_array) == 0:
79+
continue
80+
for item in nested_array:
81+
if isinstance(item, dict):
82+
expanded_record = dict(item)
83+
if self.remain_original_record:
84+
expanded_record["original_record"] = record
85+
yield expanded_record
86+
else:
87+
yield item
88+
else:
89+
try:
90+
nested_array = dpath.get(record, expand_path)
91+
except KeyError:
92+
return
93+
94+
if not isinstance(nested_array, list):
95+
return
96+
97+
if len(nested_array) == 0:
98+
return
99+
100+
for item in nested_array:
101+
if isinstance(item, dict):
102+
expanded_record = dict(item)
103+
if self.remain_original_record:
104+
expanded_record["original_record"] = record
105+
yield expanded_record
106+
else:
107+
yield item

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,13 @@ class RecordExpander(BaseModel):
491491
type: Literal["RecordExpander"]
492492
expand_records_from_field: List[str] = Field(
493493
...,
494-
description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.",
495-
examples=[["lines", "data"], ["items"], ["nested", "array"]],
494+
description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.",
495+
examples=[
496+
["lines", "data"],
497+
["items"],
498+
["nested", "array"],
499+
["sections", "*", "items"],
500+
],
496501
title="Expand Records From Field",
497502
)
498503
remain_original_record: Optional[bool] = Field(

unit_tests/sources/declarative/extractors/test_dpath_extractor.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,82 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco
229229
],
230230
[{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}],
231231
),
232+
(
233+
["data"],
234+
["sections", "*", "items"],
235+
False,
236+
{
237+
"data": {
238+
"sections": [
239+
{"name": "section1", "items": [{"id": "item_1"}, {"id": "item_2"}]},
240+
{"name": "section2", "items": [{"id": "item_3"}]},
241+
]
242+
}
243+
},
244+
[{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}],
245+
),
246+
(
247+
["data"],
248+
["sections", "*", "items"],
249+
True,
250+
{
251+
"data": {
252+
"sections": [
253+
{"name": "section1", "items": [{"id": "item_1"}]},
254+
]
255+
}
256+
},
257+
[
258+
{
259+
"id": "item_1",
260+
"original_record": {
261+
"sections": [
262+
{"name": "section1", "items": [{"id": "item_1"}]},
263+
]
264+
},
265+
}
266+
],
267+
),
268+
(
269+
["data"],
270+
["sections", "*", "items"],
271+
False,
272+
{
273+
"data": {
274+
"sections": [
275+
{"name": "section1", "items": []},
276+
{"name": "section2", "items": []},
277+
]
278+
}
279+
},
280+
[],
281+
),
282+
(
283+
["data"],
284+
["sections", "*", "items"],
285+
False,
286+
{
287+
"data": {
288+
"sections": [
289+
{"name": "section1"},
290+
{"name": "section2", "items": "not_an_array"},
291+
]
292+
}
293+
},
294+
[],
295+
),
296+
(
297+
["data"],
298+
["*", "items"],
299+
False,
300+
{
301+
"data": {
302+
"group1": {"items": [{"id": "item_1"}]},
303+
"group2": {"items": [{"id": "item_2"}, {"id": "item_3"}]},
304+
}
305+
},
306+
[{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}],
307+
),
232308
],
233309
ids=[
234310
"test_expand_nested_array",
@@ -239,6 +315,11 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco
239315
"test_expand_deeply_nested_path",
240316
"test_expand_mixed_types_in_array",
241317
"test_expand_multiple_parent_records",
318+
"test_expand_wildcard_multiple_lists",
319+
"test_expand_wildcard_with_original_record",
320+
"test_expand_wildcard_all_empty_arrays",
321+
"test_expand_wildcard_no_list_matches",
322+
"test_expand_wildcard_dict_values",
242323
],
243324
)
244325
def test_dpath_extractor_with_expansion(

0 commit comments

Comments
 (0)