
Commit a2bc95d

Author: maxime.c
pagination reset
1 parent 7ab013d commit a2bc95d

13 files changed, +815 −79 lines


airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 35 additions & 0 deletions
@@ -2279,12 +2279,14 @@ definitions:
             - FAIL
             - RETRY
             - IGNORE
+            - RESET_PAGINATION
             - RATE_LIMITED
           examples:
             - SUCCESS
             - FAIL
             - RETRY
             - IGNORE
+            - RESET_PAGINATION
             - RATE_LIMITED
         failure_type:
           title: Failure Type
@@ -3707,6 +3709,9 @@ definitions:
           anyOf:
             - "$ref": "#/definitions/DefaultPaginator"
             - "$ref": "#/definitions/NoPagination"
+        pagination_reset:
+          description: Describes what triggers pagination reset and how to handle it.
+          "$ref": "#/definitions/PaginationReset"
         ignore_stream_slicer_parameters_on_paginated_requests:
           description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
           type: boolean
@@ -3730,6 +3735,36 @@ definitions:
         $parameters:
           type: object
           additionalProperties: true
+    PaginationReset:
+      title: Pagination Reset
+      description: Describes what triggers pagination reset and how to handle it. If SPLIT_USING_CURSOR, the connector developer is accountable for ensuring that the records are returned in ascending order.
+      type: object
+      required:
+        - type
+        - action
+      properties:
+        type:
+          type: string
+          enum: [ PaginationReset ]
+        action:
+          type: string
+          enum:
+            - SPLIT_USING_CURSOR
+            - RESET
+        limits:
+          "$ref": "#/definitions/PaginationResetLimits"
+    PaginationResetLimits:
+      title: Pagination Reset Limits
+      description: Describes the limits that trigger pagination reset
+      type: object
+      required:
+        - type
+      properties:
+        type:
+          type: string
+          enum: [ PaginationResetLimits ]
+        number_of_records:
+          type: integer
     GzipDecoder:
       title: gzip
       description: Select 'gzip' for response data that is compressed with gzip. Requires specifying an inner data type/decoder to parse the decompressed data.
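
To make the new schema surface concrete, here is a minimal sketch of the manifest fragment these definitions describe, written as the Python data a parsed manifest would contain. Only the pagination_reset fields come from the schema above; the retriever layout around it (requester, record_selector, paginator) is elided and illustrative.

# Hypothetical fragment of a parsed manifest exercising pagination_reset.
retriever_fragment = {
    "type": "SimpleRetriever",
    "pagination_reset": {
        "type": "PaginationReset",
        "action": "RESET",  # or "SPLIT_USING_CURSOR" (records must then arrive in ascending cursor order)
        "limits": {
            "type": "PaginationResetLimits",
            "number_of_records": 10000,  # trigger a reset once this many records were read on one slice
        },
    },
}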

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 1 addition & 1 deletion
@@ -151,7 +151,7 @@ def __init__(
         self._connector_state_converter = connector_state_converter
         self._cursor_field = cursor_field

-        self._cursor_factory = cursor_factory
+        self._cursor_factory = cursor_factory  # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
         self._partition_router = partition_router

         # The dict is ordered to ensure that once the maximum number of partitions is reached,

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 29 additions & 1 deletion
@@ -539,6 +539,7 @@ class Action(Enum):
     FAIL = "FAIL"
     RETRY = "RETRY"
     IGNORE = "IGNORE"
+    RESET_PAGINATION = "RESET_PAGINATION"
     RATE_LIMITED = "RATE_LIMITED"


@@ -553,7 +554,14 @@ class HttpResponseFilter(BaseModel):
     action: Optional[Action] = Field(
         None,
         description="Action to execute if a response matches the filter.",
-        examples=["SUCCESS", "FAIL", "RETRY", "IGNORE", "RATE_LIMITED"],
+        examples=[
+            "SUCCESS",
+            "FAIL",
+            "RETRY",
+            "IGNORE",
+            "RESET_PAGINATION",
+            "RATE_LIMITED",
+        ],
         title="Action",
     )
     failure_type: Optional[FailureType] = Field(
@@ -1173,6 +1181,16 @@ class LegacySessionTokenAuthenticator(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class Action1(Enum):
+    SPLIT_USING_CURSOR = "SPLIT_USING_CURSOR"
+    RESET = "RESET"
+
+
+class PaginationResetLimits(BaseModel):
+    type: Literal["PaginationResetLimits"]
+    number_of_records: Optional[int] = None
+
+
 class CsvDecoder(BaseModel):
     type: Literal["CsvDecoder"]
     encoding: Optional[str] = "utf-8"
@@ -2054,6 +2072,12 @@ class RecordSelector(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class PaginationReset(BaseModel):
+    type: Literal["PaginationReset"]
+    action: Action1
+    limits: Optional[PaginationResetLimits] = None
+
+
 class GzipDecoder(BaseModel):
     type: Literal["GzipDecoder"]
     decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]
@@ -2822,6 +2846,10 @@ class SimpleRetriever(BaseModel):
         None,
         description="Paginator component that describes how to navigate through the API's pages.",
     )
+    pagination_reset: Optional[PaginationReset] = Field(
+        None,
+        description="Describes what triggers pagination reset and how to handle it.",
+    )
     ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
         False,
         description="If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 51 additions & 13 deletions
@@ -116,6 +116,7 @@
 )
 from airbyte_cdk.sources.declarative.models import (
     CustomStateMigration,
+    PaginationResetLimits,
 )
 from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
     DEPRECATION_LOGS_TAG,
@@ -358,6 +359,9 @@
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     PageIncrement as PageIncrementModel,
 )
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    PaginationReset as PaginationResetModel,
+)
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
 )
@@ -529,6 +533,7 @@
     LocalFileSystemFileWriter,
     NoopFileWriter,
 )
+from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
 from airbyte_cdk.sources.declarative.schema import (
     ComplexFieldType,
     DefaultSchemaLoader,
@@ -644,6 +649,8 @@
 # this would be a circular import
 MAX_SLICES = 5

+LOGGER = logging.getLogger(f"airbyte.model_to_component_factory")
+

 class ModelToComponentFactory:
     EPOCH_DATETIME_FORMAT = "%s"
@@ -2043,6 +2050,7 @@ def create_default_stream(
             if isinstance(concurrent_cursor, FinalStateCursor)
             else concurrent_cursor
         )
+
         retriever = self._create_component_from_model(
             model=model.retriever,
             config=config,
@@ -2051,12 +2059,9 @@ def create_default_stream(
             request_options_provider=request_options_provider,
             stream_slicer=stream_slicer,
             partition_router=partition_router,
-            stop_condition_cursor=concurrent_cursor
-            if self._is_stop_condition_on_cursor(model)
-            else None,
-            client_side_incremental_sync={"cursor": concurrent_cursor}
-            if self._is_client_side_filtering_enabled(model)
-            else None,
+            has_stop_condition_cursor=self._is_stop_condition_on_cursor(model),
+            is_client_side_incremental_sync=self._is_client_side_filtering_enabled(model),
+            cursor=concurrent_cursor,
             transformations=transformations,
             file_uploader=file_uploader,
             incremental_sync=model.incremental_sync,
@@ -3050,7 +3055,7 @@ def create_record_selector(
         name: str,
         transformations: List[RecordTransformation] | None = None,
         decoder: Decoder | None = None,
-        client_side_incremental_sync: Dict[str, Any] | None = None,
+        client_side_incremental_sync_cursor: Optional[Cursor] = None,
         file_uploader: Optional[DefaultFileUploader] = None,
         **kwargs: Any,
     ) -> RecordSelector:
@@ -3066,14 +3071,14 @@ def create_record_selector(
         transform_before_filtering = (
             False if model.transform_before_filtering is None else model.transform_before_filtering
         )
-        if client_side_incremental_sync:
+        if client_side_incremental_sync_cursor:
             record_filter = ClientSideIncrementalRecordFilterDecorator(
                 config=config,
                 parameters=model.parameters,
                 condition=model.record_filter.condition
                 if (model.record_filter and hasattr(model.record_filter, "condition"))
                 else None,
-                **client_side_incremental_sync,
+                cursor=client_side_incremental_sync_cursor,
             )
         transform_before_filtering = (
             True
@@ -3151,8 +3156,9 @@ def create_simple_retriever(
         name: str,
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         request_options_provider: Optional[RequestOptionsProvider] = None,
-        stop_condition_cursor: Optional[Cursor] = None,
-        client_side_incremental_sync: Optional[Dict[str, Any]] = None,
+        cursor: Optional[Cursor] = None,
+        has_stop_condition_cursor: bool = False,
+        is_client_side_incremental_sync: bool = False,
         transformations: List[RecordTransformation],
         file_uploader: Optional[DefaultFileUploader] = None,
         incremental_sync: Optional[
@@ -3182,6 +3188,9 @@ def _get_url(req: Requester) -> str:

             return _url or _url_base

+        if cursor is None:
+            cursor = FinalStateCursor(name, None, self._message_repository)
+
         decoder = (
             self._create_component_from_model(model=model.decoder, config=config)
             if model.decoder
@@ -3193,7 +3202,7 @@ def _get_url(req: Requester) -> str:
             config=config,
             decoder=decoder,
             transformations=transformations,
-            client_side_incremental_sync=client_side_incremental_sync,
+            client_side_incremental_sync_cursor=cursor if is_client_side_incremental_sync else None,
             file_uploader=file_uploader,
         )

@@ -3270,7 +3279,7 @@ def _get_url(req: Requester) -> str:
                 url_base=_get_url(requester),
                 extractor_model=model.record_selector.extractor,
                 decoder=decoder,
-                cursor_used_for_stop_condition=stop_condition_cursor or None,
+                cursor_used_for_stop_condition=cursor if has_stop_condition_cursor else None,
             )
             if model.paginator
             else NoPagination(parameters={})
@@ -3319,6 +3328,13 @@ def _get_url(req: Requester) -> str:
             parameters=model.parameters or {},
         )

+        if (
+            model.record_selector.record_filter
+            and model.pagination_reset
+            and model.pagination_reset.limits
+        ):
+            raise ValueError("PaginationResetLimits are not supported when a record filter is configured.")
+
         return SimpleRetriever(
             name=name,
             paginator=paginator,
@@ -3332,9 +3348,31 @@ def _get_url(req: Requester) -> str:
             ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
             additional_query_properties=query_properties,
             log_formatter=self._get_log_formatter(log_formatter, name),
+            pagination_tracker_factory=self._create_pagination_tracker_factory(
+                model.pagination_reset, cursor
+            ),
             parameters=model.parameters or {},
         )

+    def _create_pagination_tracker_factory(
+        self, model: PaginationResetModel, cursor: Cursor
+    ) -> Callable[[], PaginationTracker]:
+        # Until we figure out a way to use any cursor for PaginationTracker, we need this cursor selection logic
+        cursor_for_pagination_tracking = None
+        if isinstance(cursor, ConcurrentCursor):
+            cursor_for_pagination_tracking = cursor
+        elif isinstance(cursor, ConcurrentPerPartitionCursor):
+            cursor_for_pagination_tracking = cursor._cursor_factory.create(  # type: ignore  # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
+                {}, datetime.timedelta(0)
+            )
+        elif not isinstance(cursor, FinalStateCursor):
+            LOGGER.warning(
+                "Unknown cursor for PaginationTracker. Pagination resets might not work properly"
+            )
+
+        limit = model.limits.number_of_records if model and model.limits else None
+        return lambda: PaginationTracker(cursor_for_pagination_tracking, limit)
+
     def _get_log_formatter(
         self, log_formatter: Callable[[Response], Any] | None, name: str
     ) -> Callable[[Response], Any] | None:
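
For intuition on the factory indirection: SimpleRetriever receives a zero-argument callable rather than a tracker instance, so each read can build its own tracker and no mutable pagination state is shared across partitions. A minimal sketch, assuming the PaginationTracker module added by this commit and mirroring only the no-cursor branch of _create_pagination_tracker_factory (build_tracker_factory is a hypothetical helper name):

from typing import Callable, Optional

from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker


def build_tracker_factory(limit: Optional[int]) -> Callable[[], PaginationTracker]:
    # With no cursor (e.g. a full-refresh stream), the tracker can only enforce
    # the record limit and the single allowed retry of the same slice.
    return lambda: PaginationTracker(cursor=None, max_number_of_records=limit)


factory = build_tracker_factory(10000)
assert factory() is not factory()  # each call yields an independent tracker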
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+from typing import Optional
+
+from airbyte_cdk.sources.declarative.models import FailureType
+from airbyte_cdk.sources.declarative.types import Record, StreamSlice
+from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
+from airbyte_cdk.utils.traced_exception import AirbyteTracedException
+
+
+class PaginationTracker:
+    _record_count: int
+    _number_of_attempt_with_same_slice: int
+
+    def __init__(
+        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
+    ) -> None:
+        """
+        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor`, but not all
+        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
+        switches to global state, we stop keeping track of the state per partition and therefore can't get an accurate
+        view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
+        ConcurrentCursor, which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
+        """
+        self._cursor = cursor
+        self._limit = max_number_of_records
+        self.reset()
+
+        """
+        Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
+        always process the same slice.
+
+        Given no cursor, we assume that the pagination reset is for retrying purposes and we allow one retry.
+        """
+        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
+        self._number_of_attempt_with_same_slice = 0
+
+    def observe(self, record: Record) -> None:
+        self._record_count += 1
+        if self._cursor:
+            self._cursor.observe(record)
+
+    def has_reached_limit(self) -> bool:
+        return self._limit is not None and self._record_count >= self._limit
+
+    def reset(self) -> None:
+        self._record_count = 0
+
+    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
+        new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
+
+        self._number_of_attempt_with_same_slice += 1
+        if new_slice == stream_slice:
+            if (
+                self._number_of_attempt_with_same_slice
+                >= self._allowed_number_of_attempt_with_same_slice
+            ):
+                raise AirbyteTracedException(
+                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
+                    failure_type=FailureType.system_error,
+                )
+
+        return new_slice
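
Finally, a usage sketch of the lifecycle a retriever would drive, with hypothetical record data; Record and StreamSlice come from airbyte_cdk.sources.declarative.types, as in the imports above:

from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.types import Record, StreamSlice

tracker = PaginationTracker(cursor=None, max_number_of_records=2)
stream_slice = StreamSlice(partition={}, cursor_slice={})

for data in ({"id": 1}, {"id": 2}):  # hypothetical records from one page
    tracker.observe(Record(data=data, stream_name="example", associated_slice=stream_slice))

if tracker.has_reached_limit():
    # Without a cursor the slice cannot shrink, so the same slice comes back;
    # one identical retry is tolerated before the tracker raises.
    stream_slice = tracker.reduce_slice_range_if_possible(stream_slice)
    tracker.reset()  # restart the record count before paginating again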
