Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3353,7 +3353,7 @@ definitions:
- ["code", "type"]
PropertiesFromEndpoint:
title: Properties from Endpoint
description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records.
description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records. Note that stream_slices can't be interpolated from this retriever.
type: object
required:
- type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@ class PropertiesFromEndpoint:
config: Config
parameters: InitVar[Mapping[str, Any]]

_cached_properties: Optional[List[str]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._property_field_path = [
InterpolatedString(string=property_field, parameters=parameters)
for property_field in self.property_field_path
]

def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
response_properties = self.retriever.read_records(
records_schema={}, stream_slice=stream_slice
)
for property_obj in response_properties:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self._property_field_path
]
yield dpath.get(property_obj, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
def get_properties_from_endpoint(self) -> List[str]:
if self._cached_properties is None:
self._cached_properties = list(
map(
self._get_property, # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
self.retriever.read_records(records_schema={}, stream_slice=None),
)
)
return self._cached_properties

def _get_property(self, property_obj: Mapping[str, Any]) -> str:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self._property_field_path
]
return str(dpath.get(property_obj, path, default=[])) # type: ignore # extracted will be a MutableMapping, given input data structure
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def get_request_property_chunks(
self,
property_fields: Iterable[str],
property_fields: List[str],
always_include_properties: Optional[List[str]],
configured_properties: Optional[Set[str]],
) -> Iterable[List[str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,16 @@ class QueryProperties:
config: Config
parameters: InitVar[Mapping[str, Any]]

def get_request_property_chunks(
self,
stream_slice: Optional[StreamSlice] = None,
) -> Iterable[List[str]]:
def get_request_property_chunks(self) -> Iterable[List[str]]:
"""
Uses the defined property_list to fetch the total set of properties dynamically or from a static list
and based on the resulting properties, performs property chunking if applicable.
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
:param configured_stream: The customer configured stream being synced which is needed to identify which
record fields to query for and emit.
"""
fields: List[str]
configured_properties = self.property_selector.select() if self.property_selector else None

fields: Union[Iterable[str], List[str]]
if isinstance(self.property_list, PropertiesFromEndpoint):
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
fields = self.property_list.get_properties_from_endpoint()
else:
fields = self.property_list if self.property_list else []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ def _read_pages(
response = None
try:
if self.additional_query_properties:
for properties in self.additional_query_properties.get_request_property_chunks(
stream_slice=stream_slice,
):
for (
properties
) in self.additional_query_properties.get_request_property_chunks():
stream_slice = StreamSlice(
partition=stream_slice.partition or {},
cursor_slice=stream_slice.cursor_slice or {},
Expand Down Expand Up @@ -523,7 +523,6 @@ def read_records(
"""
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check

most_recent_record_from_slice = None
record_generator = partial(
self._parse_records,
stream_slice=stream_slice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ def test_get_properties_from_endpoint():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
)
properties = properties_from_endpoint.get_properties_from_endpoint()

assert len(properties) == 9
assert properties == expected_properties
Expand Down Expand Up @@ -89,11 +85,7 @@ def test_get_properties_from_endpoint_with_multiple_field_paths():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
)
properties = properties_from_endpoint.get_properties_from_endpoint()

assert len(properties) == 9
assert properties == expected_properties
Expand Down Expand Up @@ -135,11 +127,51 @@ def test_get_properties_from_endpoint_with_interpolation():
parameters={},
)

properties = list(
properties_from_endpoint.get_properties_from_endpoint(
stream_slice=StreamSlice(cursor_slice={}, partition={})
)
)
properties = properties_from_endpoint.get_properties_from_endpoint()

assert len(properties) == 9
assert properties == expected_properties


def test_given_multiple_calls_when_get_properties_from_endpoint_then_only_call_retriever_once():
retriever = Mock(spec=SimpleRetriever)
retriever.read_records.return_value = iter(
[
Record(stream_name="players", data={"id": "ace", "value": 1}),
]
)

properties_from_endpoint = PropertiesFromEndpoint(
retriever=retriever,
property_field_path=["value"],
config={},
parameters={},
)

properties_from_endpoint.get_properties_from_endpoint()
properties_from_endpoint.get_properties_from_endpoint()
properties_from_endpoint.get_properties_from_endpoint()

assert retriever.read_records.call_count == 1


def test_given_value_is_int_when_get_properties_from_endpoint_then_return_str():
expected_properties = ["1"]

retriever = Mock(spec=SimpleRetriever)
retriever.read_records.return_value = iter(
[
Record(stream_name="players", data={"id": "ace", "value": 1}),
]
)

properties_from_endpoint = PropertiesFromEndpoint(
retriever=retriever,
property_field_path=["value"],
config={},
parameters={},
)

properties = properties_from_endpoint.get_properties_from_endpoint()

assert properties == expected_properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
from typing import Set

import pytest

Expand Down Expand Up @@ -73,7 +74,7 @@ def test_get_request_property_chunks(
expected_property_chunks,
):
configured_properties = set(property_fields)
property_fields = iter(property_fields)
property_fields = property_fields
property_chunking = PropertyChunking(
property_limit_type=property_limit_type,
property_limit=property_limit,
Expand All @@ -100,7 +101,7 @@ def test_get_request_property_chunks_empty_configured_properties():

always_include_properties = ["white", "lotus"]
property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"]
configured_properties = set()
configured_properties: Set[str] = set()
property_chunking = PropertyChunking(
property_limit_type=PropertyLimitType.property_count,
property_limit=3,
Expand Down Expand Up @@ -155,3 +156,27 @@ def test_get_merge_key():

merge_key = property_chunking.get_merge_key(record=record)
assert merge_key == "0"


def test_given_single_property_chunk_when_get_request_property_chunks_then_always_include_properties_are_not_added_to_input_list():
"""
This test is used to validate that we don't manipulate the in-memory values from get_request_property_chunks
"""
property_chunking = PropertyChunking(
property_limit_type=PropertyLimitType.property_count,
property_limit=None,
record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}),
config=CONFIG,
parameters={},
)

property_fields = ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
list(
property_chunking.get_request_property_chunks(
property_fields=property_fields,
always_include_properties=["id"],
configured_properties=None,
)
)

assert property_fields == ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ def test_get_request_property_chunks_static_list_with_chunking_property_selectio
["santa", "clover", "junpei", "june", "remove_me"]
)

stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand Down Expand Up @@ -85,7 +83,7 @@ def test_get_request_property_chunks_static_list_with_chunking_property_selectio
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 2
assert property_chunks[0] == ["santa", "clover", "junpei"]
Expand All @@ -107,8 +105,6 @@ def test_get_request_property_chunks_static_list_with_always_include_properties(
]
)

stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand Down Expand Up @@ -141,7 +137,7 @@ def test_get_request_property_chunks_static_list_with_always_include_properties(
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 3
assert property_chunks[0] == ["zero", "ace", "snake", "santa"]
Expand All @@ -154,8 +150,6 @@ def test_get_request_no_property_chunking_selected_properties_always_include_pro
["santa", "clover", "junpei", "june", "remove_me"]
)

stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand All @@ -182,7 +176,7 @@ def test_get_request_no_property_chunking_selected_properties_always_include_pro
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 1
assert property_chunks[0] == ["zero", "santa", "clover", "junpei", "june"]
Expand All @@ -203,8 +197,6 @@ def test_get_request_no_property_chunking_always_include_properties():
]
)

stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand All @@ -231,7 +223,7 @@ def test_get_request_no_property_chunking_always_include_properties():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 1
assert property_chunks[0] == [
Expand All @@ -252,9 +244,6 @@ def test_get_request_property_chunks_dynamic_endpoint():
configured_airbyte_stream = _create_configured_airbyte_stream(
["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"]
)

stream_slice = StreamSlice(cursor_slice={}, partition={})

properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint)
properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter(
["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"]
Expand Down Expand Up @@ -282,15 +271,14 @@ def test_get_request_property_chunks_dynamic_endpoint():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 2
assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"]
assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"]


def test_get_request_property_chunks_with_configured_catalog_static_list():
stream_slice = StreamSlice(cursor_slice={}, partition={})
# Simulate configured_airbyte_stream whose json_schema only enables 'luna', 'phi', 'sigma'
configured_airbyte_stream = _create_configured_airbyte_stream(
["santa", "clover", "junpei", "june"]
Expand Down Expand Up @@ -328,7 +316,7 @@ def test_get_request_property_chunks_with_configured_catalog_static_list():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 2
assert property_chunks[0] == ["santa", "clover", "junpei"]
Expand All @@ -340,8 +328,6 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint():
["luna", "phi", "sigma", "remove_me"]
)

stream_slice = StreamSlice(cursor_slice={}, partition={})

properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint)
properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter(
["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"]
Expand Down Expand Up @@ -369,15 +355,13 @@ def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 1
assert property_chunks[0] == ["luna", "phi", "sigma"]


def test_get_request_property_no_property_selection():
stream_slice = StreamSlice(cursor_slice={}, partition={})

query_properties = QueryProperties(
property_list=[
"ace",
Expand All @@ -403,7 +387,7 @@ def test_get_request_property_no_property_selection():
parameters={},
)

property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice))
property_chunks = list(query_properties.get_request_property_chunks())

assert len(property_chunks) == 3
assert property_chunks[0] == ["ace", "snake", "santa"]
Expand Down
Loading