diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 25840f06f..937273679 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2054,6 +2054,7 @@ def create_default_paginator( config: Config, *, url_base: str, + extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None, decoder: Optional[Decoder] = None, cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None, ) -> Union[DefaultPaginator, PaginatorTestReadDecorator]: @@ -2075,7 +2076,10 @@ def create_default_paginator( else None ) pagination_strategy = self._create_component_from_model( - model=model.pagination_strategy, config=config, decoder=decoder_to_use + model=model.pagination_strategy, + config=config, + decoder=decoder_to_use, + extractor_model=extractor_model, ) if cursor_used_for_stop_condition: pagination_strategy = StopConditionPaginationStrategyDecorator( @@ -2572,7 +2576,12 @@ def create_oauth_authenticator( ) def create_offset_increment( - self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any + self, + model: OffsetIncrementModel, + config: Config, + decoder: Decoder, + extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None, + **kwargs: Any, ) -> OffsetIncrement: if isinstance(decoder, PaginationDecoderDecorator): inner_decoder = decoder.decoder @@ -2587,10 +2596,24 @@ def create_offset_increment( self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder)) ) + # Ideally we would instantiate the runtime extractor from highest most level (in this case the SimpleRetriever) + # so that it can be shared by OffSetIncrement and RecordSelector. However, due to how we instantiate the + # decoder with various decorators here, but not in create_record_selector, it is simpler to retain existing + # behavior by having two separate extractors with identical behavior since they use the same extractor model. + # When we have more time to investigate we can look into reusing the same component. + extractor = ( + self._create_component_from_model( + model=extractor_model, config=config, decoder=decoder_to_use + ) + if extractor_model + else None + ) + return OffsetIncrement( page_size=model.page_size, config=config, decoder=decoder_to_use, + extractor=extractor, inject_on_first_request=model.inject_on_first_request or False, parameters=model.parameters or {}, ) @@ -2954,6 +2977,7 @@ def create_simple_retriever( model=model.paginator, config=config, url_base=url_base, + extractor_model=model.record_selector.extractor, decoder=decoder, cursor_used_for_stop_condition=cursor_used_for_stop_condition, ) diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py index 512d8143c..4370155de 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -12,6 +12,7 @@ JsonDecoder, PaginationDecoderDecorator, ) +from airbyte_cdk.sources.declarative.extractors.dpath_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import ( PaginationStrategy, @@ -46,6 +47,7 @@ class OffsetIncrement(PaginationStrategy): config: Config page_size: Optional[Union[str, int]] parameters: InitVar[Mapping[str, Any]] + extractor: Optional[RecordExtractor] decoder: Decoder = field( default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})) ) @@ -75,6 +77,14 @@ def next_page_token( ) -> Optional[Any]: decoded_response = next(self.decoder.decode(response)) + if self.extractor: + page_size_from_response = len(list(self.extractor.extract_records(response=response))) + # The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other + # cases as the best effort option is to use the incoming last_page_size + last_page_size = ( + page_size_from_response if page_size_from_response is not None else last_page_size + ) + # Stop paginating when there are fewer records than the page size or the current page has no records if ( self._page_size diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 856106bfe..2432bab46 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -66,6 +66,7 @@ from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel from airbyte_cdk.sources.declarative.models import DefaultPaginator as DefaultPaginatorModel +from airbyte_cdk.sources.declarative.models import DpathExtractor as DpathExtractorModel from airbyte_cdk.sources.declarative.models import ( GroupingPartitionRouter as GroupingPartitionRouterModel, ) @@ -1831,6 +1832,7 @@ def test_create_default_paginator(): component_definition=paginator_manifest, config=input_config, url_base="https://airbyte.io", + extractor_model=DpathExtractor(field_path=["results"], config=input_config, parameters={}), decoder=JsonDecoder(parameters={}), ) @@ -1968,6 +1970,7 @@ def test_create_default_paginator(): DefaultPaginator( pagination_strategy=OffsetIncrement( page_size=10, + extractor=None, config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}, parameters={}, ), @@ -2641,18 +2644,31 @@ def test_create_offset_increment(): page_size=10, inject_on_first_request=True, ) + + expected_extractor = DpathExtractor(field_path=["results"], config=input_config, parameters={}) + extractor_model = DpathExtractorModel( + type="DpathExtractor", field_path=expected_extractor.field_path + ) + expected_strategy = OffsetIncrement( - page_size=10, inject_on_first_request=True, parameters={}, config=input_config + page_size=10, + inject_on_first_request=True, + extractor=expected_extractor, + parameters={}, + config=input_config, ) strategy = factory.create_offset_increment( - model, input_config, decoder=JsonDecoder(parameters={}) + model, input_config, extractor_model=extractor_model, decoder=JsonDecoder(parameters={}) ) assert strategy.page_size == expected_strategy.page_size assert strategy.inject_on_first_request == expected_strategy.inject_on_first_request assert strategy.config == input_config + assert isinstance(strategy.extractor, DpathExtractor) + assert strategy.extractor.field_path == expected_extractor.field_path + class MyCustomSchemaLoader(SchemaLoader): def get_json_schema(self) -> Mapping[str, Any]: diff --git a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py index 6e7b60a92..925ce0c59 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py @@ -9,6 +9,7 @@ import requests from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder +from airbyte_cdk.sources.declarative.extractors import DpathExtractor from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import ( DefaultPaginator, @@ -327,7 +328,13 @@ def test_initial_token_with_offset_pagination(): ) url_base = "https://airbyte.io" config = {} - strategy = OffsetIncrement(config={}, page_size=2, parameters={}, inject_on_first_request=True) + strategy = OffsetIncrement( + config={}, + page_size=2, + extractor=DpathExtractor(field_path=[], parameters={}, config={}), + parameters={}, + inject_on_first_request=True, + ) paginator = DefaultPaginator( strategy, config, @@ -348,7 +355,13 @@ def test_initial_token_with_offset_pagination(): "pagination_strategy,last_page_size,expected_next_page_token,expected_second_next_page_token", [ pytest.param( - OffsetIncrement(config={}, page_size=10, parameters={}, inject_on_first_request=True), + OffsetIncrement( + config={}, + page_size=10, + extractor=DpathExtractor(field_path=["results"], parameters={}, config={}), + parameters={}, + inject_on_first_request=True, + ), 10, {"next_page_token": 10}, {"next_page_token": 20}, @@ -373,10 +386,23 @@ def test_no_inject_on_first_request_offset_pagination( """ Validate that the stateless next_page_token() works when the first page does not inject the value """ - + response_body = { + "results": [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + {"id": 4}, + {"id": 5}, + {"id": 6}, + {"id": 7}, + {"id": 8}, + {"id": 9}, + {"id": 10}, + ] + } response = requests.Response() response.headers = {"A_HEADER": "HEADER_VALUE"} - response._content = {} + response._content = json.dumps(response_body).encode("utf-8") last_record = Record(data={}, stream_name="test") @@ -430,7 +456,12 @@ def test_limit_page_fetched(): def test_paginator_with_page_option_no_page_size(): - pagination_strategy = OffsetIncrement(config={}, page_size=None, parameters={}) + pagination_strategy = OffsetIncrement( + config={}, + page_size=None, + extractor=DpathExtractor(field_path=[], parameters={}, config={}), + parameters={}, + ) with pytest.raises(ValueError): ( diff --git a/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py b/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py index 4cd827e88..28f6717f5 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py @@ -8,19 +8,25 @@ import pytest import requests +from airbyte_cdk.sources.declarative.extractors import DpathExtractor from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import ( OffsetIncrement, ) @pytest.mark.parametrize( - "page_size, parameters, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset", + "page_size, parameters, response_results, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset", [ - pytest.param("2", {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"), - pytest.param(2, {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"), + pytest.param( + "2", {}, [{"id": 1}, {"id": 2}], 2, {"id": 2}, 4, 6, 2, id="test_same_page_size" + ), + pytest.param( + 2, {}, [{"id": 1}, {"id": 2}], 2, {"id": 2}, 4, 6, 2, id="test_same_page_size" + ), pytest.param( "{{ parameters['page_size'] }}", {"page_size": 3}, + [{"id": 1}, {"id": 2}], 2, {"id": 1}, 3, @@ -28,37 +34,54 @@ 0, id="test_larger_page_size", ), - pytest.param(None, {}, 0, [], 3, None, 0, id="test_stop_if_no_records"), + pytest.param(None, {}, [], 0, [], 3, None, 0, id="test_stop_if_no_records"), pytest.param( "{{ response['page_metadata']['limit'] }}", {}, + [{"id": 1}, {"id": 2}], 2, - {"id": 1}, + {"id": 2}, 3, None, 0, id="test_page_size_from_response", ), pytest.param( - 2, {}, 2, {"id": 1}, None, 2, 2, id="test_get_second_page_with_first_page_not_injected" + 2, + {}, + [{"id": 1}, {"id": 2}], + 2, + {"id": 2}, + None, + 2, + 2, + id="test_get_second_page_with_first_page_not_injected", ), ], ) def test_offset_increment_paginator_strategy( page_size, parameters, + response_results, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset, ): - paginator_strategy = OffsetIncrement(page_size=page_size, parameters=parameters, config={}) + extractor = DpathExtractor(field_path=["results"], parameters={}, config={}) + paginator_strategy = OffsetIncrement( + page_size=page_size, extractor=extractor, parameters=parameters, config={} + ) response = requests.Response() response.headers = {"A_HEADER": "HEADER_VALUE"} - response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}} + response_body = { + "results": response_results, + "next": "https://airbyte.io/next_url", + "page_metadata": {"limit": 5}, + } response._content = json.dumps(response_body).encode("utf-8") next_page_token = paginator_strategy.next_page_token( @@ -73,9 +96,28 @@ def test_offset_increment_paginator_strategy( assert expected_next_page_token == next_page_token +def test_offset_increment_response_without_record_path(): + extractor = DpathExtractor(field_path=["results"], parameters={}, config={}) + paginator_strategy = OffsetIncrement(page_size=2, extractor=extractor, parameters={}, config={}) + + response = requests.Response() + + response.headers = {"A_HEADER": "HEADER_VALUE"} + response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}} + response._content = json.dumps(response_body).encode("utf-8") + + next_page_token = paginator_strategy.next_page_token(response, 2, None, 4) + assert next_page_token is None + + # Validate that the PaginationStrategy is stateless and calling next_page_token() again returns the same result + next_page_token = paginator_strategy.next_page_token(response, 2, None, 4) + assert next_page_token is None + + def test_offset_increment_paginator_strategy_rises(): paginator_strategy = OffsetIncrement( page_size="{{ parameters['page_size'] }}", + extractor=DpathExtractor(field_path=["results"], parameters={}, config={}), parameters={"page_size": "invalid value"}, config={}, ) @@ -94,8 +136,13 @@ def test_offset_increment_paginator_strategy_rises(): def test_offset_increment_paginator_strategy_initial_token( inject_on_first_request: bool, expected_initial_token: Optional[Any] ): + extractor = DpathExtractor(field_path=[""], parameters={}, config={}) paginator_strategy = OffsetIncrement( - page_size=20, parameters={}, config={}, inject_on_first_request=inject_on_first_request + page_size=20, + extractor=extractor, + parameters={}, + config={}, + inject_on_first_request=inject_on_first_request, ) assert paginator_strategy.initial_token == expected_initial_token