@@ -402,6 +402,115 @@ def test_substream_partition_router_invalid_parent_record_type():
402402 _ = [s for s in partition_router .stream_slices ()]
403403
404404
405+ @pytest .mark .parametrize (
406+ "initial_state, expected_parent_state" ,
407+ [
408+ # Case 1: Empty initial state, no parent state expected
409+ ({}, {}),
410+ # Case 2: Initial state with no `parent_state`, migrate `updated_at` to `parent_stream_cursor`
411+ (
412+ {"updated_at" : "2023-05-27T00:00:00Z" },
413+ {"parent_stream_cursor" : "2023-05-27T00:00:00Z" },
414+ ),
415+ # Case 3: Initial state with global `state`, no migration expected
416+ (
417+ {"state" : {"updated" : "2023-05-27T00:00:00Z" }},
418+ {},
419+ ),
420+ # Case 4: Initial state with per-partition `states`, no migration expected
421+ (
422+ {
423+ "states" : [
424+ {
425+ "partition" : {
426+ "issue_id" : "10012" ,
427+ "parent_slice" : {
428+ "parent_slice" : {},
429+ "project_id" : "10000" ,
430+ },
431+ },
432+ "cursor" : {"updated" : "2021-01-01T00:00:00+0000" },
433+ },
434+ {
435+ "partition" : {
436+ "issue_id" : "10019" ,
437+ "parent_slice" : {
438+ "parent_slice" : {},
439+ "project_id" : "10000" ,
440+ },
441+ },
442+ "cursor" : {"updated" : "2021-01-01T00:00:00+0000" },
443+ },
444+ {
445+ "partition" : {
446+ "issue_id" : "10000" ,
447+ "parent_slice" : {
448+ "parent_slice" : {},
449+ "project_id" : "10000" ,
450+ },
451+ },
452+ "cursor" : {"updated" : "2021-01-01T00:00:00+0000" },
453+ },
454+ ]
455+ },
456+ {},
457+ ),
458+ # Case 5: Initial state with `parent_state`, existing parent state persists
459+ (
460+ {
461+ "parent_state" : {
462+ "parent_stream_name1" : {"parent_stream_cursor" : "2023-05-27T00:00:00Z" },
463+ },
464+ },
465+ {"parent_stream_cursor" : "2023-05-27T00:00:00Z" },
466+ ),
467+ ],
468+ ids = [
469+ "empty_initial_state" ,
470+ "initial_state_no_parent_legacy_state" ,
471+ "initial_state_no_parent_global_state" ,
472+ "initial_state_no_parent_per_partition_state" ,
473+ "initial_state_with_parent_state" ,
474+ ],
475+ )
476+ def test_set_initial_state (initial_state , expected_parent_state ):
477+ """
478+ Test the `set_initial_state` method of SubstreamPartitionRouter.
479+
480+ This test verifies that the method correctly handles different initial state formats
481+ and sets the appropriate parent stream state.
482+ """
483+ parent_stream = MockStream (
484+ slices = [{}],
485+ records = [],
486+ name = "parent_stream_name1" ,
487+ cursor_field = "parent_stream_cursor" ,
488+ )
489+ parent_stream .state = {}
490+ parent_stream_config = ParentStreamConfig (
491+ stream = parent_stream ,
492+ parent_key = "id" ,
493+ partition_field = "parent_stream_id" ,
494+ parameters = {},
495+ config = {},
496+ incremental_dependency = True ,
497+ )
498+
499+ partition_router = SubstreamPartitionRouter (
500+ parent_stream_configs = [parent_stream_config ],
501+ parameters = {},
502+ config = {},
503+ )
504+
505+ partition_router .set_initial_state (initial_state )
506+
507+ # Assert the state of the parent stream
508+ assert parent_stream .state == expected_parent_state , (
509+ f"Unexpected parent state. Initial state: { initial_state } , "
510+ f"Expected: { expected_parent_state } , Got: { parent_stream .state } "
511+ )
512+
513+
405514@pytest .mark .parametrize (
406515 "parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data" ,
407516 [
0 commit comments