Skip to content

Commit 91690f4

Browse files
refactor: Extract record expansion logic into RecordExpander class
- Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/
- Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander
- Update DpathExtractor to accept optional record_expander attribute
- Register RecordExpander in manifest component transformer
- Update unit tests to use new RecordExpander class structure
- All 24 tests passing, MyPy and Ruff checks passing

This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component.

Co-Authored-By: unknown <>
1 parent 24c8ac9 commit 91690f4

File tree

5 files changed

+115
-57
lines changed

5 files changed

+115
-57
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
6+
7+
__all__ = ["RecordExpander"]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
7+
8+
import dpath
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
11+
from airbyte_cdk.sources.types import Config
12+
13+
14+
@dataclass
class RecordExpander:
    """
    Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Optionally, the original
    parent record can be embedded in each expanded item for context preservation.

    Behavior of ``expand_record`` for a single input record:

    * no expansion path configured: the record is emitted unchanged;
    * the path cannot be resolved in the record, or resolves to something that is
      not a list: the record is emitted unchanged;
    * the path resolves to an empty list: nothing is emitted (the parent is dropped);
    * otherwise one output is emitted per list item. Dict items are shallow-copied
      before being emitted; non-dict items are emitted as they are.

    Examples of instantiating this component:
    ```
      record_expander:
        type: RecordExpander
        expand_records_from_field:
          - "lines"
          - "data"
        remain_original_record: true
    ```

    Attributes:
        expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.
        remain_original_record (bool): If True, each expanded dict item will include the original parent record in an "original_record" field (replacing any key of that name already present in the item). Defaults to False.
        config (Config): The user-provided configuration as specified by the source's spec
    """

    expand_records_from_field: List[Union[InterpolatedString, str]]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Normalize the configured path into a list of InterpolatedString segments."""
        # The field is typed to accept ready-made InterpolatedString segments as well as
        # plain strings, so only plain strings are wrapped; existing instances are kept.
        # A missing (None) path is treated like an empty one, i.e. "no expansion".
        self._expand_path: List[InterpolatedString] = [
            path
            if isinstance(path, InterpolatedString)
            else InterpolatedString.create(path, parameters=parameters)
            for path in (self.expand_records_from_field or [])
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """
        Expand a record by extracting items from a nested array field.

        :param record: the parent record to expand
        :return: an iterable of the expanded items, or the parent record itself when
            no expansion applies (see the class docstring for the exact rules)
        """
        if not self._expand_path:
            # Nothing to expand on: behave as a pass-through.
            yield record
            return

        # Path segments are interpolated against the config for every record.
        expand_path = [path.eval(self.config) for path in self._expand_path]

        try:
            nested_array = dpath.get(record, expand_path)
        except (KeyError, TypeError):
            # dpath could not resolve the path in this record: keep the record as is.
            # NOTE(review): a wildcard segment that matches several leaves presumably
            # makes dpath.get raise ValueError, which is not handled here - confirm
            # whether wildcard paths are meant to be supported.
            yield record
            return

        if not isinstance(nested_array, list):
            # The path exists but does not point at an array: keep the record as is.
            yield record
            return

        # An empty array yields no iterations, so the parent record is dropped.
        for item in nested_array:
            if isinstance(item, dict):
                # Shallow copy so adding "original_record" does not mutate the parent's item.
                expanded_record = dict(item)
                if self.remain_original_record:
                    expanded_record["original_record"] = record
                yield expanded_record
            else:
                yield item

airbyte_cdk/sources/declarative/extractors/dpath_extractor.py

Lines changed: 21 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import requests
1010

1111
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
12+
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
1213
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1314
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1415
from airbyte_cdk.sources.types import Config
@@ -24,11 +25,10 @@ class DpathExtractor(RecordExtractor):
2425
If the field path points to an empty object, an empty array is returned.
2526
If the field path points to a non-existing path, an empty array is returned.
2627
27-
Optionally, records can be expanded by extracting items from a nested array field.
28-
When expand_records_from_field is configured, each extracted record is expanded by
29-
extracting items from the specified nested array path and emitting each item as a
30-
separate record. If remain_original_record is True, each expanded record will include
31-
the original parent record in an "original_record" field.
28+
Optionally, records can be expanded by providing a RecordExpander component.
29+
When record_expander is configured, each extracted record is passed through the
30+
expander which extracts items from nested array fields and emits each item as a
31+
separate record.
3232
3333
Examples of instantiating this transform:
3434
```
@@ -59,26 +59,26 @@ class DpathExtractor(RecordExtractor):
5959
field_path:
6060
- "data"
6161
- "object"
62-
expand_records_from_field:
63-
- "lines"
64-
- "data"
65-
remain_original_record: true
62+
record_expander:
63+
type: RecordExpander
64+
expand_records_from_field:
65+
- "lines"
66+
- "data"
67+
remain_original_record: true
6668
```
6769
6870
Attributes:
6971
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
7072
config (Config): The user-provided configuration as specified by the source's spec
7173
decoder (Decoder): The decoder responsible for transforming the response into a Mapping
72-
expand_records_from_field (Optional[List[Union[InterpolatedString, str]]]): Path to a nested array field within each extracted record. If provided, items from this array will be extracted and emitted as separate records.
73-
remain_original_record (bool): If True and expand_records_from_field is set, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
74+
record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
7475
"""
7576

7677
field_path: List[Union[InterpolatedString, str]]
7778
config: Config
7879
parameters: InitVar[Mapping[str, Any]]
7980
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
80-
expand_records_from_field: Optional[List[Union[InterpolatedString, str]]] = None
81-
remain_original_record: bool = False
81+
record_expander: Optional[RecordExpander] = None
8282

8383
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
8484
self._field_path = [
@@ -90,46 +90,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9090
self.field_path[path_index], parameters=parameters
9191
)
9292

93-
if self.expand_records_from_field:
94-
self._expand_path: Optional[List[InterpolatedString]] = [
95-
InterpolatedString.create(path, parameters=parameters)
96-
for path in self.expand_records_from_field
97-
]
98-
else:
99-
self._expand_path = None
100-
101-
def _expand_record(
102-
self, record: MutableMapping[Any, Any]
103-
) -> Iterable[MutableMapping[Any, Any]]:
104-
"""Expand a record by extracting items from a nested array field."""
105-
if not self._expand_path:
106-
yield record
107-
return
108-
109-
expand_path = [path.eval(self.config) for path in self._expand_path]
110-
111-
try:
112-
nested_array = dpath.get(record, expand_path)
113-
except (KeyError, TypeError):
114-
yield record
115-
return
116-
117-
if not isinstance(nested_array, list):
118-
yield record
119-
return
120-
121-
if len(nested_array) == 0:
122-
return
123-
124-
for item in nested_array:
125-
if isinstance(item, dict):
126-
expanded_record = dict(item)
127-
if self.remain_original_record:
128-
expanded_record["original_record"] = record
129-
yield expanded_record
130-
else:
131-
yield item
132-
13393
def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
13494
for body in self.decoder.decode(response):
13595
if len(self._field_path) == 0:
@@ -142,8 +102,14 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
142102
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
143103
if isinstance(extracted, list):
144104
for record in extracted:
145-
yield from self._expand_record(record)
105+
if self.record_expander:
106+
yield from self.record_expander.expand_record(record)
107+
else:
108+
yield record
146109
elif extracted:
147-
yield from self._expand_record(extracted)
110+
if self.record_expander:
111+
yield from self.record_expander.expand_record(extracted)
112+
else:
113+
yield extracted
148114
else:
149115
yield from []

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"DefaultPaginator.page_size_option": "RequestOption",
4343
# DpathExtractor
4444
"DpathExtractor.decoder": "JsonDecoder",
45+
"DpathExtractor.record_expander": "RecordExpander",
4546
# HttpRequester
4647
"HttpRequester.error_handler": "DefaultErrorHandler",
4748
# ListPartitionRouter

unit_tests/sources/declarative/extractors/test_dpath_extractor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
IterableDecoder,
1515
JsonDecoder,
1616
)
17+
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
1718
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
1819

1920
config = {"field": "record_array"}
@@ -247,13 +248,18 @@ def test_dpath_extractor_with_expansion(
247248
body,
248249
expected_records: List,
249250
):
251+
record_expander = RecordExpander(
252+
expand_records_from_field=expand_records_from_field,
253+
config=config,
254+
parameters=parameters,
255+
remain_original_record=remain_original_record,
256+
)
250257
extractor = DpathExtractor(
251258
field_path=field_path,
252259
config=config,
253260
decoder=decoder_json,
254261
parameters=parameters,
255-
expand_records_from_field=expand_records_from_field,
256-
remain_original_record=remain_original_record,
262+
record_expander=record_expander,
257263
)
258264

259265
response = create_response(body)

0 commit comments

Comments
 (0)