diff --git a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py index a858c04fb..4987ea38c 100644 --- a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py +++ b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py @@ -1,6 +1,8 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + from typing import Optional -from airbyte_cdk.sources.declarative.models import FailureType +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.types import Record, StreamSlice from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -44,10 +46,21 @@ def has_reached_limit(self) -> bool: def _reset(self) -> None: self._record_count = 0 - def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice: - new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice + def reduce_slice_range_if_possible( + self, previous_stream_slice: StreamSlice, original_stream_slice: StreamSlice + ) -> StreamSlice: + """ + :param previous_stream_slice: Stream slice that was just processed (It can be the same as original_stream_slice or already reduced) + :param original_stream_slice: The original stream slice before any reduction + :return: Reduced stream slice + """ + new_slice = ( + self._cursor.reduce_slice_range(original_stream_slice) + if self._cursor + else previous_stream_slice + ) - if new_slice == stream_slice: + if new_slice == previous_stream_slice: self._number_of_attempt_with_same_slice += 1 if ( self._number_of_attempt_with_same_slice diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a30574107..3204589ea 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -374,6 +374,7 @@ def _read_pages( stream_state: Mapping[str, Any], stream_slice: StreamSlice, ) -> Iterable[Record]: + original_stream_slice = stream_slice pagination_tracker = self.pagination_tracker_factory() reset_pagination = False next_page_token = self._get_initial_next_page_token() @@ -440,7 +441,9 @@ def _read_pages( if reset_pagination or pagination_tracker.has_reached_limit(): next_page_token = self._get_initial_next_page_token() previous_slice = stream_slice - stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice) + stream_slice = pagination_tracker.reduce_slice_range_if_possible( + stream_slice, original_stream_slice + ) LOGGER.info( f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}" ) diff --git a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py index 3573fb4cd..6b8207cc1 100644 --- a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py +++ b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py @@ -48,7 +48,7 @@ def test_given_reduce_slice_before_limit_reached_when_has_reached_limit_return_t tracker = PaginationTracker(max_number_of_records=2) tracker.observe(_A_RECORD) - tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE) + tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, _A_STREAM_SLICE) tracker.observe(_A_RECORD) assert not tracker.has_reached_limit() @@ -57,7 +57,7 @@ def test_given_no_cursor_when_reduce_slice_range_then_return_same_slice(self): tracker = PaginationTracker() original_slice = StreamSlice(partition={}, cursor_slice={}) - result_slice = tracker.reduce_slice_range_if_possible(original_slice) + result_slice = tracker.reduce_slice_range_if_possible(original_slice, original_slice) assert result_slice == original_slice @@ -65,23 +65,32 @@ def test_given_no_cursor_when_reduce_slice_range_multiple_times_then_raise(self) tracker = PaginationTracker() original_slice = StreamSlice(partition={}, cursor_slice={}) - tracker.reduce_slice_range_if_possible(original_slice) + tracker.reduce_slice_range_if_possible(original_slice, original_slice) with pytest.raises(AirbyteTracedException): - tracker.reduce_slice_range_if_possible(original_slice) + tracker.reduce_slice_range_if_possible(original_slice, original_slice) def test_given_cursor_when_reduce_slice_range_then_return_cursor_stream_slice(self): tracker = PaginationTracker(cursor=self._cursor) self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE new_slice = tracker.reduce_slice_range_if_possible( - StreamSlice(partition={}, cursor_slice={}) + StreamSlice(partition={}, cursor_slice={}), StreamSlice(partition={}, cursor_slice={}) ) assert new_slice == _A_STREAM_SLICE def test_given_cursor_cant_reduce_slice_when_reduce_slice_range_then_raise(self): tracker = PaginationTracker(cursor=self._cursor) + original_slice = StreamSlice(partition={}, cursor_slice={}) self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE with pytest.raises(AirbyteTracedException): - tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE) + tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice) + + def test_cursor_called_with_original_slice_when_reduce_slice_range(self): + tracker = PaginationTracker(cursor=self._cursor) + original_slice = StreamSlice(partition={}, cursor_slice={}) + + tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice) + + self._cursor.reduce_slice_range.assert_called_once_with(original_slice)