@@ -422,142 +422,6 @@ def test_substream_without_input_state():
422422 ]
423423
424424
425- def test_switch_to_global_limit (caplog ):
426- """
427- Test that when the number of partitions exceeds the limit to switch to global state.
428-
429- In this test, we set the maximum number of partitions to 1 (not 2 because we evaluate this before generating a
430- partition and the limit is not inclusive) and provide 3 partitions.
431- We verify that the state switch to global.
432- """
433- stream_name = "Rates"
434-
435- partition_slices = [
436- StreamSlice (partition = {"partition_field" : "1" }, cursor_slice = {}),
437- StreamSlice (partition = {"partition_field" : "2" }, cursor_slice = {}),
438- StreamSlice (partition = {"partition_field" : "3" }, cursor_slice = {}),
439- ]
440-
441- records_list = [
442- [
443- Record (
444- data = {"a record key" : "a record value" , CURSOR_FIELD : "2022-01-15" },
445- associated_slice = partition_slices [0 ],
446- stream_name = stream_name ,
447- ),
448- Record (
449- data = {"a record key" : "a record value" , CURSOR_FIELD : "2022-01-16" },
450- associated_slice = partition_slices [0 ],
451- stream_name = stream_name ,
452- ),
453- ],
454- [
455- Record (
456- data = {"a record key" : "a record value" , CURSOR_FIELD : "2022-02-15" },
457- associated_slice = partition_slices [0 ],
458- stream_name = stream_name ,
459- )
460- ],
461- [
462- Record (
463- data = {"a record key" : "a record value" , CURSOR_FIELD : "2022-01-16" },
464- associated_slice = partition_slices [1 ],
465- stream_name = stream_name ,
466- )
467- ],
468- [],
469- [],
470- [
471- Record (
472- data = {"a record key" : "a record value" , CURSOR_FIELD : "2022-02-17" },
473- associated_slice = partition_slices [2 ],
474- stream_name = stream_name ,
475- )
476- ],
477- ]
478-
479- configured_stream = ConfiguredAirbyteStream (
480- stream = AirbyteStream (
481- name = "Rates" ,
482- json_schema = {},
483- supported_sync_modes = [SyncMode .full_refresh , SyncMode .incremental ],
484- ),
485- sync_mode = SyncMode .incremental ,
486- destination_sync_mode = DestinationSyncMode .append ,
487- )
488- catalog = ConfiguredAirbyteCatalog (streams = [configured_stream ])
489-
490- initial_state = [
491- AirbyteStateMessage (
492- type = AirbyteStateType .STREAM ,
493- stream = AirbyteStreamState (
494- stream_descriptor = StreamDescriptor (name = "post_comment_votes" , namespace = None ),
495- stream_state = AirbyteStateBlob (
496- {
497- "states" : [
498- {
499- "partition" : {"partition_field" : "1" },
500- "cursor" : {CURSOR_FIELD : "2022-01-01" },
501- },
502- {
503- "partition" : {"partition_field" : "2" },
504- "cursor" : {CURSOR_FIELD : "2022-01-02" },
505- },
506- {
507- "partition" : {"partition_field" : "3" },
508- "cursor" : {CURSOR_FIELD : "2022-01-03" },
509- },
510- ]
511- }
512- ),
513- ),
514- )
515- ]
516- logger = MagicMock ()
517-
518- source = ConcurrentDeclarativeSource (
519- source_config = ManifestBuilder ()
520- .with_list_partition_router (
521- stream_name = stream_name , cursor_field = "partition_field" , partitions = ["1" , "2" , "3" ]
522- )
523- .with_incremental_sync (
524- stream_name = stream_name ,
525- start_datetime = "2022-01-01" ,
526- end_datetime = "2022-02-28" ,
527- datetime_format = "%Y-%m-%d" ,
528- cursor_field = CURSOR_FIELD ,
529- step = "P1M" ,
530- cursor_granularity = "P1D" ,
531- )
532- .build (),
533- config = {},
534- catalog = catalog ,
535- state = initial_state ,
536- )
537-
538- # Use caplog to capture logs
539- with caplog .at_level (logging .INFO , logger = "airbyte" ):
540- with patch .object (SimpleRetriever , "_read_pages" , side_effect = records_list ):
541- with patch .object (ConcurrentPerPartitionCursor , "SWITCH_TO_GLOBAL_LIMIT" , 1 ):
542- output = list (source .read (logger , {}, catalog , initial_state ))
543-
544- # Check if the warning was logged
545- logged_messages = [record .message for record in caplog .records if record .levelname == "INFO" ]
546- warning_message = "Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of"
547- assert any (map (lambda message : warning_message in message , logged_messages ))
548-
549- final_state = [
550- orjson .loads (orjson .dumps (message .state .stream .stream_state ))
551- for message in output
552- if message .state
553- ]
554- assert final_state [- 1 ] == {
555- "lookback_window" : 1 ,
556- "state" : {"cursor_field" : "2022-02-17" },
557- "use_global_cursor" : True ,
558- }
559-
560-
561425def test_perpartition_with_fallback (caplog ):
562426 """
563427 Test that when the number of partitions exceeds the limit in PerPartitionCursor,
0 commit comments