3333from airbyte_cdk .sources .declarative .partition_routers .single_partition_router import (
3434 SinglePartitionRouter ,
3535)
36- from airbyte_cdk .sources .declarative .stream_slicers import StreamSlicerTestReadDecorator
36+ from airbyte_cdk .sources .declarative .stream_slicers import (
37+ StreamSlicer ,
38+ StreamSlicerTestReadDecorator ,
39+ )
3740from airbyte_cdk .sources .message import NoopMessageRepository
3841from unit_tests .sources .declarative .async_job .test_integration import MockAsyncJobRepository
3942
@@ -83,6 +86,24 @@ def date_time_based_cursor_factory() -> DatetimeBasedCursor:
8386 )
8487
8588
89+ def create_substream_partition_router ():
90+ return SubstreamPartitionRouter (
91+ config = {},
92+ parameters = {},
93+ parent_stream_configs = [
94+ ParentStreamConfig (
95+ type = "ParentStreamConfig" ,
96+ parent_key = "id" ,
97+ partition_field = "id" ,
98+ stream = DeclarativeStream (
99+ type = "DeclarativeStream" ,
100+ retriever = CustomRetriever (type = "CustomRetriever" , class_name = "a_class_name" ),
101+ ),
102+ )
103+ ],
104+ )
105+
106+
86107def test_isinstance_global_cursor ():
87108 first_partition = {"first_partition_key" : "first_partition_value" }
88109 partition_router = mocked_partition_router ()
@@ -142,21 +163,7 @@ def test_isinstance_global_cursor_aysnc_job_partition_router():
142163
143164
144165def test_isinstance_substrea_partition_router ():
145- partition_router = SubstreamPartitionRouter (
146- config = {},
147- parameters = {},
148- parent_stream_configs = [
149- ParentStreamConfig (
150- type = "ParentStreamConfig" ,
151- parent_key = "id" ,
152- partition_field = "id" ,
153- stream = DeclarativeStream (
154- type = "DeclarativeStream" ,
155- retriever = CustomRetriever (type = "CustomRetriever" , class_name = "a_class_name" ),
156- ),
157- )
158- ],
159- )
166+ partition_router = create_substream_partition_router ()
160167
161168 wrapped_slicer = StreamSlicerTestReadDecorator (
162169 wrapped_slicer = partition_router ,
@@ -175,21 +182,7 @@ def test_isinstance_substrea_partition_router():
175182
176183
177184def test_isinstance_perpartition_with_global_cursor ():
178- partition_router = SubstreamPartitionRouter (
179- config = {},
180- parameters = {},
181- parent_stream_configs = [
182- ParentStreamConfig (
183- type = "ParentStreamConfig" ,
184- parent_key = "id" ,
185- partition_field = "id" ,
186- stream = DeclarativeStream (
187- type = "DeclarativeStream" ,
188- retriever = CustomRetriever (type = "CustomRetriever" , class_name = "a_class_name" ),
189- ),
190- )
191- ],
192- )
185+ partition_router = create_substream_partition_router ()
193186 date_time_based_cursor = date_time_based_cursor_factory ()
194187
195188 cursor_factory = CursorFactory (date_time_based_cursor_factory )
@@ -221,3 +214,21 @@ def test_isinstance_perpartition_with_global_cursor():
221214 assert substream_cursor ._global_cursor ._stream_cursor == date_time_based_cursor
222215
223216 assert substream_cursor ._get_active_cursor () == wrapped_slicer ._get_active_cursor ()
217+
218+
219+ def test_slice_limiting_functionality ():
220+ # Create a slicer that returns many slices
221+ mock_slicer = Mock (spec = StreamSlicer )
222+ mock_slicer .stream_slices .return_value = [
223+ StreamSlice (partition = {f"key_{ i } " : f"value_{ i } " }, cursor_slice = {}) for i in range (10 )
224+ ]
225+
226+ # Wrap with decorator limiting to 3 slices
227+ wrapped_slicer = StreamSlicerTestReadDecorator (
228+ wrapped_slicer = mock_slicer ,
229+ maximum_number_of_slices = 3 ,
230+ )
231+
232+ # Verify only 3 slices are returned
233+ slices = list (wrapped_slicer .stream_slices ())
234+ assert len (slices ) == 3
0 commit comments