Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,7 @@ def create_default_paginator(
config: Config,
*,
url_base: str,
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
decoder: Optional[Decoder] = None,
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
Expand All @@ -2075,7 +2076,10 @@ def create_default_paginator(
else None
)
pagination_strategy = self._create_component_from_model(
model=model.pagination_strategy, config=config, decoder=decoder_to_use
model=model.pagination_strategy,
config=config,
decoder=decoder_to_use,
extractor_model=extractor_model,
)
if cursor_used_for_stop_condition:
pagination_strategy = StopConditionPaginationStrategyDecorator(
Expand Down Expand Up @@ -2572,7 +2576,12 @@ def create_oauth_authenticator(
)

def create_offset_increment(
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
self,
model: OffsetIncrementModel,
config: Config,
decoder: Decoder,
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
**kwargs: Any,
) -> OffsetIncrement:
if isinstance(decoder, PaginationDecoderDecorator):
inner_decoder = decoder.decoder
Expand All @@ -2587,10 +2596,24 @@ def create_offset_increment(
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
)

# Ideally we would instantiate the runtime extractor at the highest level (in this case the SimpleRetriever)
# so that it can be shared by OffsetIncrement and RecordSelector. However, because we instantiate the
# decoder with various decorators here, but not in create_record_selector, it is simpler to retain existing
# behavior by having two separate extractors with identical behavior, since they use the same extractor model.
# When we have more time to investigate, we can look into reusing the same component.
extractor = (
self._create_component_from_model(
model=extractor_model, config=config, decoder=decoder_to_use
)
if extractor_model
else None
)

return OffsetIncrement(
page_size=model.page_size,
config=config,
decoder=decoder_to_use,
extractor=extractor,
inject_on_first_request=model.inject_on_first_request or False,
parameters=model.parameters or {},
)
Expand Down Expand Up @@ -2954,6 +2977,7 @@ def create_simple_retriever(
model=model.paginator,
config=config,
url_base=url_base,
extractor_model=model.record_selector.extractor,
decoder=decoder,
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
JsonDecoder,
PaginationDecoderDecorator,
)
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
PaginationStrategy,
Expand Down Expand Up @@ -46,6 +47,7 @@ class OffsetIncrement(PaginationStrategy):
config: Config
page_size: Optional[Union[str, int]]
parameters: InitVar[Mapping[str, Any]]
extractor: Optional[RecordExtractor]
decoder: Decoder = field(
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
)
Expand Down Expand Up @@ -75,6 +77,14 @@ def next_page_token(
) -> Optional[Any]:
decoded_response = next(self.decoder.decode(response))

if self.extractor:
page_size_from_response = len(list(self.extractor.extract_records(response=response)))
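# The extractor could return 0 records, which is valid but evaluates to False, so the check below compares
# against None instead of relying on truthiness. In other cases, our best-effort fallback is to use the
# incoming last_page_size.
# NOTE(review): len() never returns None, so the fallback branch below looks unreachable whenever an
# extractor is set - confirm whether that is intended.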
last_page_size = (
page_size_from_response if page_size_from_response is not None else last_page_size
)

# Stop paginating when there are fewer records than the page size or the current page has no records
if (
self._page_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
from airbyte_cdk.sources.declarative.models import DefaultPaginator as DefaultPaginatorModel
from airbyte_cdk.sources.declarative.models import DpathExtractor as DpathExtractorModel
from airbyte_cdk.sources.declarative.models import (
GroupingPartitionRouter as GroupingPartitionRouterModel,
)
Expand Down Expand Up @@ -1831,6 +1832,7 @@ def test_create_default_paginator():
component_definition=paginator_manifest,
config=input_config,
url_base="https://airbyte.io",
extractor_model=DpathExtractor(field_path=["results"], config=input_config, parameters={}),
decoder=JsonDecoder(parameters={}),
)

Expand Down Expand Up @@ -1968,6 +1970,7 @@ def test_create_default_paginator():
DefaultPaginator(
pagination_strategy=OffsetIncrement(
page_size=10,
extractor=None,
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
parameters={},
),
Expand Down Expand Up @@ -2641,18 +2644,31 @@ def test_create_offset_increment():
page_size=10,
inject_on_first_request=True,
)

expected_extractor = DpathExtractor(field_path=["results"], config=input_config, parameters={})
extractor_model = DpathExtractorModel(
type="DpathExtractor", field_path=expected_extractor.field_path
)

expected_strategy = OffsetIncrement(
page_size=10, inject_on_first_request=True, parameters={}, config=input_config
page_size=10,
inject_on_first_request=True,
extractor=expected_extractor,
parameters={},
config=input_config,
)

strategy = factory.create_offset_increment(
model, input_config, decoder=JsonDecoder(parameters={})
model, input_config, extractor_model=extractor_model, decoder=JsonDecoder(parameters={})
)

assert strategy.page_size == expected_strategy.page_size
assert strategy.inject_on_first_request == expected_strategy.inject_on_first_request
assert strategy.config == input_config

assert isinstance(strategy.extractor, DpathExtractor)
assert strategy.extractor.field_path == expected_extractor.field_path


class MyCustomSchemaLoader(SchemaLoader):
def get_json_schema(self) -> Mapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import requests

from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
DefaultPaginator,
Expand Down Expand Up @@ -327,7 +328,13 @@ def test_initial_token_with_offset_pagination():
)
url_base = "https://airbyte.io"
config = {}
strategy = OffsetIncrement(config={}, page_size=2, parameters={}, inject_on_first_request=True)
strategy = OffsetIncrement(
config={},
page_size=2,
extractor=DpathExtractor(field_path=[], parameters={}, config={}),
parameters={},
inject_on_first_request=True,
)
paginator = DefaultPaginator(
strategy,
config,
Expand All @@ -348,7 +355,13 @@ def test_initial_token_with_offset_pagination():
"pagination_strategy,last_page_size,expected_next_page_token,expected_second_next_page_token",
[
pytest.param(
OffsetIncrement(config={}, page_size=10, parameters={}, inject_on_first_request=True),
OffsetIncrement(
config={},
page_size=10,
extractor=DpathExtractor(field_path=["results"], parameters={}, config={}),
parameters={},
inject_on_first_request=True,
),
10,
{"next_page_token": 10},
{"next_page_token": 20},
Expand All @@ -373,10 +386,23 @@ def test_no_inject_on_first_request_offset_pagination(
"""
Validate that the stateless next_page_token() works when the first page does not inject the value
"""

response_body = {
"results": [
{"id": 1},
{"id": 2},
{"id": 3},
{"id": 4},
{"id": 5},
{"id": 6},
{"id": 7},
{"id": 8},
{"id": 9},
{"id": 10},
]
}
response = requests.Response()
response.headers = {"A_HEADER": "HEADER_VALUE"}
response._content = {}
response._content = json.dumps(response_body).encode("utf-8")

last_record = Record(data={}, stream_name="test")

Expand Down Expand Up @@ -430,7 +456,12 @@ def test_limit_page_fetched():


def test_paginator_with_page_option_no_page_size():
pagination_strategy = OffsetIncrement(config={}, page_size=None, parameters={})
pagination_strategy = OffsetIncrement(
config={},
page_size=None,
extractor=DpathExtractor(field_path=[], parameters={}, config={}),
parameters={},
)

with pytest.raises(ValueError):
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,57 +8,80 @@
import pytest
import requests

from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import (
OffsetIncrement,
)


@pytest.mark.parametrize(
"page_size, parameters, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset",
"page_size, parameters, response_results, last_page_size, last_record, last_page_token_value, expected_next_page_token, expected_offset",
[
pytest.param("2", {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"),
pytest.param(2, {}, 2, {"id": 1}, 4, 6, 2, id="test_same_page_size"),
pytest.param(
"2", {}, [{"id": 1}, {"id": 2}], 2, {"id": 2}, 4, 6, 2, id="test_same_page_size"
),
pytest.param(
2, {}, [{"id": 1}, {"id": 2}], 2, {"id": 2}, 4, 6, 2, id="test_same_page_size"
),
pytest.param(
"{{ parameters['page_size'] }}",
{"page_size": 3},
[{"id": 1}, {"id": 2}],
2,
{"id": 1},
3,
None,
0,
id="test_larger_page_size",
),
pytest.param(None, {}, 0, [], 3, None, 0, id="test_stop_if_no_records"),
pytest.param(None, {}, [], 0, [], 3, None, 0, id="test_stop_if_no_records"),
pytest.param(
"{{ response['page_metadata']['limit'] }}",
{},
[{"id": 1}, {"id": 2}],
2,
{"id": 1},
{"id": 2},
3,
None,
0,
id="test_page_size_from_response",
),
pytest.param(
2, {}, 2, {"id": 1}, None, 2, 2, id="test_get_second_page_with_first_page_not_injected"
2,
{},
[{"id": 1}, {"id": 2}],
2,
{"id": 2},
None,
2,
2,
id="test_get_second_page_with_first_page_not_injected",
),
],
)
def test_offset_increment_paginator_strategy(
page_size,
parameters,
response_results,
last_page_size,
last_record,
last_page_token_value,
expected_next_page_token,
expected_offset,
):
paginator_strategy = OffsetIncrement(page_size=page_size, parameters=parameters, config={})
extractor = DpathExtractor(field_path=["results"], parameters={}, config={})
paginator_strategy = OffsetIncrement(
page_size=page_size, extractor=extractor, parameters=parameters, config={}
)

response = requests.Response()

response.headers = {"A_HEADER": "HEADER_VALUE"}
response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}}
response_body = {
"results": response_results,
"next": "https://airbyte.io/next_url",
"page_metadata": {"limit": 5},
}
response._content = json.dumps(response_body).encode("utf-8")

next_page_token = paginator_strategy.next_page_token(
Expand All @@ -73,9 +96,28 @@ def test_offset_increment_paginator_strategy(
assert expected_next_page_token == next_page_token


def test_offset_increment_response_without_record_path():
    """Check that no next page token is produced when the response lacks the record path.

    The extractor points at ``results``, but the response body built here has no
    such key; the strategy is expected to return ``None``.
    """
    strategy = OffsetIncrement(
        page_size=2,
        extractor=DpathExtractor(field_path=["results"], parameters={}, config={}),
        parameters={},
        config={},
    )

    payload = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}}
    response = requests.Response()
    response.headers = {"A_HEADER": "HEADER_VALUE"}
    response._content = json.dumps(payload).encode("utf-8")

    # The PaginationStrategy is stateless, so calling next_page_token() a second time
    # with the same arguments must return the same result.
    for _ in range(2):
        assert strategy.next_page_token(response, 2, None, 4) is None


def test_offset_increment_paginator_strategy_rises():
paginator_strategy = OffsetIncrement(
page_size="{{ parameters['page_size'] }}",
extractor=DpathExtractor(field_path=["results"], parameters={}, config={}),
parameters={"page_size": "invalid value"},
config={},
)
Expand All @@ -94,8 +136,13 @@ def test_offset_increment_paginator_strategy_rises():
def test_offset_increment_paginator_strategy_initial_token(
inject_on_first_request: bool, expected_initial_token: Optional[Any]
):
extractor = DpathExtractor(field_path=[""], parameters={}, config={})
paginator_strategy = OffsetIncrement(
page_size=20, parameters={}, config={}, inject_on_first_request=inject_on_first_request
page_size=20,
extractor=extractor,
parameters={},
config={},
inject_on_first_request=inject_on_first_request,
)

assert paginator_strategy.initial_token == expected_initial_token
Loading