Skip to content

Commit f247e93

Browse files
author
maxime.c
committed
One cursor per PaginationTracker
1 parent c8e5c1e commit f247e93

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3361,11 +3361,11 @@ def _create_pagination_tracker_factory(
33613361
return lambda: PaginationTracker()
33623362

33633363
# Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic
3364-
cursor_for_pagination_tracking = None
3364+
cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None
33653365
if isinstance(cursor, ConcurrentCursor):
3366-
cursor_for_pagination_tracking = cursor
3366+
cursor_factory = lambda: cursor.copy_without_state() # type: ignore # the if condition validates that it is a ConcurrentCursor
33673367
elif isinstance(cursor, ConcurrentPerPartitionCursor):
3368-
cursor_for_pagination_tracking = cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
3368+
cursor_factory = lambda: cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
33693369
{}, datetime.timedelta(0)
33703370
)
33713371
elif not isinstance(cursor, FinalStateCursor):
@@ -3374,7 +3374,7 @@ def _create_pagination_tracker_factory(
33743374
)
33753375

33763376
limit = model.limits.number_of_records if model and model.limits else None
3377-
return lambda: PaginationTracker(cursor_for_pagination_tracking, limit)
3377+
return lambda: PaginationTracker(cursor_factory(), limit)
33783378

33793379
def _get_log_formatter(
33803380
self, log_formatter: Callable[[Response], Any] | None, name: str

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
)
2020

2121
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
22-
from airbyte_cdk.sources.message import MessageRepository
22+
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
2323
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
2424
from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping
2525
from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType
@@ -136,6 +136,24 @@ class ConcurrentCursor(Cursor):
136136
_START_BOUNDARY = 0
137137
_END_BOUNDARY = 1
138138

139+
def copy_without_state(self) -> "ConcurrentCursor":
140+
return self.__class__(
141+
stream_name=self._stream_name,
142+
stream_namespace=self._stream_namespace,
143+
stream_state={},
144+
message_repository=NoopMessageRepository(),
145+
connector_state_manager=ConnectorStateManager(),
146+
connector_state_converter=self._connector_state_converter,
147+
cursor_field=self._cursor_field,
148+
slice_boundary_fields=self._slice_boundary_fields,
149+
start=self._start,
150+
end_provider=self._end_provider,
151+
lookback_window=self._lookback_window,
152+
slice_range=self._slice_range,
153+
cursor_granularity=self._cursor_granularity,
154+
clamping_strategy=self._clamping_strategy,
155+
)
156+
139157
def __init__(
140158
self,
141159
stream_name: str,

0 commit comments

Comments
 (0)