Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@ class PropertiesFromEndpoint:
config: Config
parameters: InitVar[Mapping[str, Any]]

_cached_properties: Optional[List[str]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._property_field_path = [
InterpolatedString(string=property_field, parameters=parameters)
for property_field in self.property_field_path
]

def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
response_properties = self.retriever.read_records(
records_schema={}, stream_slice=stream_slice
)
for property_obj in response_properties:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self._property_field_path
]
yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
def get_properties_from_endpoint(self) -> List[str]:
if self._cached_properties is None:
self._cached_properties = list(
map(
self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
self.retriever.read_records(records_schema={}, stream_slice=StreamSlice(partition={}, cursor_slice={})),
)
)
return self._cached_properties

def _get_property(self, property_obj: Mapping[str, Any]) -> str:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self._property_field_path
]
return str(dpath.get(property_obj, path, default=[])) # type: ignore # extracted will be a MutableMapping, given input data structure
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

def get_request_property_chunks(
self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
self, property_fields: List[str], always_include_properties: Optional[List[str]]
) -> Iterable[List[str]]:
if not self.property_limit:
single_property_chunk = list(property_fields)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,16 @@ class QueryProperties:
config: Config
parameters: InitVar[Mapping[str, Any]]

def get_request_property_chunks(
self, stream_slice: Optional[StreamSlice] = None
) -> Iterable[List[str]]:
def get_request_property_chunks(self) -> Iterable[List[str]]:
"""
Uses the defined property_list to fetch the total set of properties dynamically or from a static list
and based on the resulting properties, performs property chunking if applicable.
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
"""
fields: Union[Iterable[str], List[str]]
fields: List[str]
if isinstance(self.property_list, PropertiesFromEndpoint):
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
fields = self.property_list.get_properties_from_endpoint()
else:
fields = self.property_list if self.property_list else []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,7 @@ def _read_pages(
response = None
try:
if self.additional_query_properties:
for properties in self.additional_query_properties.get_request_property_chunks(
stream_slice=stream_slice
):
for properties in self.additional_query_properties.get_request_property_chunks():
stream_slice = StreamSlice(
partition=stream_slice.partition or {},
cursor_slice=stream_slice.cursor_slice or {},
Expand Down Expand Up @@ -523,7 +521,6 @@ def read_records(
"""
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check

most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_slice=stream_slice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ def test_get_properties_from_endpoint():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
)
properties = properties_from_endpoint.get_properties_from_endpoint()

assert len(properties) == 9
assert properties == expected_properties
Expand Down Expand Up @@ -89,11 +85,7 @@ def test_get_properties_from_endpoint_with_multiple_field_paths():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
)
properties = properties_from_endpoint.get_properties_from_endpoint()

assert len(properties) == 9
assert properties == expected_properties
Expand Down Expand Up @@ -135,11 +127,51 @@ def test_get_properties_from_endpoint_with_interpolation():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
)
properties = properties_from_endpoint.get_properties_from_endpoint()

assert len(properties) == 9
assert properties == expected_properties


def test_given_multiple_calls_when_get_properties_from_endpoint_then_only_call_retriever_once():
retriever = Mock(spec=SimpleRetriever)
retriever.read_records.return_value = iter(
[
Record(stream_name="players", data={"id": "ace", "value": 1}),
]
)

properties_from_endpoint = PropertiesFromEndpoint(
retriever=retriever,
property_field_path=["value"],
config={},
parameters={},
)

properties_from_endpoint.get_properties_from_endpoint()
properties_from_endpoint.get_properties_from_endpoint()
properties_from_endpoint.get_properties_from_endpoint()

assert retriever.read_records.call_count == 1


def test_given_value_is_int_when_get_properties_from_endpoint_then_return_str():
expected_properties = ["1"]

retriever = Mock(spec=SimpleRetriever)
retriever.read_records.return_value = iter(
[
Record(stream_name="players", data={"id": "ace", "value": 1}),
]
)

properties_from_endpoint = PropertiesFromEndpoint(
retriever=retriever,
property_field_path=["value"],
config={},
parameters={},
)

properties = properties_from_endpoint.get_properties_from_endpoint()

assert properties == expected_properties
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def test_get_request_property_chunks(
property_limit,
expected_property_chunks,
):
property_fields = iter(property_fields)
property_chunking = PropertyChunking(
property_limit_type=property_limit_type,
property_limit=property_limit,
Expand Down Expand Up @@ -104,3 +103,26 @@ def test_get_merge_key():

merge_key = property_chunking.get_merge_key(record=record)
assert merge_key == "0"


def test_given_single_property_chunk_when_get_request_property_chunks_then_always_include_properties_are_not_added_to_input_list():
"""
This test is used to validate that we don't manipulate the in-memory values from get_request_property_chunks
"""
property_chunking = PropertyChunking(
property_limit_type=PropertyLimitType.property_count,
property_limit=None,
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
config=CONFIG,
parameters={},
)

property_fields = ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
list(
property_chunking.get_request_property_chunks(
property_fields=property_fields,
always_include_properties=["id"],
)
)

assert property_fields == ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@


def test_get_request_property_chunks_static_list_with_chunking():
stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand All @@ -43,7 +41,7 @@ def test_get_request_property_chunks_static_list_with_chunking():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 3
assert property_chunks[0] == ["ace", "snake", "santa"]
Expand All @@ -52,8 +50,6 @@ def test_get_request_property_chunks_static_list_with_chunking():


def test_get_request_property_chunks_static_list_with_always_include_properties():
stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand All @@ -78,7 +74,7 @@ def test_get_request_property_chunks_static_list_with_always_include_properties(
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 3
assert property_chunks[0] == ["zero", "ace", "snake", "santa"]
Expand All @@ -87,7 +83,6 @@ def test_get_request_property_chunks_static_list_with_always_include_properties(


def test_get_request_property_chunks_dynamic_endpoint():
stream_slice = StreamSlice(cursor_slice={}, partition={})

properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint)
properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter(
Expand All @@ -108,7 +103,7 @@ def test_get_request_property_chunks_dynamic_endpoint():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 2
assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"]
Expand Down
Loading