@@ -312,29 +312,27 @@ ensure_monitors_test(_) ->
312312
313313 ok .
314314
315- handle_connection_down_test (_ ) ->
315+ handle_connection_down_sac_should_get_activated_test (_ ) ->
316316 Stream = <<" stream" >>,
317317 ConsumerName = <<" app" >>,
318318 GroupId = {<<" /" >>, Stream , ConsumerName },
319319 Pid0 = self (),
320320 Pid1 = spawn (fun () -> ok end ),
321- Group =
322- cgroup ([consumer (Pid0 , 0 , true ), consumer (Pid1 , 1 , false ),
323- consumer (Pid0 , 2 , false )]),
324- State0 =
325- state (#{GroupId => Group },
326- #{Pid0 => maps :from_list ([{GroupId , true }]),
327- Pid1 => maps :from_list ([{GroupId , true }])}),
321+ Group = cgroup ([consumer (Pid0 , 0 , true ),
322+ consumer (Pid1 , 1 , false ),
323+ consumer (Pid0 , 2 , false )]),
324+ State0 = state (#{GroupId => Group },
325+ #{Pid0 => maps :from_list ([{GroupId , true }]),
326+ Pid1 => maps :from_list ([{GroupId , true }])}),
328327
329328 {#? STATE {pids_groups = PidsGroups1 , groups = Groups1 } = State1 ,
330329 Effects1 } =
331- rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State0 ),
330+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State0 ),
332331 assertSize (1 , PidsGroups1 ),
333332 assertSize (1 , maps :get (Pid1 , PidsGroups1 )),
334333 assertSendMessageEffect (Pid1 , 1 , Stream , ConsumerName , true , Effects1 ),
335- ? assertEqual (#{GroupId => cgroup ([consumer (Pid1 , 1 , true )])},
336- Groups1 ),
337- {#? STATE {pids_groups = PidsGroups2 , groups = Groups2 } = _State2 ,
334+ assertHasGroup (GroupId , cgroup ([consumer (Pid1 , 1 , true )]), Groups1 ),
335+ {#? STATE {pids_groups = PidsGroups2 , groups = Groups2 },
338336 Effects2 } =
339337 rabbit_stream_sac_coordinator :handle_connection_down (Pid1 , State1 ),
340338 assertEmpty (PidsGroups2 ),
@@ -343,6 +341,168 @@ handle_connection_down_test(_) ->
343341
344342 ok .
345343
344+ handle_connection_down_sac_active_does_not_change_test (_ ) ->
345+ Stream = <<" stream" >>,
346+ ConsumerName = <<" app" >>,
347+ GroupId = {<<" /" >>, Stream , ConsumerName },
348+ Pid0 = self (),
349+ Pid1 = spawn (fun () -> ok end ),
350+ Group = cgroup ([consumer (Pid1 , 0 , true ),
351+ consumer (Pid0 , 1 , false ),
352+ consumer (Pid0 , 2 , false )]),
353+ State = state (#{GroupId => Group },
354+ #{Pid0 => maps :from_list ([{GroupId , true }]),
355+ Pid1 => maps :from_list ([{GroupId , true }])}),
356+
357+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
358+ Effects } =
359+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State ),
360+ assertSize (1 , PidsGroups ),
361+ assertSize (1 , maps :get (Pid1 , PidsGroups )),
362+ assertEmpty (Effects ),
363+ assertHasGroup (GroupId , cgroup ([consumer (Pid1 , 0 , true )]), Groups ),
364+ ok .
365+
366+ handle_connection_down_sac_no_more_consumers_test (_ ) ->
367+ Stream = <<" stream" >>,
368+ ConsumerName = <<" app" >>,
369+ GroupId = {<<" /" >>, Stream , ConsumerName },
370+ Pid0 = self (),
371+ Group = cgroup ([consumer (Pid0 , 0 , true ),
372+ consumer (Pid0 , 1 , false )]),
373+ State = state (#{GroupId => Group },
374+ #{Pid0 => maps :from_list ([{GroupId , true }])}),
375+
376+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
377+ Effects } =
378+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State ),
379+ assertEmpty (PidsGroups ),
380+ assertEmpty (Groups ),
381+ assertEmpty (Effects ),
382+ ok .
383+
384+ handle_connection_down_sac_no_consumers_in_down_connection_test (_ ) ->
385+ Stream = <<" stream" >>,
386+ ConsumerName = <<" app" >>,
387+ GroupId = {<<" /" >>, Stream , ConsumerName },
388+ Pid0 = self (),
389+ Pid1 = spawn (fun () -> ok end ),
390+ Group = cgroup ([consumer (Pid1 , 0 , true ),
391+ consumer (Pid1 , 1 , false )]),
392+ State = state (#{GroupId => Group },
393+ #{Pid0 => maps :from_list ([{GroupId , true }]), % % should not be there
394+ Pid1 => maps :from_list ([{GroupId , true }])}),
395+
396+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
397+ Effects } =
398+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State ),
399+
400+ assertSize (1 , PidsGroups ),
401+ assertSize (1 , maps :get (Pid1 , PidsGroups )),
402+ assertEmpty (Effects ),
403+ assertHasGroup (GroupId , cgroup ([consumer (Pid1 , 0 , true ), consumer (Pid1 , 1 , false )]),
404+ Groups ),
405+ ok .
406+
407+ handle_connection_down_super_stream_active_stays_test (_ ) ->
408+ Stream = <<" stream" >>,
409+ ConsumerName = <<" app" >>,
410+ GroupId = {<<" /" >>, Stream , ConsumerName },
411+ Pid0 = self (),
412+ Pid1 = spawn (fun () -> ok end ),
413+ Group = cgroup (1 , [consumer (Pid0 , 0 , false ),
414+ consumer (Pid0 , 1 , true ),
415+ consumer (Pid1 , 2 , false ),
416+ consumer (Pid1 , 3 , false )]),
417+ State = state (#{GroupId => Group },
418+ #{Pid0 => maps :from_list ([{GroupId , true }]),
419+ Pid1 => maps :from_list ([{GroupId , true }])}),
420+
421+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
422+ Effects } =
423+ rabbit_stream_sac_coordinator :handle_connection_down (Pid1 , State ),
424+ assertSize (1 , PidsGroups ),
425+ assertSize (1 , maps :get (Pid0 , PidsGroups )),
426+ assertEmpty (Effects ),
427+ assertHasGroup (GroupId , cgroup (1 , [consumer (Pid0 , 0 , false ), consumer (Pid0 , 1 , true )]),
428+ Groups ),
429+ ok .
430+
431+ handle_connection_down_super_stream_active_changes_test (_ ) ->
432+ Stream = <<" stream" >>,
433+ ConsumerName = <<" app" >>,
434+ GroupId = {<<" /" >>, Stream , ConsumerName },
435+ Pid0 = self (),
436+ Pid1 = spawn (fun () -> ok end ),
437+ Group = cgroup (1 , [consumer (Pid0 , 0 , false ),
438+ consumer (Pid1 , 1 , true ),
439+ consumer (Pid0 , 2 , false ),
440+ consumer (Pid1 , 3 , false )]),
441+ State = state (#{GroupId => Group },
442+ #{Pid0 => maps :from_list ([{GroupId , true }]),
443+ Pid1 => maps :from_list ([{GroupId , true }])}),
444+
445+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
446+ Effects } =
447+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State ),
448+ assertSize (1 , PidsGroups ),
449+ assertSize (1 , maps :get (Pid1 , PidsGroups )),
450+ assertSendMessageSteppingDownEffect (Pid1 , 1 , Stream , ConsumerName , Effects ),
451+ assertHasGroup (GroupId , cgroup (1 , [consumer (Pid1 , 1 , false ), consumer (Pid1 , 3 , false )]),
452+ Groups ),
453+ ok .
454+
455+ handle_connection_down_super_stream_activate_in_remaining_connection_test (_ ) ->
456+ Stream = <<" stream" >>,
457+ ConsumerName = <<" app" >>,
458+ GroupId = {<<" /" >>, Stream , ConsumerName },
459+ Pid0 = self (),
460+ Pid1 = spawn (fun () -> ok end ),
461+ Group = cgroup (1 , [consumer (Pid0 , 0 , false ),
462+ consumer (Pid0 , 1 , true ),
463+ consumer (Pid1 , 2 , false ),
464+ consumer (Pid1 , 3 , false )]),
465+ State = state (#{GroupId => Group },
466+ #{Pid0 => maps :from_list ([{GroupId , true }]),
467+ Pid1 => maps :from_list ([{GroupId , true }])}),
468+
469+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
470+ Effects } =
471+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State ),
472+ assertSize (1 , PidsGroups ),
473+ assertSize (1 , maps :get (Pid1 , PidsGroups )),
474+ assertSendMessageEffect (Pid1 , 3 , Stream , ConsumerName , true , Effects ),
475+ assertHasGroup (GroupId , cgroup (1 , [consumer (Pid1 , 2 , false ), consumer (Pid1 , 3 , true )]),
476+ Groups ),
477+ ok .
478+
479+ handle_connection_down_super_stream_no_active_removed_or_present_test (_ ) ->
480+ Stream = <<" stream" >>,
481+ ConsumerName = <<" app" >>,
482+ GroupId = {<<" /" >>, Stream , ConsumerName },
483+ Pid0 = self (),
484+ Pid1 = spawn (fun () -> ok end ),
485+ % % this is a weird case that should not happen in the wild,
486+ % % we test the logic in the code nevertheless.
487+ % % No active consumer in the group
488+ Group = cgroup (1 , [consumer (Pid0 , 0 , false ),
489+ consumer (Pid0 , 1 , false ),
490+ consumer (Pid1 , 2 , false ),
491+ consumer (Pid1 , 3 , false )]),
492+ State = state (#{GroupId => Group },
493+ #{Pid0 => maps :from_list ([{GroupId , true }]),
494+ Pid1 => maps :from_list ([{GroupId , true }])}),
495+
496+ {#? STATE {pids_groups = PidsGroups , groups = Groups },
497+ Effects } =
498+ rabbit_stream_sac_coordinator :handle_connection_down (Pid0 , State ),
499+ assertSize (1 , PidsGroups ),
500+ assertSize (1 , maps :get (Pid1 , PidsGroups )),
501+ assertEmpty (Effects ),
502+ assertHasGroup (GroupId , cgroup (1 , [consumer (Pid1 , 2 , false ), consumer (Pid1 , 3 , false )]),
503+ Groups ),
504+ ok .
505+
346506assertSize (Expected , []) ->
347507 ? assertEqual (Expected , 0 );
348508assertSize (Expected , Map ) when is_map (Map ) ->
@@ -353,6 +513,9 @@ assertSize(Expected, List) when is_list(List) ->
353513assertEmpty (Data ) ->
354514 assertSize (0 , Data ).
355515
516+ assertHasGroup (GroupId , Group , Groups ) ->
517+ ? assertEqual (#{GroupId => Group }, Groups ).
518+
356519consumer (Pid , SubId , Active ) ->
357520 # consumer {pid = Pid ,
358521 subscription_id = SubId ,
0 commit comments