11#
22# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33#
4-
4+ from datetime import timedelta
55from unittest .mock import Mock
66
7- from airbyte_cdk .legacy .sources .declarative .incremental import (
8- CursorFactory ,
9- DatetimeBasedCursor ,
10- GlobalSubstreamCursor ,
11- PerPartitionWithGlobalCursor ,
12- )
13- from airbyte_cdk .legacy .sources .declarative .incremental .declarative_cursor import DeclarativeCursor
7+ import pytest
8+
9+ from airbyte_cdk .sources .connector_state_manager import ConnectorStateManager
1410from airbyte_cdk .sources .declarative .async_job .job_orchestrator import (
1511 AsyncJobOrchestrator ,
1612)
1713from airbyte_cdk .sources .declarative .async_job .job_tracker import JobTracker
18- from airbyte_cdk .sources .declarative .datetime .min_max_datetime import MinMaxDatetime
19- from airbyte_cdk .sources .declarative .interpolation import InterpolatedString
14+ from airbyte_cdk .sources .declarative .incremental import (
15+ ConcurrentCursorFactory ,
16+ ConcurrentPerPartitionCursor ,
17+ )
2018from airbyte_cdk .sources .declarative .models import (
2119 CustomRetriever ,
2220 DeclarativeStream ,
3432 StreamSlicer ,
3533 StreamSlicerTestReadDecorator ,
3634)
37- from airbyte_cdk .sources .message import NoopMessageRepository
35+ from airbyte_cdk .sources .message import MessageRepository , NoopMessageRepository
36+ from airbyte_cdk .sources .streams .concurrent .cursor import ConcurrentCursor , Cursor , CursorField
37+ from airbyte_cdk .sources .streams .concurrent .state_converters .datetime_stream_state_converter import (
38+ CustomFormatConcurrentStreamStateConverter ,
39+ )
3840from airbyte_cdk .sources .types import StreamSlice
41+ from airbyte_cdk .utils .datetime_helpers import ab_datetime_parse
3942from unit_tests .sources .declarative .async_job .test_integration import MockAsyncJobRepository
4043
4144CURSOR_SLICE_FIELD = "cursor slice field"
@@ -57,7 +60,7 @@ def with_stream_state(self, stream_state):
5760 return self
5861
5962 def build (self ):
60- cursor = Mock (spec = DeclarativeCursor )
63+ cursor = Mock (spec = Cursor )
6164 cursor .get_stream_state .return_value = self ._stream_state
6265 cursor .stream_slices .return_value = self ._stream_slices
6366 return cursor
@@ -67,20 +70,31 @@ def mocked_partition_router():
6770 return Mock (spec = PartitionRouter )
6871
6972
70- def date_time_based_cursor_factory () -> DatetimeBasedCursor :
71- return DatetimeBasedCursor (
72- start_datetime = MinMaxDatetime (
73- datetime = "2021-01-01" , datetime_format = DATE_FORMAT , parameters = {}
74- ),
75- end_datetime = MinMaxDatetime (
76- datetime = "2021-01-05" , datetime_format = DATE_FORMAT , parameters = {}
77- ),
78- step = "P10Y" ,
79- cursor_field = InterpolatedString . create ( "created_at" , parameters = {}),
73+ def mocked_message_repository () -> MessageRepository :
74+ return Mock ( spec = MessageRepository )
75+
76+
77+ def mocked_connector_state_manager () -> ConnectorStateManager :
78+ return Mock ( spec = ConnectorStateManager )
79+
80+
81+ def concurrent_cursor_factory () -> ConcurrentCursor :
82+ state_converter = CustomFormatConcurrentStreamStateConverter (
8083 datetime_format = DATE_FORMAT ,
81- cursor_granularity = "P1D" ,
82- config = {},
83- parameters = {},
84+ is_sequential_state = False ,
85+ )
86+
87+ return ConcurrentCursor (
88+ stream_name = "test" ,
89+ stream_namespace = "" ,
90+ stream_state = {},
91+ message_repository = mocked_message_repository (),
92+ connector_state_manager = mocked_connector_state_manager (),
93+ connector_state_converter = state_converter ,
94+ cursor_field = CursorField ("created_at" ),
95+ slice_boundary_fields = None ,
96+ start = ab_datetime_parse ("2021-01-01" ),
97+ end_provider = state_converter .get_end_provider (),
8498 )
8599
86100
@@ -102,45 +116,7 @@ def create_substream_partition_router():
102116 )
103117
104118
105- def test_isinstance_global_cursor ():
106- first_partition = {"first_partition_key" : "first_partition_value" }
107- partition_router = mocked_partition_router ()
108- partition_router .stream_slices .return_value = [
109- StreamSlice (
110- partition = first_partition , cursor_slice = {}, extra_fields = {"extra_field" : "extra_value" }
111- ),
112- ]
113- cursor = (
114- MockedCursorBuilder ()
115- .with_stream_slices ([{CURSOR_SLICE_FIELD : "first slice cursor value" }])
116- .build ()
117- )
118-
119- global_cursor = GlobalSubstreamCursor (cursor , partition_router )
120- wrapped_slicer = StreamSlicerTestReadDecorator (
121- wrapped_slicer = global_cursor ,
122- maximum_number_of_slices = 5 ,
123- )
124- assert isinstance (wrapped_slicer , GlobalSubstreamCursor )
125- assert isinstance (wrapped_slicer .wrapped_slicer , GlobalSubstreamCursor )
126- assert isinstance (wrapped_slicer , StreamSlicerTestReadDecorator )
127-
128- assert not isinstance (wrapped_slicer .wrapped_slicer , StreamSlicerTestReadDecorator )
129- assert not isinstance (wrapped_slicer , AsyncJobPartitionRouter )
130- assert not isinstance (wrapped_slicer .wrapped_slicer , AsyncJobPartitionRouter )
131- assert not isinstance (wrapped_slicer , PerPartitionWithGlobalCursor )
132- assert not isinstance (wrapped_slicer .wrapped_slicer , PerPartitionWithGlobalCursor )
133- assert not isinstance (wrapped_slicer , SubstreamPartitionRouter )
134- assert not isinstance (wrapped_slicer .wrapped_slicer , SubstreamPartitionRouter )
135-
136- assert isinstance (global_cursor , GlobalSubstreamCursor )
137- assert not isinstance (global_cursor , StreamSlicerTestReadDecorator )
138- assert not isinstance (global_cursor , AsyncJobPartitionRouter )
139- assert not isinstance (global_cursor , PerPartitionWithGlobalCursor )
140- assert not isinstance (global_cursor , SubstreamPartitionRouter )
141-
142-
143- def test_isinstance_global_cursor_aysnc_job_partition_router ():
119+ def test_isinstance_global_cursor_async_job_partition_router ():
144120 async_job_partition_router = AsyncJobPartitionRouter (
145121 stream_slicer = SinglePartitionRouter (parameters = {}),
146122 job_orchestrator_factory = lambda stream_slices : AsyncJobOrchestrator (
@@ -162,17 +138,14 @@ def test_isinstance_global_cursor_aysnc_job_partition_router():
162138 assert isinstance (wrapped_slicer , StreamSlicerTestReadDecorator )
163139
164140 assert not isinstance (wrapped_slicer .wrapped_slicer , StreamSlicerTestReadDecorator )
165- assert not isinstance (wrapped_slicer , GlobalSubstreamCursor )
166- assert not isinstance (wrapped_slicer .wrapped_slicer , GlobalSubstreamCursor )
167- assert not isinstance (wrapped_slicer , PerPartitionWithGlobalCursor )
168- assert not isinstance (wrapped_slicer .wrapped_slicer , PerPartitionWithGlobalCursor )
141+ assert not isinstance (wrapped_slicer , ConcurrentPerPartitionCursor )
142+ assert not isinstance (wrapped_slicer .wrapped_slicer , ConcurrentPerPartitionCursor )
169143 assert not isinstance (wrapped_slicer , SubstreamPartitionRouter )
170144 assert not isinstance (wrapped_slicer .wrapped_slicer , SubstreamPartitionRouter )
171145
172146 assert isinstance (async_job_partition_router , AsyncJobPartitionRouter )
173147 assert not isinstance (async_job_partition_router , StreamSlicerTestReadDecorator )
174- assert not isinstance (async_job_partition_router , GlobalSubstreamCursor )
175- assert not isinstance (async_job_partition_router , PerPartitionWithGlobalCursor )
148+ assert not isinstance (async_job_partition_router , ConcurrentPerPartitionCursor )
176149 assert not isinstance (async_job_partition_router , SubstreamPartitionRouter )
177150
178151
@@ -189,63 +162,76 @@ def test_isinstance_substream_partition_router():
189162 assert isinstance (wrapped_slicer , StreamSlicerTestReadDecorator )
190163
191164 assert not isinstance (wrapped_slicer .wrapped_slicer , StreamSlicerTestReadDecorator )
192- assert not isinstance (wrapped_slicer , GlobalSubstreamCursor )
193- assert not isinstance (wrapped_slicer .wrapped_slicer , GlobalSubstreamCursor )
165+ assert not isinstance (wrapped_slicer , ConcurrentPerPartitionCursor )
166+ assert not isinstance (wrapped_slicer .wrapped_slicer , ConcurrentPerPartitionCursor )
194167 assert not isinstance (wrapped_slicer , AsyncJobPartitionRouter )
195168 assert not isinstance (wrapped_slicer .wrapped_slicer , AsyncJobPartitionRouter )
196- assert not isinstance (wrapped_slicer , PerPartitionWithGlobalCursor )
197- assert not isinstance (wrapped_slicer .wrapped_slicer , PerPartitionWithGlobalCursor )
198169
199170 assert isinstance (partition_router , SubstreamPartitionRouter )
200171 assert not isinstance (partition_router , StreamSlicerTestReadDecorator )
201- assert not isinstance (partition_router , GlobalSubstreamCursor )
172+ assert not isinstance (partition_router , ConcurrentPerPartitionCursor )
202173 assert not isinstance (partition_router , AsyncJobPartitionRouter )
203- assert not isinstance (partition_router , PerPartitionWithGlobalCursor )
204174
205175
206- def test_isinstance_perpartition_with_global_cursor ():
176+ @pytest .mark .parametrize (
177+ "use_global_cursor" ,
178+ [
179+ pytest .param (True , id = "test_with_global_cursor" ),
180+ pytest .param (False , id = "test_with_no_global_cursor" ),
181+ ],
182+ )
183+ def test_isinstance_concurrent_per_partition_cursor (use_global_cursor ):
207184 partition_router = create_substream_partition_router ()
208- date_time_based_cursor = date_time_based_cursor_factory ()
185+ cursor_factory = ConcurrentCursorFactory (concurrent_cursor_factory )
186+ connector_state_converter = CustomFormatConcurrentStreamStateConverter (
187+ datetime_format = "%Y-%m-%dT%H:%M:%SZ" ,
188+ input_datetime_formats = ["%Y-%m-%dT%H:%M:%SZ" ],
189+ is_sequential_state = True ,
190+ cursor_granularity = timedelta (0 ),
191+ )
209192
210- cursor_factory = CursorFactory (date_time_based_cursor_factory )
211- substream_cursor = PerPartitionWithGlobalCursor (
193+ substream_cursor = ConcurrentPerPartitionCursor (
212194 cursor_factory = cursor_factory ,
213195 partition_router = partition_router ,
214- stream_cursor = date_time_based_cursor ,
196+ stream_name = "test" ,
197+ stream_namespace = "" ,
198+ stream_state = {},
199+ message_repository = mocked_message_repository (),
200+ connector_state_manager = mocked_connector_state_manager (),
201+ connector_state_converter = connector_state_converter ,
202+ cursor_field = CursorField (cursor_field_key = "updated_at" ),
203+ use_global_cursor = use_global_cursor ,
215204 )
216205
217206 wrapped_slicer = StreamSlicerTestReadDecorator (
218207 wrapped_slicer = substream_cursor ,
219208 maximum_number_of_slices = 5 ,
220209 )
221210
222- assert isinstance (wrapped_slicer , PerPartitionWithGlobalCursor )
223- assert isinstance (wrapped_slicer .wrapped_slicer , PerPartitionWithGlobalCursor )
211+ assert isinstance (wrapped_slicer , ConcurrentPerPartitionCursor )
212+ assert isinstance (wrapped_slicer .wrapped_slicer , ConcurrentPerPartitionCursor )
224213 assert isinstance (wrapped_slicer , StreamSlicerTestReadDecorator )
225214
226215 assert not isinstance (wrapped_slicer .wrapped_slicer , StreamSlicerTestReadDecorator )
227- assert not isinstance (wrapped_slicer , GlobalSubstreamCursor )
228- assert not isinstance (wrapped_slicer .wrapped_slicer , GlobalSubstreamCursor )
229216 assert not isinstance (wrapped_slicer , AsyncJobPartitionRouter )
230217 assert not isinstance (wrapped_slicer .wrapped_slicer , AsyncJobPartitionRouter )
231218 assert not isinstance (wrapped_slicer , SubstreamPartitionRouter )
232219 assert not isinstance (wrapped_slicer .wrapped_slicer , SubstreamPartitionRouter )
233220
234- assert wrapped_slicer ._per_partition_cursor . _cursor_factory == cursor_factory
221+ assert wrapped_slicer ._cursor_factory == cursor_factory
235222 assert wrapped_slicer ._partition_router == partition_router
236- assert wrapped_slicer ._global_cursor . _stream_cursor == date_time_based_cursor
223+ assert wrapped_slicer ._use_global_cursor == use_global_cursor
237224
238- assert isinstance (substream_cursor , PerPartitionWithGlobalCursor )
225+ assert isinstance (substream_cursor , ConcurrentPerPartitionCursor )
239226 assert not isinstance (substream_cursor , StreamSlicerTestReadDecorator )
240- assert not isinstance (substream_cursor , GlobalSubstreamCursor )
241227 assert not isinstance (substream_cursor , AsyncJobPartitionRouter )
242228 assert not isinstance (substream_cursor , SubstreamPartitionRouter )
243229
244- assert substream_cursor ._per_partition_cursor . _cursor_factory == cursor_factory
230+ assert substream_cursor ._cursor_factory == cursor_factory
245231 assert substream_cursor ._partition_router == partition_router
246- assert substream_cursor . _global_cursor . _stream_cursor == date_time_based_cursor
232+ assert wrapped_slicer . _use_global_cursor == use_global_cursor
247233
248- assert substream_cursor ._get_active_cursor () == wrapped_slicer ._get_active_cursor ()
234+ assert substream_cursor ._use_global_cursor == wrapped_slicer ._use_global_cursor
249235
250236
251237def test_slice_limiting_functionality ():
0 commit comments