@@ -383,6 +383,151 @@ def test_substream_partition_router(parent_stream_configs, expected_slices):
383383 assert slices == expected_slices
384384
385385
386+ @pytest .mark .parametrize (
387+ "parent_stream_configs, expected_slices" ,
388+ [
389+ (
390+ [
391+ ParentStreamConfig (
392+ stream = MockStream ([{}], parent_records , "first_stream" ),
393+ parent_key = "id" ,
394+ partition_field = "first_stream_id" ,
395+ parameters = {},
396+ config = {},
397+ )
398+ ],
399+ [
400+ {"first_stream_id" : 1 },
401+ {"first_stream_id" : 2 },
402+ ],
403+ ),
404+ (
405+ [
406+ ParentStreamConfig (
407+ stream = MockStream (parent_slices , all_parent_data , "first_stream" ),
408+ parent_key = "id" ,
409+ partition_field = "first_stream_id" ,
410+ parameters = {},
411+ config = {},
412+ )
413+ ],
414+ [
415+ {"first_stream_id" : 0 },
416+ {"first_stream_id" : 1 },
417+ {"first_stream_id" : 2 },
418+ ],
419+ ),
420+ (
421+ [
422+ ParentStreamConfig (
423+ stream = MockStream (
424+ [
425+ StreamSlice (partition = p , cursor_slice = {"start" : 0 , "end" : 1 })
426+ for p in parent_slices
427+ ],
428+ all_parent_data ,
429+ "first_stream" ,
430+ ),
431+ parent_key = "id" ,
432+ partition_field = "first_stream_id" ,
433+ parameters = {},
434+ config = {},
435+ )
436+ ],
437+ [
438+ {"first_stream_id" : 0 },
439+ {"first_stream_id" : 1 },
440+ {"first_stream_id" : 2 },
441+ ],
442+ ),
443+ (
444+ [
445+ ParentStreamConfig (
446+ stream = MockStream (
447+ parent_slices ,
448+ data_first_parent_slice + data_second_parent_slice ,
449+ "first_stream" ,
450+ ),
451+ parent_key = "id" ,
452+ partition_field = "first_stream_id" ,
453+ parameters = {},
454+ config = {},
455+ ),
456+ ParentStreamConfig (
457+ stream = MockStream (second_parent_stream_slice , more_records , "second_stream" ),
458+ parent_key = "id" ,
459+ partition_field = "second_stream_id" ,
460+ parameters = {},
461+ config = {},
462+ ),
463+ ],
464+ [
465+ {"first_stream_id" : 0 },
466+ {"first_stream_id" : 1 },
467+ {"first_stream_id" : 2 },
468+ {"second_stream_id" : 10 },
469+ {"second_stream_id" : 20 },
470+ ],
471+ ),
472+ (
473+ [
474+ ParentStreamConfig (
475+ stream = MockStream (
476+ [{}], [{"id" : 0 }, {"id" : 1 }, {"_id" : 2 }, {"id" : 3 }], "first_stream"
477+ ),
478+ parent_key = "id" ,
479+ partition_field = "first_stream_id" ,
480+ parameters = {},
481+ config = {},
482+ )
483+ ],
484+ [
485+ {"first_stream_id" : 0 },
486+ {"first_stream_id" : 1 },
487+ {"first_stream_id" : 3 },
488+ ],
489+ ),
490+ (
491+ [
492+ ParentStreamConfig (
493+ stream = MockStream (
494+ [{}],
495+ [{"a" : {"b" : 0 }}, {"a" : {"b" : 1 }}, {"a" : {"c" : 2 }}, {"a" : {"b" : 3 }}],
496+ "first_stream" ,
497+ ),
498+ parent_key = "a/b" ,
499+ partition_field = "first_stream_id" ,
500+ parameters = {},
501+ config = {},
502+ )
503+ ],
504+ [
505+ {"first_stream_id" : 0 },
506+ {"first_stream_id" : 1 },
507+ {"first_stream_id" : 3 },
508+ ],
509+ ),
510+ ],
511+ ids = [
512+ "test_single_parent_slices_with_records" ,
513+ "test_with_parent_slices_and_records" ,
514+ "test_multiple_parent_streams" ,
515+ "test_cursor_values_are_removed_from_parent_slices" ,
516+ "test_missed_parent_key" ,
517+ "test_dpath_extraction" ,
518+ ],
519+ )
520+ def test_substream_partition_router_without_parent_slice (parent_stream_configs , expected_slices ):
521+ partition_router = SubstreamPartitionRouter (
522+ parent_stream_configs = parent_stream_configs ,
523+ parameters = {},
524+ config = {},
525+ include_parent_slice = False ,
526+ )
527+ slices = [s for s in partition_router .stream_slices ()]
528+ assert slices == expected_slices
529+
530+
386531def test_substream_partition_router_invalid_parent_record_type ():
387532 partition_router = SubstreamPartitionRouter (
388533 parent_stream_configs = [
0 commit comments