Skip to content

Commit ea877a8

Browse files
committed
Fix merging records for property chunking
1 parent fe7c458 commit ea877a8

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing import (
1111
Any,
1212
Callable,
13+
Dict,
1314
Iterable,
1415
List,
1516
Mapping,
@@ -404,7 +405,7 @@ def _read_pages(
404405
)
405406
)
406407
if merge_key:
407-
merged_records[merge_key].update(current_record)
408+
deep_merge(merged_records[merge_key], current_record)
408409
else:
409410
# We should still emit records even if the record did not have a merge key
410411
last_page_size += 1
@@ -623,6 +624,20 @@ def _to_partition_key(to_serialize: Any) -> str:
623624
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
624625

625626

627+
def deep_merge(target: Dict[str, Any], source: Mapping[str, Any]) -> None:
    """
    Recursively merge ``source`` into ``target``, combining nested dictionaries instead of overwriting them.

    Nested dictionaries coming from ``source`` are rebuilt as fresh dictionaries inside ``target``
    rather than stored by reference. Otherwise a later merge into the same key would recurse into
    (and mutate) a dictionary still owned by a previously merged source. Non-dict values
    (including lists) are assigned as-is and replace whatever ``target`` held under that key.

    :param target: The dictionary to merge into (modified in place)
    :param source: The mapping to merge from (never modified by this function)
    """
    for key, value in source.items():
        if not isinstance(value, dict):
            # Scalars, lists, None, etc. simply overwrite the existing entry.
            target[key] = value
            continue

        existing = target.get(key)
        if not isinstance(existing, dict):
            # Either the key is new or it held a non-dict value: start from an empty dict owned by
            # `target` so that we never alias a dictionary that belongs to `source`.
            existing = target[key] = {}
        deep_merge(existing, value)
639+
640+
626641
@dataclass
627642
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
628643
"""

unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,6 +1148,10 @@ def test_simple_retriever_with_additional_query_properties():
11481148
"last_name": "hongou",
11491149
"nonary": "second",
11501150
"bracelet": "1",
1151+
"dict_field": {
1152+
"key1": "value1",
1153+
"key2": "value2",
1154+
},
11511155
},
11521156
associated_slice=None,
11531157
stream_name=stream_name,
@@ -1216,7 +1220,12 @@ def test_simple_retriever_with_additional_query_properties():
12161220
record_selector.select_records.side_effect = [
12171221
[
12181222
Record(
1219-
data={"id": "a", "first_name": "gentarou", "last_name": "hongou"},
1223+
data={
1224+
"id": "a",
1225+
"first_name": "gentarou",
1226+
"last_name": "hongou",
1227+
"dict_field": {"key1": "value1"},
1228+
},
12201229
associated_slice=None,
12211230
stream_name=stream_name,
12221231
),
@@ -1263,7 +1272,12 @@ def test_simple_retriever_with_additional_query_properties():
12631272
stream_name=stream_name,
12641273
),
12651274
Record(
1266-
data={"id": "a", "nonary": "second", "bracelet": "1"},
1275+
data={
1276+
"id": "a",
1277+
"nonary": "second",
1278+
"bracelet": "1",
1279+
"dict_field": {"key2": "value2"},
1280+
},
12671281
associated_slice=None,
12681282
stream_name=stream_name,
12691283
),

0 commit comments

Comments
 (0)