diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f834ca20e..f5e9a8548 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3353,7 +3353,7 @@ definitions: - ["code", "type"] PropertiesFromEndpoint: title: Properties from Endpoint - description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records. + description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records. Note that stream_slices can't be interpolated from this retriever. type: object required: - type diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py index 1e294bc8e..210e0bc4e 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py @@ -22,19 +22,27 @@ class PropertiesFromEndpoint: config: Config parameters: InitVar[Mapping[str, Any]] + _cached_properties: Optional[List[str]] = None + def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._property_field_path = [ InterpolatedString(string=property_field, parameters=parameters) for property_field in self.property_field_path ] - def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]: - response_properties = self.retriever.read_records( - records_schema={}, stream_slice=stream_slice - ) - for property_obj in response_properties: - path = [ - node.eval(self.config) if not isinstance(node, str) else node - for node in self._property_field_path - ] - yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure + def get_properties_from_endpoint(self) -> List[str]: + if self._cached_properties is None: + self._cached_properties = list( + map( + self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records? + self.retriever.read_records(records_schema={}, stream_slice=None), + ) + ) + return self._cached_properties + + def _get_property(self, property_obj: Mapping[str, Any]) -> str: + path = [ + node.eval(self.config) if not isinstance(node, str) else node + for node in self._property_field_path + ] + return str(dpath.get(property_obj, path, default=[])) # type: ignore # extracted will be a MutableMapping, given input data structure diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index 0a3400df8..384a842be 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -41,7 +41,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def get_request_property_chunks( self, - property_fields: Iterable[str], + property_fields: List[str], always_include_properties: Optional[List[str]], configured_properties: Optional[Set[str]], ) -> Iterable[List[str]]: diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 424caad77..8bc7f9671 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -30,23 +30,16 @@ class QueryProperties: config: Config parameters: InitVar[Mapping[str, Any]] - def get_request_property_chunks( - self, - stream_slice: Optional[StreamSlice] = None, - ) -> Iterable[List[str]]: + def get_request_property_chunks(self) -> Iterable[List[str]]: """ Uses the defined property_list to fetch the total set of properties dynamically or from a static list and based on the resulting properties, performs property chunking if applicable. - :param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included - because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object - :param configured_stream: The customer configured stream being synced which is needed to identify which - record fields to query for and emit. """ + fields: List[str] configured_properties = self.property_selector.select() if self.property_selector else None - fields: Union[Iterable[str], List[str]] if isinstance(self.property_list, PropertiesFromEndpoint): - fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice) + fields = self.property_list.get_properties_from_endpoint() else: fields = self.property_list if self.property_list else [] diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 55b956e4d..a30574107 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -385,9 +385,9 @@ def _read_pages( response = None try: if self.additional_query_properties: - for properties in self.additional_query_properties.get_request_property_chunks( - stream_slice=stream_slice, - ): + for ( + properties + ) in self.additional_query_properties.get_request_property_chunks(): stream_slice = StreamSlice( partition=stream_slice.partition or {}, cursor_slice=stream_slice.cursor_slice or {}, @@ -523,7 +523,6 @@ def read_records( """ _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check - most_recent_record_from_slice = None record_generator = partial( self._parse_records, stream_slice=stream_slice, diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py b/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py index f94925d7d..913288575 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py @@ -44,11 +44,7 @@ def test_get_properties_from_endpoint(): parameters={}, ) - properties = list( - properties_from_endpoint.get_properties_from_endpoint( - stream_slice=StreamSlice(cursor_slice={}, partition={}) - ) - ) + properties = properties_from_endpoint.get_properties_from_endpoint() assert len(properties) == 9 assert properties == expected_properties @@ -89,11 +85,7 @@ def test_get_properties_from_endpoint_with_multiple_field_paths(): parameters={}, ) - properties = list( - properties_from_endpoint.get_properties_from_endpoint( - stream_slice=StreamSlice(cursor_slice={}, partition={}) - ) - ) + properties = properties_from_endpoint.get_properties_from_endpoint() assert len(properties) == 9 assert properties == expected_properties @@ -135,11 +127,51 @@ def test_get_properties_from_endpoint_with_interpolation(): parameters={}, ) - properties = list( - properties_from_endpoint.get_properties_from_endpoint( - stream_slice=StreamSlice(cursor_slice={}, partition={}) - ) - ) + properties = properties_from_endpoint.get_properties_from_endpoint() assert len(properties) == 9 assert properties == expected_properties + + +def test_given_multiple_calls_when_get_properties_from_endpoint_then_only_call_retriever_once(): + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "value": 1}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["value"], + config={}, + parameters={}, + ) + + properties_from_endpoint.get_properties_from_endpoint() + properties_from_endpoint.get_properties_from_endpoint() + properties_from_endpoint.get_properties_from_endpoint() + + assert retriever.read_records.call_count == 1 + + +def test_given_value_is_int_when_get_properties_from_endpoint_then_return_str(): + expected_properties = ["1"] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "value": 1}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["value"], + config={}, + parameters={}, + ) + + properties = properties_from_endpoint.get_properties_from_endpoint() + + assert properties == expected_properties diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py index c20658e14..2c0e7192b 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py @@ -1,4 +1,5 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. +from typing import Set import pytest @@ -73,7 +74,7 @@ def test_get_request_property_chunks( expected_property_chunks, ): configured_properties = set(property_fields) - property_fields = iter(property_fields) + property_fields = property_fields property_chunking = PropertyChunking( property_limit_type=property_limit_type, property_limit=property_limit, @@ -100,7 +101,7 @@ def test_get_request_property_chunks_empty_configured_properties(): always_include_properties = ["white", "lotus"] property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"] - configured_properties = set() + configured_properties: Set[str] = set() property_chunking = PropertyChunking( property_limit_type=PropertyLimitType.property_count, property_limit=3, @@ -155,3 +156,27 @@ def test_get_merge_key(): merge_key = property_chunking.get_merge_key(record=record) assert merge_key == "0" + + +def test_given_single_property_chunk_when_get_request_property_chunks_then_always_include_properties_are_not_added_to_input_list(): + """ + This test is used to validate that we don't manipulate the in-memory values from get_request_property_chunks + """ + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=None, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + + property_fields = ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"] + list( + property_chunking.get_request_property_chunks( + property_fields=property_fields, + always_include_properties=["id"], + configured_properties=None, + ) + ) + + assert property_fields == ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"] diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index dc4721b07..0dbc4076f 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -50,8 +50,6 @@ def test_get_request_property_chunks_static_list_with_chunking_property_selectio ["santa", "clover", "junpei", "june", "remove_me"] ) - stream_slice = StreamSlice(cursor_slice={}, partition={}) - query_properties = QueryProperties( property_list=[ "ace", @@ -85,7 +83,7 @@ def test_get_request_property_chunks_static_list_with_chunking_property_selectio parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 2 assert property_chunks[0] == ["santa", "clover", "junpei"] @@ -107,8 +105,6 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( ] ) - stream_slice = StreamSlice(cursor_slice={}, partition={}) - query_properties = QueryProperties( property_list=[ "ace", @@ -141,7 +137,7 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 3 assert property_chunks[0] == ["zero", "ace", "snake", "santa"] @@ -154,8 +150,6 @@ def test_get_request_no_property_chunking_selected_properties_always_include_pro ["santa", "clover", "junpei", "june", "remove_me"] ) - stream_slice = StreamSlice(cursor_slice={}, partition={}) - query_properties = QueryProperties( property_list=[ "ace", @@ -182,7 +176,7 @@ def test_get_request_no_property_chunking_selected_properties_always_include_pro parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 1 assert property_chunks[0] == ["zero", "santa", "clover", "junpei", "june"] @@ -203,8 +197,6 @@ def test_get_request_no_property_chunking_always_include_properties(): ] ) - stream_slice = StreamSlice(cursor_slice={}, partition={}) - query_properties = QueryProperties( property_list=[ "ace", @@ -231,7 +223,7 @@ def test_get_request_no_property_chunking_always_include_properties(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 1 assert property_chunks[0] == [ @@ -252,9 +244,6 @@ def test_get_request_property_chunks_dynamic_endpoint(): configured_airbyte_stream = _create_configured_airbyte_stream( ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] ) - - stream_slice = StreamSlice(cursor_slice={}, partition={}) - properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter( ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] @@ -282,7 +271,7 @@ def test_get_request_property_chunks_dynamic_endpoint(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 2 assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] @@ -290,7 +279,6 @@ def test_get_request_property_chunks_dynamic_endpoint(): def test_get_request_property_chunks_with_configured_catalog_static_list(): - stream_slice = StreamSlice(cursor_slice={}, partition={}) # Simulate configured_airbyte_stream whose json_schema only enables 'luna', 'phi', 'sigma' configured_airbyte_stream = _create_configured_airbyte_stream( ["santa", "clover", "junpei", "june"] @@ -328,7 +316,7 @@ def test_get_request_property_chunks_with_configured_catalog_static_list(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 2 assert property_chunks[0] == ["santa", "clover", "junpei"] @@ -340,8 +328,6 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint(): ["luna", "phi", "sigma", "remove_me"] ) - stream_slice = StreamSlice(cursor_slice={}, partition={}) - properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter( ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] @@ -369,15 +355,13 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 1 assert property_chunks[0] == ["luna", "phi", "sigma"] def test_get_request_property_no_property_selection(): - stream_slice = StreamSlice(cursor_slice={}, partition={}) - query_properties = QueryProperties( property_list=[ "ace", @@ -403,7 +387,7 @@ def test_get_request_property_no_property_selection(): parameters={}, ) - property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + property_chunks = list(query_properties.get_request_property_chunks()) assert len(property_chunks) == 3 assert property_chunks[0] == ["ace", "snake", "santa"]