Skip to content

Commit 2ae3f57

Browse files
author
maxime.c
committed
code review
1 parent 71065f0 commit 2ae3f57

File tree

5 files changed

+135
-16
lines changed

5 files changed

+135
-16
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,9 @@
362362
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
363363
PaginationReset as PaginationResetModel,
364364
)
365+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
366+
Action1 as PaginationResetActionModel,
367+
)
365368
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
366369
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
367370
)
@@ -3362,16 +3365,22 @@ def _create_pagination_tracker_factory(
33623365

33633366
# Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic
33643367
cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None
3365-
if isinstance(cursor, ConcurrentCursor):
3366-
cursor_factory = lambda: cursor.copy_without_state() # type: ignore # the if condition validates that it is a ConcurrentCursor
3367-
elif isinstance(cursor, ConcurrentPerPartitionCursor):
3368-
cursor_factory = lambda: cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
3369-
{}, datetime.timedelta(0)
3370-
)
3371-
elif not isinstance(cursor, FinalStateCursor):
3372-
LOGGER.warning(
3373-
"Unknown cursor for PaginationTracker. Pagination resets might not work properly"
3374-
)
3368+
if model.action == PaginationResetActionModel.RESET:
3369+
# in that case, we will let cursor_factory to return None even if the stream has a cursor
3370+
pass
3371+
elif model.action == PaginationResetActionModel.SPLIT_USING_CURSOR:
3372+
if isinstance(cursor, ConcurrentCursor):
3373+
cursor_factory = lambda: cursor.copy_without_state() # type: ignore # the if condition validates that it is a ConcurrentCursor
3374+
elif isinstance(cursor, ConcurrentPerPartitionCursor):
3375+
cursor_factory = lambda: cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
3376+
{}, datetime.timedelta(0)
3377+
)
3378+
elif not isinstance(cursor, FinalStateCursor):
3379+
LOGGER.warning(
3380+
"Unknown cursor for PaginationTracker. Pagination resets might not work properly"
3381+
)
3382+
else:
3383+
raise ValueError(f"Unknown PaginationReset action: {model.action}")
33753384

33763385
limit = model.limits.number_of_records if model and model.limits else None
33773386
return lambda: PaginationTracker(cursor_factory(), limit)

airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def __init__(
2222
"""
2323
self._cursor = cursor
2424
self._limit = max_number_of_records
25-
self.reset()
25+
self._reset()
2626

2727
"""
2828
Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
@@ -41,9 +41,8 @@ def observe(self, record: Record) -> None:
4141
def has_reached_limit(self) -> bool:
4242
return self._limit is not None and self._record_count >= self._limit
4343

44-
def reset(self) -> None:
44+
def _reset(self) -> None:
4545
self._record_count = 0
46-
self._number_of_attempt_with_same_slice = 0
4746

4847
def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
4948
new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
@@ -61,4 +60,5 @@ def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSli
6160
else:
6261
self._number_of_attempt_with_same_slice = 0
6362

63+
self._reset()
6464
return new_slice

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,6 @@ def _read_pages(
437437
break
438438

439439
if reset_pagination or pagination_tracker.has_reached_limit():
440-
pagination_tracker.reset()
441440
next_page_token = self._get_initial_next_page_token()
442441
previous_slice = stream_slice
443442
stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice)

unit_tests/sources/declarative/retrievers/test_pagination_tracker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ def test_given_enough_records_when_has_reached_limit_return_true(self):
4444

4545
assert tracker.has_reached_limit()
4646

47-
def test_given_reset_before_limit_reached_when_has_reached_limit_return_true(self):
47+
def test_given_reduce_slice_before_limit_reached_when_has_reached_limit_return_true(self):
4848
tracker = PaginationTracker(max_number_of_records=2)
4949

5050
tracker.observe(_A_RECORD)
51-
tracker.reset()
51+
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE)
5252
tracker.observe(_A_RECORD)
5353

5454
assert not tracker.has_reached_limit()

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4864,6 +4864,117 @@ def test_given_per_partition_cursor_when_read_then_reset_pagination():
48644864
assert len(list(filter(lambda message: message.type == Type.RECORD, messages))) == 4
48654865

48664866

4867+
def test_given_pagination_reset_action_is_reset_even_though_stream_is_incremental_when_read_then_reset_pagination():
4868+
input_config = {}
4869+
manifest = {
4870+
"version": "0.34.2",
4871+
"type": "DeclarativeSource",
4872+
"check": {"type": "CheckStream", "stream_names": ["Test"]},
4873+
"streams": [
4874+
{
4875+
"type": "DeclarativeStream",
4876+
"name": "Test",
4877+
"schema_loader": {
4878+
"type": "InlineSchemaLoader",
4879+
"schema": {"type": "object"},
4880+
},
4881+
"retriever": {
4882+
"type": "SimpleRetriever",
4883+
"requester": {
4884+
"type": "HttpRequester",
4885+
"url_base": "https://example.org",
4886+
"path": "/test?from={{ stream_interval.start_time }}",
4887+
"authenticator": {"type": "NoAuth"},
4888+
"error_handler": {
4889+
"type": "DefaultErrorHandler",
4890+
"response_filters": [
4891+
{
4892+
"http_codes": [400],
4893+
"action": "RESET_PAGINATION",
4894+
"failure_type": "system_error",
4895+
},
4896+
],
4897+
},
4898+
},
4899+
"record_selector": {
4900+
"type": "RecordSelector",
4901+
"extractor": {"type": "DpathExtractor", "field_path": ["results"]},
4902+
},
4903+
"paginator": {
4904+
"type": "DefaultPaginator",
4905+
"page_token_option": {"type": "RequestPath"},
4906+
"pagination_strategy": {
4907+
"type": "CursorPagination",
4908+
"cursor_value": "{{ response.next }}",
4909+
},
4910+
},
4911+
"pagination_reset": {
4912+
"type": "PaginationReset",
4913+
"action": "RESET",
4914+
},
4915+
},
4916+
"incremental_sync": {
4917+
"type": "DatetimeBasedCursor",
4918+
"start_datetime": {"datetime": "2022-01-01"},
4919+
"end_datetime": "2022-12-31",
4920+
"datetime_format": "%Y-%m-%d",
4921+
"cursor_datetime_formats": ["%Y-%m-%d"],
4922+
"cursor_granularity": "P1D",
4923+
"step": "P1Y",
4924+
"cursor_field": "updated_at",
4925+
},
4926+
}
4927+
],
4928+
"spec": {
4929+
"type": "Spec",
4930+
"documentation_url": "https://example.org",
4931+
"connection_specification": {},
4932+
},
4933+
}
4934+
4935+
catalog = create_catalog("Test")
4936+
source = ConcurrentDeclarativeSource(
4937+
source_config=manifest,
4938+
config=input_config,
4939+
catalog=catalog,
4940+
state=None,
4941+
)
4942+
4943+
with HttpMocker() as http_mocker:
4944+
# Slice from 2022-01-01 to 2022-12-31
4945+
http_mocker.get(
4946+
HttpRequest("https://example.org/test?from=2022-01-01"),
4947+
HttpResponse(
4948+
json.dumps(
4949+
{
4950+
"results": [{"id": 1, "updated_at": "2022-02-01"}, {"id": 2, "updated_at": "2022-03-01"}],
4951+
"next": "https://example.org/test?from=2022-01-01&cursor=toto"
4952+
}
4953+
),
4954+
200,
4955+
),
4956+
)
4957+
http_mocker.get(
4958+
HttpRequest("https://example.org/test?from=2022-01-01&cursor=toto"),
4959+
[
4960+
HttpResponse(json.dumps({}), 400),
4961+
HttpResponse(
4962+
json.dumps(
4963+
{
4964+
"results": [{"id": 3, "updated_at": "2022-04-01"}]
4965+
}
4966+
),
4967+
200
4968+
),
4969+
]
4970+
)
4971+
messages = list(
4972+
source.read(logger=source.logger, config=input_config, catalog=catalog, state=[])
4973+
)
4974+
4975+
assert len(list(filter(lambda message: message.type == Type.RECORD, messages))) == 5
4976+
4977+
48674978
def test_given_record_selector_is_filtering_when_read_then_raise_error():
48684979
"""
48694980
This test is here to show the limitations of pagination reset. If it starts failing, maybe we just want to delete

0 commit comments

Comments
 (0)