Skip to content

Commit 7a35833

Browse files
authored
feat: pagination reset (#781)
1 parent b28c6e3 commit 7a35833

File tree

15 files changed

+1016
-96
lines changed

15 files changed

+1016
-96
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,12 +2279,14 @@ definitions:
22792279
- FAIL
22802280
- RETRY
22812281
- IGNORE
2282+
- RESET_PAGINATION
22822283
- RATE_LIMITED
22832284
examples:
22842285
- SUCCESS
22852286
- FAIL
22862287
- RETRY
22872288
- IGNORE
2289+
- RESET_PAGINATION
22882290
- RATE_LIMITED
22892291
failure_type:
22902292
title: Failure Type
@@ -3707,6 +3709,9 @@ definitions:
37073709
anyOf:
37083710
- "$ref": "#/definitions/DefaultPaginator"
37093711
- "$ref": "#/definitions/NoPagination"
3712+
pagination_reset:
3713+
description: Describes what triggers pagination reset and how to handle it.
3714+
"$ref": "#/definitions/PaginationReset"
37103715
ignore_stream_slicer_parameters_on_paginated_requests:
37113716
description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
37123717
type: boolean
@@ -3730,6 +3735,36 @@ definitions:
37303735
$parameters:
37313736
type: object
37323737
additionalProperties: true
3738+
PaginationReset:
3739+
title: Pagination Reset
3740+
description: Describes what triggers pagination reset and how to handle it. If the action is SPLIT_USING_CURSOR, the connector developer is responsible for ensuring that the records are returned in ascending order.
3741+
type: object
3742+
required:
3743+
- type
3744+
- action
3745+
properties:
3746+
type:
3747+
type: string
3748+
enum: [ PaginationReset ]
3749+
action:
3750+
type: string
3751+
enum:
3752+
- SPLIT_USING_CURSOR
3753+
- RESET
3754+
limits:
3755+
"$ref": "#/definitions/PaginationResetLimits"
3756+
PaginationResetLimits:
3757+
title: Pagination Reset Limits
3758+
description: Describes the limits that trigger pagination reset
3759+
type: object
3760+
required:
3761+
- type
3762+
properties:
3763+
type:
3764+
type: string
3765+
enum: [ PaginationResetLimits ]
3766+
number_of_records:
3767+
type: integer
37333768
GzipDecoder:
37343769
title: gzip
37353770
description: Select 'gzip' for response data that is compressed with gzip. Requires specifying an inner data type/decoder to parse the decompressed data.

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def __init__(
151151
self._connector_state_converter = connector_state_converter
152152
self._cursor_field = cursor_field
153153

154-
self._cursor_factory = cursor_factory
154+
self._cursor_factory = cursor_factory # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
155155
self._partition_router = partition_router
156156

157157
# The dict is ordered to ensure that once the maximum number of partitions is reached,

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ class Action(Enum):
539539
FAIL = "FAIL"
540540
RETRY = "RETRY"
541541
IGNORE = "IGNORE"
542+
RESET_PAGINATION = "RESET_PAGINATION"
542543
RATE_LIMITED = "RATE_LIMITED"
543544

544545

@@ -553,7 +554,14 @@ class HttpResponseFilter(BaseModel):
553554
action: Optional[Action] = Field(
554555
None,
555556
description="Action to execute if a response matches the filter.",
556-
examples=["SUCCESS", "FAIL", "RETRY", "IGNORE", "RATE_LIMITED"],
557+
examples=[
558+
"SUCCESS",
559+
"FAIL",
560+
"RETRY",
561+
"IGNORE",
562+
"RESET_PAGINATION",
563+
"RATE_LIMITED",
564+
],
557565
title="Action",
558566
)
559567
failure_type: Optional[FailureType] = Field(
@@ -1173,6 +1181,16 @@ class LegacySessionTokenAuthenticator(BaseModel):
11731181
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
11741182

11751183

1184+
class Action1(Enum):
    # Values accepted by the `action` field of the PaginationReset component.
    # They mirror the `action` enum of the PaginationReset definition in
    # declarative_component_schema.yaml. A `#` comment is used instead of a
    # docstring so the generated model's schema description is left untouched.

    # Per the schema description, with this action the connector developer is
    # accountable for records being returned in ascending order.
    SPLIT_USING_CURSOR = "SPLIT_USING_CURSOR"
    # With this action the pagination tracker is built without any cursor,
    # even if the stream has one.
    RESET = "RESET"
1187+
1188+
1189+
class PaginationResetLimits(BaseModel):
    # Model for the PaginationResetLimits schema definition, which "describes
    # the limits that trigger pagination reset".

    # Component discriminator; only the literal component name is accepted.
    type: Literal["PaginationResetLimits"]
    # Optional record-count limit. The component factory forwards it to
    # PaginationTracker as its limit; None means no limit is configured.
    # NOTE(review): exact threshold semantics live in PaginationTracker - confirm there.
    number_of_records: Optional[int] = None
1192+
1193+
11761194
class CsvDecoder(BaseModel):
11771195
type: Literal["CsvDecoder"]
11781196
encoding: Optional[str] = "utf-8"
@@ -2054,6 +2072,12 @@ class RecordSelector(BaseModel):
20542072
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20552073

20562074

2075+
class PaginationReset(BaseModel):
    # Model for the PaginationReset schema definition: describes what triggers
    # a pagination reset and how to handle it.

    # Component discriminator; only the literal component name is accepted.
    type: Literal["PaginationReset"]
    # Required handling strategy (SPLIT_USING_CURSOR or RESET).
    action: Action1
    # Optional limits that trigger a pagination reset. The component factory
    # rejects limits when the record selector also defines a record filter.
    limits: Optional[PaginationResetLimits] = None
2079+
2080+
20572081
class GzipDecoder(BaseModel):
20582082
type: Literal["GzipDecoder"]
20592083
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]
@@ -2822,6 +2846,10 @@ class SimpleRetriever(BaseModel):
28222846
None,
28232847
description="Paginator component that describes how to navigate through the API's pages.",
28242848
)
2849+
pagination_reset: Optional[PaginationReset] = Field(
2850+
None,
2851+
description="Describes what triggers pagination reset and how to handle it.",
2852+
)
28252853
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
28262854
False,
28272855
description="If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,15 @@
116116
)
117117
from airbyte_cdk.sources.declarative.models import (
118118
CustomStateMigration,
119+
PaginationResetLimits,
119120
)
120121
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
121122
DEPRECATION_LOGS_TAG,
122123
BaseModelWithDeprecations,
123124
)
125+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
126+
Action1 as PaginationResetActionModel,
127+
)
124128
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
125129
AddedFieldDefinition as AddedFieldDefinitionModel,
126130
)
@@ -358,6 +362,9 @@
358362
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
359363
PageIncrement as PageIncrementModel,
360364
)
365+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
366+
PaginationReset as PaginationResetModel,
367+
)
361368
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
362369
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
363370
)
@@ -529,6 +536,7 @@
529536
LocalFileSystemFileWriter,
530537
NoopFileWriter,
531538
)
539+
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
532540
from airbyte_cdk.sources.declarative.schema import (
533541
ComplexFieldType,
534542
DefaultSchemaLoader,
@@ -644,6 +652,8 @@
644652
# this would be a circular import
645653
MAX_SLICES = 5
646654

655+
# Module-level logger for the component factory. The name is a plain string:
# the previous f-string prefix had no placeholders and was therefore redundant.
LOGGER = logging.getLogger("airbyte.model_to_component_factory")
656+
647657

648658
class ModelToComponentFactory:
649659
EPOCH_DATETIME_FORMAT = "%s"
@@ -2043,6 +2053,7 @@ def create_default_stream(
20432053
if isinstance(concurrent_cursor, FinalStateCursor)
20442054
else concurrent_cursor
20452055
)
2056+
20462057
retriever = self._create_component_from_model(
20472058
model=model.retriever,
20482059
config=config,
@@ -2051,12 +2062,9 @@ def create_default_stream(
20512062
request_options_provider=request_options_provider,
20522063
stream_slicer=stream_slicer,
20532064
partition_router=partition_router,
2054-
stop_condition_cursor=concurrent_cursor
2055-
if self._is_stop_condition_on_cursor(model)
2056-
else None,
2057-
client_side_incremental_sync={"cursor": concurrent_cursor}
2058-
if self._is_client_side_filtering_enabled(model)
2059-
else None,
2065+
has_stop_condition_cursor=self._is_stop_condition_on_cursor(model),
2066+
is_client_side_incremental_sync=self._is_client_side_filtering_enabled(model),
2067+
cursor=concurrent_cursor,
20602068
transformations=transformations,
20612069
file_uploader=file_uploader,
20622070
incremental_sync=model.incremental_sync,
@@ -3049,7 +3057,7 @@ def create_record_selector(
30493057
name: str,
30503058
transformations: List[RecordTransformation] | None = None,
30513059
decoder: Decoder | None = None,
3052-
client_side_incremental_sync: Dict[str, Any] | None = None,
3060+
client_side_incremental_sync_cursor: Optional[Cursor] = None,
30533061
file_uploader: Optional[DefaultFileUploader] = None,
30543062
**kwargs: Any,
30553063
) -> RecordSelector:
@@ -3065,14 +3073,14 @@ def create_record_selector(
30653073
transform_before_filtering = (
30663074
False if model.transform_before_filtering is None else model.transform_before_filtering
30673075
)
3068-
if client_side_incremental_sync:
3076+
if client_side_incremental_sync_cursor:
30693077
record_filter = ClientSideIncrementalRecordFilterDecorator(
30703078
config=config,
30713079
parameters=model.parameters,
30723080
condition=model.record_filter.condition
30733081
if (model.record_filter and hasattr(model.record_filter, "condition"))
30743082
else None,
3075-
**client_side_incremental_sync,
3083+
cursor=client_side_incremental_sync_cursor,
30763084
)
30773085
transform_before_filtering = (
30783086
True
@@ -3150,8 +3158,9 @@ def create_simple_retriever(
31503158
name: str,
31513159
primary_key: Optional[Union[str, List[str], List[List[str]]]],
31523160
request_options_provider: Optional[RequestOptionsProvider] = None,
3153-
stop_condition_cursor: Optional[Cursor] = None,
3154-
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
3161+
cursor: Optional[Cursor] = None,
3162+
has_stop_condition_cursor: bool = False,
3163+
is_client_side_incremental_sync: bool = False,
31553164
transformations: List[RecordTransformation],
31563165
file_uploader: Optional[DefaultFileUploader] = None,
31573166
incremental_sync: Optional[
@@ -3181,6 +3190,9 @@ def _get_url(req: Requester) -> str:
31813190

31823191
return _url or _url_base
31833192

3193+
if cursor is None:
3194+
cursor = FinalStateCursor(name, None, self._message_repository)
3195+
31843196
decoder = (
31853197
self._create_component_from_model(model=model.decoder, config=config)
31863198
if model.decoder
@@ -3192,7 +3204,7 @@ def _get_url(req: Requester) -> str:
31923204
config=config,
31933205
decoder=decoder,
31943206
transformations=transformations,
3195-
client_side_incremental_sync=client_side_incremental_sync,
3207+
client_side_incremental_sync_cursor=cursor if is_client_side_incremental_sync else None,
31963208
file_uploader=file_uploader,
31973209
)
31983210

@@ -3280,7 +3292,7 @@ def _get_url(req: Requester) -> str:
32803292
url_base=_get_url(requester),
32813293
extractor_model=model.record_selector.extractor,
32823294
decoder=decoder,
3283-
cursor_used_for_stop_condition=stop_condition_cursor or None,
3295+
cursor_used_for_stop_condition=cursor if has_stop_condition_cursor else None,
32843296
)
32853297
if model.paginator
32863298
else NoPagination(parameters={})
@@ -3329,6 +3341,13 @@ def _get_url(req: Requester) -> str:
33293341
parameters=model.parameters or {},
33303342
)
33313343

3344+
if (
3345+
model.record_selector.record_filter
3346+
and model.pagination_reset
3347+
and model.pagination_reset.limits
3348+
):
3349+
raise ValueError("PaginationResetLimits are not supported while having record filter.")
3350+
33323351
return SimpleRetriever(
33333352
name=name,
33343353
paginator=paginator,
@@ -3342,9 +3361,40 @@ def _get_url(req: Requester) -> str:
33423361
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33433362
additional_query_properties=query_properties,
33443363
log_formatter=self._get_log_formatter(log_formatter, name),
3364+
pagination_tracker_factory=self._create_pagination_tracker_factory(
3365+
model.pagination_reset, cursor
3366+
),
33453367
parameters=model.parameters or {},
33463368
)
33473369

3370+
def _create_pagination_tracker_factory(
3371+
self, model: Optional[PaginationResetModel], cursor: Cursor
3372+
) -> Callable[[], PaginationTracker]:
3373+
if model is None:
3374+
return lambda: PaginationTracker()
3375+
3376+
# Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic
3377+
cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None
3378+
if model.action == PaginationResetActionModel.RESET:
3379+
# in that case, we will let cursor_factory to return None even if the stream has a cursor
3380+
pass
3381+
elif model.action == PaginationResetActionModel.SPLIT_USING_CURSOR:
3382+
if isinstance(cursor, ConcurrentCursor):
3383+
cursor_factory = lambda: cursor.copy_without_state() # type: ignore # the if condition validates that it is a ConcurrentCursor
3384+
elif isinstance(cursor, ConcurrentPerPartitionCursor):
3385+
cursor_factory = lambda: cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
3386+
{}, datetime.timedelta(0)
3387+
)
3388+
elif not isinstance(cursor, FinalStateCursor):
3389+
LOGGER.warning(
3390+
"Unknown cursor for PaginationTracker. Pagination resets might not work properly"
3391+
)
3392+
else:
3393+
raise ValueError(f"Unknown PaginationReset action: {model.action}")
3394+
3395+
limit = model.limits.number_of_records if model and model.limits else None
3396+
return lambda: PaginationTracker(cursor_factory(), limit)
3397+
33483398
def _get_log_formatter(
33493399
self, log_formatter: Callable[[Response], Any] | None, name: str
33503400
) -> Callable[[Response], Any] | None:

airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ def interpret_response(
6666
if not isinstance(matched_error_resolution, ErrorResolution):
6767
continue
6868

69-
if matched_error_resolution.response_action == ResponseAction.SUCCESS:
69+
if matched_error_resolution.response_action in [
70+
ResponseAction.SUCCESS,
71+
ResponseAction.RETRY,
72+
ResponseAction.IGNORE,
73+
ResponseAction.RESET_PAGINATION,
74+
]:
7075
return matched_error_resolution
7176

72-
if (
73-
matched_error_resolution.response_action == ResponseAction.RETRY
74-
or matched_error_resolution.response_action == ResponseAction.IGNORE
75-
):
76-
return matched_error_resolution
7777
if matched_error_resolution:
7878
return matched_error_resolution
7979

0 commit comments

Comments
 (0)