@@ -3077,24 +3077,29 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update
30773077 assert mock_cursor .stream_slices .call_count == 0 # No calls since no partitions
30783078
30793079
3080- def test_given_new_partition_mid_sync_when_close_partition_then_update_state ():
3081- mock_cursor = MagicMock ()
3082- # Simulate one slice per cursor
3083- mock_cursor .stream_slices .side_effect = [
3084- iter (
3085- [
3086- {"slice1" : "data1" },
3087- {"slice2" : "data1" }, # First slice
3088- ]
3089- ),
3090- iter (
3091- [
3092- {"slice2" : "data2" },
3093- {"slice2" : "data2" }, # First slice for new partition
3094- ]
3095- ),
3096- ]
3097- mock_cursor .state = {"updated_at" : "2024-01-03T00:00:00Z" } # Set cursor state
3080+ def test_given_unfinished_first_parent_partition_no_parent_state_update ():
3081+ # Create two mock cursors with different states for each partition
3082+ mock_cursor_1 = MagicMock ()
3083+ mock_cursor_1 .stream_slices .return_value = iter (
3084+ [
3085+ {"slice1" : "data1" },
3086+ {"slice2" : "data1" }, # First partition slices
3087+ ]
3088+ )
3089+ mock_cursor_1 .state = {"updated_at" : "2024-01-01T00:00:00Z" } # State for partition "1"
3090+
3091+ mock_cursor_2 = MagicMock ()
3092+ mock_cursor_2 .stream_slices .return_value = iter (
3093+ [
3094+ {"slice2" : "data2" },
3095+ {"slice2" : "data2" }, # Second partition slices
3096+ ]
3097+ )
3098+ mock_cursor_2 .state = {"updated_at" : "2024-01-02T00:00:00Z" } # State for partition "2"
3099+
3100+ # Configure cursor factory to return different mock cursors based on partition
3101+ cursor_factory_mock = MagicMock ()
3102+ cursor_factory_mock .create .side_effect = [mock_cursor_1 , mock_cursor_2 ]
30983103
30993104 connector_state_converter = CustomFormatConcurrentStreamStateConverter (
31003105 datetime_format = "%Y-%m-%dT%H:%M:%SZ" ,
@@ -3103,8 +3108,89 @@ def test_given_new_partition_mid_sync_when_close_partition_then_update_state():
31033108 cursor_granularity = timedelta (0 ),
31043109 )
31053110
3111+ cursor = ConcurrentPerPartitionCursor (
3112+ cursor_factory = cursor_factory_mock ,
3113+ partition_router = MagicMock (),
3114+ stream_name = "test_stream" ,
3115+ stream_namespace = None ,
3116+ stream_state = {
3117+ "states" : [
3118+ {"partition" : {"id" : "1" }, "cursor" : {"updated_at" : "2024-01-01T00:00:00Z" }}
3119+ ],
3120+ "state" : {"updated_at" : "2024-01-01T00:00:00Z" },
3121+ "lookback_window" : 86400 ,
3122+ "parent_state" : {"posts" : {"updated_at" : "2024-01-01T00:00:00Z" }},
3123+ },
3124+ message_repository = MagicMock (),
3125+ connector_state_manager = MagicMock (),
3126+ connector_state_converter = connector_state_converter ,
3127+ cursor_field = CursorField (cursor_field_key = "updated_at" ),
3128+ )
3129+ partition_router = cursor ._partition_router
3130+ all_partitions = [
3131+ StreamSlice (partition = {"id" : "1" }, cursor_slice = {}, extra_fields = {}),
3132+ StreamSlice (partition = {"id" : "2" }, cursor_slice = {}, extra_fields = {}), # New partition
3133+ ]
3134+ partition_router .stream_slices .return_value = iter (all_partitions )
3135+ partition_router .get_stream_state .side_effect = [
3136+ {"posts" : {"updated_at" : "2024-01-04T00:00:00Z" }}, # Initial parent state
3137+ {"posts" : {"updated_at" : "2024-01-05T00:00:00Z" }}, # Updated parent state for new partition
3138+ ]
3139+
3140+ slices = list (cursor .stream_slices ())
3141+ # Close all partitions except from the first one
3142+ for slice in slices [1 :]:
3143+ cursor .close_partition (
3144+ DeclarativePartition ("test_stream" , {}, MagicMock (), MagicMock (), slice )
3145+ )
3146+ cursor .ensure_at_least_one_state_emitted ()
3147+ print (cursor .state )
3148+
3149+ state = cursor .state
3150+ assert state == {
3151+ "use_global_cursor" : False ,
3152+ "states" : [
3153+ {"partition" : {"id" : "1" }, "cursor" : {"updated_at" : "2024-01-01T00:00:00Z" }},
3154+ {"partition" : {"id" : "2" }, "cursor" : {"updated_at" : "2024-01-02T00:00:00Z" }},
3155+ ],
3156+ "state" : {"updated_at" : "2024-01-01T00:00:00Z" },
3157+ "lookback_window" : 86400 ,
3158+ "parent_state" : {"posts" : {"updated_at" : "2024-01-01T00:00:00Z" }},
3159+ }
3160+ assert mock_cursor_1 .stream_slices .call_count == 1 # Called once for each partition
3161+ assert mock_cursor_2 .stream_slices .call_count == 1 # Called once for each partition
3162+
3163+
3164+ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update ():
3165+ # Create two mock cursors with different states for each partition
3166+ mock_cursor_1 = MagicMock ()
3167+ mock_cursor_1 .stream_slices .return_value = iter (
3168+ [
3169+ {"slice1" : "data1" },
3170+ {"slice2" : "data1" }, # First partition slices
3171+ ]
3172+ )
3173+ mock_cursor_1 .state = {"updated_at" : "2024-01-02T00:00:00Z" } # State for partition "1"
3174+
3175+ mock_cursor_2 = MagicMock ()
3176+ mock_cursor_2 .stream_slices .return_value = iter (
3177+ [
3178+ {"slice2" : "data2" },
3179+ {"slice2" : "data2" }, # Second partition slices
3180+ ]
3181+ )
3182+ mock_cursor_2 .state = {"updated_at" : "2024-01-01T00:00:00Z" } # State for partition "2"
3183+
3184+ # Configure cursor factory to return different mock cursors based on partition
31063185 cursor_factory_mock = MagicMock ()
3107- cursor_factory_mock .create .return_value = mock_cursor
3186+ cursor_factory_mock .create .side_effect = [mock_cursor_1 , mock_cursor_2 ]
3187+
3188+ connector_state_converter = CustomFormatConcurrentStreamStateConverter (
3189+ datetime_format = "%Y-%m-%dT%H:%M:%SZ" ,
3190+ input_datetime_formats = ["%Y-%m-%dT%H:%M:%SZ" ],
3191+ is_sequential_state = True ,
3192+ cursor_granularity = timedelta (0 ),
3193+ )
31083194
31093195 cursor = ConcurrentPerPartitionCursor (
31103196 cursor_factory = cursor_factory_mock ,
@@ -3137,19 +3223,26 @@ def test_given_new_partition_mid_sync_when_close_partition_then_update_state():
31373223
31383224 slices = list (cursor .stream_slices ())
31393225 # Close all partitions except from the first one
3140- for slice in slices :
3226+ for slice in slices [: - 1 ] :
31413227 cursor .close_partition (
31423228 DeclarativePartition ("test_stream" , {}, MagicMock (), MagicMock (), slice )
31433229 )
3230+ cursor .ensure_at_least_one_state_emitted ()
3231+ print (cursor .state )
31443232
31453233 state = cursor .state
3146- assert state ["use_global_cursor" ] is False
3147- assert len (state ["states" ]) == 2 # Should now have two partitions
3148- assert any (p ["partition" ]["id" ] == "1" for p in state ["states" ])
3149- assert any (p ["partition" ]["id" ] == "2" for p in state ["states" ])
3150- assert state ["parent_state" ] == {"posts" : {"updated_at" : "2024-01-05T00:00:00Z" }}
3151- assert state ["lookback_window" ] == 86400
3152- assert mock_cursor .stream_slices .call_count == 2 # Called once for each partition
3234+ assert state == {
3235+ "use_global_cursor" : False ,
3236+ "states" : [
3237+ {"partition" : {"id" : "1" }, "cursor" : {"updated_at" : "2024-01-02T00:00:00Z" }},
3238+ {"partition" : {"id" : "2" }, "cursor" : {"updated_at" : "2024-01-01T00:00:00Z" }},
3239+ ],
3240+ "state" : {"updated_at" : "2024-01-01T00:00:00Z" },
3241+ "lookback_window" : 86400 ,
3242+ "parent_state" : {"posts" : {"updated_at" : "2024-01-04T00:00:00Z" }},
3243+ }
3244+ assert mock_cursor_1 .stream_slices .call_count == 1 # Called once for each partition
3245+ assert mock_cursor_2 .stream_slices .call_count == 1 # Called once for each partition
31533246
31543247
31553248def test_given_all_partitions_finished_when_close_partition_then_final_state_emitted ():
0 commit comments