Skip to content

Commit cad709e

Browse files
committed
Add unit tests to new query property methods, add docstrings, and fix bug around character chunking
1 parent da5d85c commit cad709e

File tree

10 files changed

+472
-9
lines changed

10 files changed

+472
-9
lines changed
Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

33
from dataclasses import InitVar, dataclass
4-
from typing import Any, List, Mapping, Union
4+
from typing import Any, List, Mapping, Optional, Union
55

66
from airbyte_cdk.sources.types import Config, Record
77

88

99
@dataclass
1010
class GroupByKey:
1111
"""
12-
tbd
12+
Record merge strategy that combines records together according to values on the record for one or many keys.
1313
"""
1414

1515
key: Union[str, List[str]]
@@ -19,6 +19,12 @@ class GroupByKey:
1919
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2020
self._keys = [self.key] if isinstance(self.key, str) else self.key
2121

22-
def get_group_key(self, record: Record) -> str:
23-
resolved_keys = [str(record.data.get(key)) for key in self._keys]
22+
def get_group_key(self, record: Record) -> Optional[str]:
23+
resolved_keys = []
24+
for key in self._keys:
25+
key_value = record.data.get(key)
26+
if key_value:
27+
resolved_keys.append(key_value)
28+
else:
29+
return None
2430
return ",".join(resolved_keys)

airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

33
from dataclasses import InitVar, dataclass
4-
from typing import Any, Iterable, List, Mapping, Optional, Union
4+
from typing import Any, Iterable, List, Mapping, Optional
55

66
import dpath
77

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ class PropertyLimitType(Enum):
2020
@dataclass
2121
class PropertyChunking:
2222
"""
23-
tbd
23+
Defines the behavior for how the complete list of properties to query for are broken down into smaller groups
24+
that will be used for multiple requests to the target API.
2425
"""
2526

2627
property_limit_type: PropertyLimitType
@@ -48,6 +49,7 @@ def get_request_property_chunks(
4849
current_chunk = list(always_include_properties) if always_include_properties else []
4950
chunk_size = 0
5051
for property_field in property_fields:
52+
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
5153
property_field_size = (
5254
len(property_field)
5355
if self.property_limit_type == PropertyLimitType.characters
@@ -61,5 +63,5 @@ def get_request_property_chunks(
6163
chunk_size += property_field_size
6264
yield current_chunk
6365

64-
def get_merge_key(self, record: Record) -> str:
66+
def get_merge_key(self, record: Record) -> Optional[str]:
6567
return self._record_merge_strategy.get_group_key(record=record)

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
@dataclass
1414
class QueryProperties:
1515
"""
16-
tbd
16+
Low-code component that encompasses the behavior to inject additional property values into the outbound API
17+
requests. Property values can be defined statically within the manifest or dynamically by making requests
18+
to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of
19+
properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved
1720
"""
1821

1922
property_list: Optional[Union[List[str], PropertiesFromEndpoint]]

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,10 @@ def read_records(
505505
current_record
506506
)
507507
)
508-
merged_records[merge_key].update(current_record)
508+
if merge_key:
509+
merged_records[merge_key].update(current_record)
510+
else:
511+
yield stream_data
509512
else:
510513
yield stream_data
511514
if self.cursor:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
import pytest
4+
5+
from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey
6+
from airbyte_cdk.sources.types import Record
7+
8+
9+
@pytest.mark.parametrize(
10+
"key,record,expected_merge_key",
11+
[
12+
pytest.param(
13+
["id"],
14+
Record(
15+
stream_name="test",
16+
data={"id": "0", "first_name": "Belinda", "last_name": "Lindsey"},
17+
),
18+
"0",
19+
id="test_get_merge_key_single",
20+
),
21+
pytest.param(
22+
["last_name", "first_name"],
23+
Record(
24+
stream_name="test", data={"id": "1", "first_name": "Zion", "last_name": "Lindsey"}
25+
),
26+
"Lindsey,Zion",
27+
id="test_get_merge_key_single_multiple",
28+
),
29+
pytest.param(
30+
[""],
31+
Record(stream_name="test", data={}),
32+
None,
33+
id="test_get_merge_key_not_present",
34+
),
35+
],
36+
)
37+
def test_get_merge_key(key, record, expected_merge_key):
38+
group_by_key = GroupByKey(key=key, config={}, parameters={})
39+
40+
merge_key = group_by_key.get_group_key(record=record)
41+
assert merge_key == expected_merge_key
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from unittest.mock import Mock
4+
5+
from airbyte_cdk.sources.declarative.requesters.query_properties import PropertiesFromEndpoint
6+
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
7+
from airbyte_cdk.sources.types import Record, StreamSlice
8+
9+
CONFIG = {}
10+
11+
12+
def test_get_properties_from_endpoint():
13+
expected_properties = [
14+
"gentarou",
15+
"light",
16+
"aoi",
17+
"clover",
18+
"junpei",
19+
"akane",
20+
"unknown",
21+
"hazuki",
22+
"teruaki",
23+
]
24+
25+
retriever = Mock(spec=SimpleRetriever)
26+
retriever.read_records.return_value = iter(
27+
[
28+
Record(stream_name="players", data={"id": "ace", "name": "gentarou"}),
29+
Record(stream_name="players", data={"id": "snake", "name": "light"}),
30+
Record(stream_name="players", data={"id": "santa", "name": "aoi"}),
31+
Record(stream_name="players", data={"id": "clover", "name": "clover"}),
32+
Record(stream_name="players", data={"id": "junpei", "name": "junpei"}),
33+
Record(stream_name="players", data={"id": "june", "name": "akane"}),
34+
Record(stream_name="players", data={"id": "seven", "name": "unknown"}),
35+
Record(stream_name="players", data={"id": "lotus", "name": "hazuki"}),
36+
Record(stream_name="players", data={"id": "nine", "name": "teruaki"}),
37+
]
38+
)
39+
40+
properties_from_endpoint = PropertiesFromEndpoint(
41+
retriever=retriever,
42+
property_field_path=["name"],
43+
config=CONFIG,
44+
parameters={},
45+
)
46+
47+
properties = list(
48+
properties_from_endpoint.get_properties_from_endpoint(
49+
stream_slice=StreamSlice(cursor_slice={}, partition={})
50+
)
51+
)
52+
53+
assert len(properties) == 9
54+
assert properties == expected_properties
55+
56+
57+
def test_get_properties_from_endpoint_with_multiple_field_paths():
58+
expected_properties = [
59+
"gentarou",
60+
"light",
61+
"aoi",
62+
"clover",
63+
"junpei",
64+
"akane",
65+
"unknown",
66+
"hazuki",
67+
"teruaki",
68+
]
69+
70+
retriever = Mock(spec=SimpleRetriever)
71+
retriever.read_records.return_value = iter(
72+
[
73+
Record(stream_name="players", data={"id": "ace", "names": {"first_name": "gentarou"}}),
74+
Record(stream_name="players", data={"id": "snake", "names": {"first_name": "light"}}),
75+
Record(stream_name="players", data={"id": "santa", "names": {"first_name": "aoi"}}),
76+
Record(stream_name="players", data={"id": "clover", "names": {"first_name": "clover"}}),
77+
Record(stream_name="players", data={"id": "junpei", "names": {"first_name": "junpei"}}),
78+
Record(stream_name="players", data={"id": "june", "names": {"first_name": "akane"}}),
79+
Record(stream_name="players", data={"id": "seven", "names": {"first_name": "unknown"}}),
80+
Record(stream_name="players", data={"id": "lotus", "names": {"first_name": "hazuki"}}),
81+
Record(stream_name="players", data={"id": "nine", "names": {"first_name": "teruaki"}}),
82+
]
83+
)
84+
85+
properties_from_endpoint = PropertiesFromEndpoint(
86+
retriever=retriever,
87+
property_field_path=["names", "first_name"],
88+
config=CONFIG,
89+
parameters={},
90+
)
91+
92+
properties = list(
93+
properties_from_endpoint.get_properties_from_endpoint(
94+
stream_slice=StreamSlice(cursor_slice={}, partition={})
95+
)
96+
)
97+
98+
assert len(properties) == 9
99+
assert properties == expected_properties
100+
101+
102+
def test_get_properties_from_endpoint_with_interpolation():
103+
config = {"top_level_field": "names"}
104+
expected_properties = [
105+
"gentarou",
106+
"light",
107+
"aoi",
108+
"clover",
109+
"junpei",
110+
"akane",
111+
"unknown",
112+
"hazuki",
113+
"teruaki",
114+
]
115+
116+
retriever = Mock(spec=SimpleRetriever)
117+
retriever.read_records.return_value = iter(
118+
[
119+
Record(stream_name="players", data={"id": "ace", "names": {"first_name": "gentarou"}}),
120+
Record(stream_name="players", data={"id": "snake", "names": {"first_name": "light"}}),
121+
Record(stream_name="players", data={"id": "santa", "names": {"first_name": "aoi"}}),
122+
Record(stream_name="players", data={"id": "clover", "names": {"first_name": "clover"}}),
123+
Record(stream_name="players", data={"id": "junpei", "names": {"first_name": "junpei"}}),
124+
Record(stream_name="players", data={"id": "june", "names": {"first_name": "akane"}}),
125+
Record(stream_name="players", data={"id": "seven", "names": {"first_name": "unknown"}}),
126+
Record(stream_name="players", data={"id": "lotus", "names": {"first_name": "hazuki"}}),
127+
Record(stream_name="players", data={"id": "nine", "names": {"first_name": "teruaki"}}),
128+
]
129+
)
130+
131+
properties_from_endpoint = PropertiesFromEndpoint(
132+
retriever=retriever,
133+
property_field_path=["{{ config['top_level_field'] }}", "first_name"],
134+
config=config,
135+
parameters={},
136+
)
137+
138+
properties = list(
139+
properties_from_endpoint.get_properties_from_endpoint(
140+
stream_slice=StreamSlice(cursor_slice={}, partition={})
141+
)
142+
)
143+
144+
assert len(properties) == 9
145+
assert properties == expected_properties
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
import pytest
4+
5+
from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey, PropertyChunking
6+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
7+
PropertyLimitType,
8+
)
9+
from airbyte_cdk.sources.types import Record
10+
11+
CONFIG = {}
12+
13+
14+
@pytest.mark.parametrize(
15+
"property_fields,always_include_properties,property_limit_type,property_limit,expected_property_chunks",
16+
[
17+
pytest.param(
18+
["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"],
19+
None,
20+
PropertyLimitType.property_count,
21+
2,
22+
[["rick", "chelsea"], ["victoria", "tim"], ["saxon", "lochlan"], ["piper"]],
23+
id="test_property_chunking",
24+
),
25+
pytest.param(
26+
["rick", "chelsea", "victoria", "tim"],
27+
["mook", "gaitok"],
28+
PropertyLimitType.property_count,
29+
2,
30+
[["mook", "gaitok", "rick", "chelsea"], ["mook", "gaitok", "victoria", "tim"]],
31+
id="test_property_chunking_with_always_include_fields",
32+
),
33+
pytest.param(
34+
["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"],
35+
None,
36+
PropertyLimitType.property_count,
37+
None,
38+
[["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]],
39+
id="test_property_chunking_no_limit",
40+
),
41+
pytest.param(
42+
["kate", "laurie", "jaclyn"],
43+
None,
44+
PropertyLimitType.characters,
45+
10,
46+
[["kate", "laurie"], ["jaclyn"]],
47+
id="test_property_chunking_limit_characters",
48+
),
49+
pytest.param(
50+
[],
51+
None,
52+
PropertyLimitType.property_count,
53+
5,
54+
[[]],
55+
id="test_property_chunking_no_properties",
56+
),
57+
],
58+
)
59+
def test_get_request_property_chunks(
60+
property_fields,
61+
always_include_properties,
62+
property_limit_type,
63+
property_limit,
64+
expected_property_chunks,
65+
):
66+
property_fields = iter(property_fields)
67+
property_chunking = PropertyChunking(
68+
property_limit_type=property_limit_type,
69+
property_limit=property_limit,
70+
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
71+
config=CONFIG,
72+
parameters={},
73+
)
74+
75+
property_chunks = list(
76+
property_chunking.get_request_property_chunks(
77+
property_fields=property_fields, always_include_properties=always_include_properties
78+
)
79+
)
80+
81+
assert len(property_chunks) == len(expected_property_chunks)
82+
for i, expected_property_chunk in enumerate(expected_property_chunks):
83+
assert property_chunks[i] == expected_property_chunk
84+
85+
86+
def test_get_merge_key():
87+
record = Record(stream_name="test", data={"id": "0"})
88+
property_chunking = PropertyChunking(
89+
property_limit_type=PropertyLimitType.property_count,
90+
property_limit=10,
91+
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
92+
config=CONFIG,
93+
parameters={},
94+
)
95+
96+
merge_key = property_chunking.get_merge_key(record=record)
97+
assert merge_key == "0"

0 commit comments

Comments
 (0)