Skip to content

Commit cd1dc82

Browse files
author
maxime.c
committed
cache properties from endpoint
1 parent 20ae208 commit cd1dc82

File tree

2 files changed

+61
-25
lines changed

2 files changed

+61
-25
lines changed

airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
99
from airbyte_cdk.sources.declarative.retrievers import Retriever
10+
from airbyte_cdk.sources.streams.core import StreamData
1011
from airbyte_cdk.sources.types import Config, StreamSlice
1112

1213

@@ -22,19 +23,22 @@ class PropertiesFromEndpoint:
2223
config: Config
2324
parameters: InitVar[Mapping[str, Any]]
2425

26+
_cached_properties: Optional[List[str]] = None
27+
2528
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2629
self._property_field_path = [
2730
InterpolatedString(string=property_field, parameters=parameters)
2831
for property_field in self.property_field_path
2932
]
3033

31-
def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
32-
response_properties = self.retriever.read_records(
33-
records_schema={}, stream_slice=stream_slice
34-
)
35-
for property_obj in response_properties:
36-
path = [
37-
node.eval(self.config) if not isinstance(node, str) else node
38-
for node in self._property_field_path
39-
]
40-
yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
34+
def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> List[str]:
35+
if self._cached_properties is None:
36+
self._cached_properties = list(map(self._get_property, self.retriever.read_records(records_schema={}, stream_slice=stream_slice))) # type: ignore # extracted will be a MutableMapping, given input data structure
37+
return self._cached_properties
38+
39+
def _get_property(self, property_obj: Mapping[str, Any]) -> str:
40+
path = [
41+
node.eval(self.config) if not isinstance(node, str) else node
42+
for node in self._property_field_path
43+
]
44+
return str(dpath.get(property_obj, path, default=[])) # type: ignore # extracted will be a MutableMapping, given input data structure

unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,7 @@ def test_get_properties_from_endpoint():
4444
parameters={},
4545
)
4646

47-
properties = list(
48-
properties_from_endpoint.get_properties_from_endpoint(
49-
stream_slice=StreamSlice(cursor_slice={}, partition={})
50-
)
51-
)
47+
properties = properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
5248

5349
assert len(properties) == 9
5450
assert properties == expected_properties
@@ -89,11 +85,7 @@ def test_get_properties_from_endpoint_with_multiple_field_paths():
8985
parameters={},
9086
)
9187

92-
properties = list(
93-
properties_from_endpoint.get_properties_from_endpoint(
94-
stream_slice=StreamSlice(cursor_slice={}, partition={})
95-
)
96-
)
88+
properties = properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
9789

9890
assert len(properties) == 9
9991
assert properties == expected_properties
@@ -135,11 +127,51 @@ def test_get_properties_from_endpoint_with_interpolation():
135127
parameters={},
136128
)
137129

138-
properties = list(
139-
properties_from_endpoint.get_properties_from_endpoint(
140-
stream_slice=StreamSlice(cursor_slice={}, partition={})
141-
)
142-
)
130+
properties = properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
143131

144132
assert len(properties) == 9
145133
assert properties == expected_properties
134+
135+
136+
def test_given_multiple_calls_when_get_properties_from_endpoint_then_only_call_retriever_once():
137+
retriever = Mock(spec=SimpleRetriever)
138+
retriever.read_records.return_value = iter(
139+
[
140+
Record(stream_name="players", data={"id": "ace", "value": 1}),
141+
]
142+
)
143+
144+
properties_from_endpoint = PropertiesFromEndpoint(
145+
retriever=retriever,
146+
property_field_path=["value"],
147+
config={},
148+
parameters={},
149+
)
150+
151+
properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
152+
properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
153+
properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
154+
155+
assert retriever.read_records.call_count == 1
156+
157+
158+
def test_given_value_is_int_when_get_properties_from_endpoint_then_return_str():
159+
expected_properties = ["1"]
160+
161+
retriever = Mock(spec=SimpleRetriever)
162+
retriever.read_records.return_value = iter(
163+
[
164+
Record(stream_name="players", data={"id": "ace", "value": 1}),
165+
]
166+
)
167+
168+
properties_from_endpoint = PropertiesFromEndpoint(
169+
retriever=retriever,
170+
property_field_path=["value"],
171+
config={},
172+
parameters={},
173+
)
174+
175+
properties = properties_from_endpoint.get_properties_from_endpoint(stream_slice=StreamSlice(cursor_slice={}, partition={}))
176+
177+
assert properties == expected_properties

0 commit comments

Comments
 (0)