@@ -229,7 +229,9 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
229229 of
230230 {value , Consumer } ->
231231 G1 = remove_from_group (Consumer , Group0 ),
232- handle_consumer_removal (G1 , Stream , ConsumerName , Consumer # consumer .active );
232+ handle_consumer_removal (
233+ G1 , Stream , ConsumerName ,
234+ is_active (Consumer # consumer .active ));
233235 false ->
234236 {Group0 , []}
235237 end ,
@@ -252,11 +254,12 @@ apply(#command_activate_consumer{vhost = VirtualHost,
252254 " the group does not longer exist" ,
253255 [{VirtualHost , Stream , ConsumerName }]),
254256 {undefined , []};
255- Group ->
257+ Group0 ->
258+ Group1 = update_consumers (Group0 , waiting ),
256259 # consumer {pid = Pid , subscription_id = SubId } =
257- evaluate_active_consumer (Group ),
258- Group1 = update_consumer_state_in_group (Group , Pid , SubId , true ),
259- {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
260+ evaluate_active_consumer (Group1 ),
261+ Group2 = update_consumer_state_in_group (Group1 , Pid , SubId , active ),
262+ {Group2 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
260263 end ,
261264 StreamGroups1 =
262265 update_groups (VirtualHost , Stream , ConsumerName , G , StreamGroups0 ),
@@ -322,12 +325,8 @@ group_consumers(VirtualHost,
322325 [{connection_name ,
323326 Owner }
324327 | RecAcc ];
325- (state , RecAcc )
326- when Active ->
327- [{state , active }
328- | RecAcc ];
329328 (state , RecAcc ) ->
330- [{state , inactive }
329+ [{state , Active }
331330 | RecAcc ];
332331 (Unknown , RecAcc ) ->
333332 [{Unknown ,
@@ -432,12 +431,13 @@ handle_group_after_connection_down(Pid,
432431 % % remove the connection consumers from the group state
433432 % % keep flags to know what happened
434433 {Consumers1 , ActiveRemoved , AnyRemoved } =
435- lists :foldl (
436- fun (# consumer {pid = P , active = S }, {L , ActiveFlag , _ }) when P == Pid ->
437- {L , S or ActiveFlag , true };
438- (C , {L , ActiveFlag , AnyFlag }) ->
439- {L ++ [C ], ActiveFlag , AnyFlag }
440- end , {[], false , false }, Consumers0 ),
434+ lists :foldl (
435+ fun (# consumer {pid = P , active = S }, {L , ActiveFlag , _ })
436+ when P == Pid ->
437+ {L , is_active (S ) or ActiveFlag , true };
438+ (C , {L , ActiveFlag , AnyFlag }) ->
439+ {L ++ [C ], ActiveFlag , AnyFlag }
440+ end , {[], false , false }, Consumers0 ),
441441
442442 case AnyRemoved of
443443 true ->
@@ -454,6 +454,11 @@ handle_group_after_connection_down(Pid,
454454 end
455455 end .
456456
457+ is_active (waiting ) ->
458+ false ;
459+ is_active (_ ) ->
460+ true .
461+
457462do_register_consumer (VirtualHost ,
458463 Stream ,
459464 - 1 = _PartitionIndex ,
@@ -471,12 +476,12 @@ do_register_consumer(VirtualHost,
471476 # consumer {pid = ConnectionPid ,
472477 owner = Owner ,
473478 subscription_id = SubscriptionId ,
474- active = false };
479+ active = waiting };
475480 false ->
476481 # consumer {pid = ConnectionPid ,
477482 subscription_id = SubscriptionId ,
478483 owner = Owner ,
479- active = true }
484+ active = active }
480485 end ,
481486 Group1 = add_to_group (Consumer , Group0 ),
482487 StreamGroups1 =
@@ -489,14 +494,14 @@ do_register_consumer(VirtualHost,
489494 # consumer {active = Active } = Consumer ,
490495 Effects =
491496 case Active of
492- true ->
497+ active ->
493498 [notify_consumer_effect (ConnectionPid , SubscriptionId ,
494- Stream , ConsumerName , Active )];
499+ Stream , ConsumerName , is_active ( Active ) )];
495500 _ ->
496501 []
497502 end ,
498503
499- {State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects };
504+ {State #? MODULE {groups = StreamGroups1 }, {ok , is_active ( Active ) }, Effects };
500505do_register_consumer (VirtualHost ,
501506 Stream ,
502507 _PartitionIndex ,
@@ -516,7 +521,7 @@ do_register_consumer(VirtualHost,
516521 # consumer {pid = ConnectionPid ,
517522 owner = Owner ,
518523 subscription_id = SubscriptionId ,
519- active = true },
524+ active = active },
520525 G1 = add_to_group (Consumer0 , Group0 ),
521526 {G1 ,
522527 [notify_consumer_effect (ConnectionPid , SubscriptionId ,
@@ -527,7 +532,7 @@ do_register_consumer(VirtualHost,
527532 # consumer {pid = ConnectionPid ,
528533 owner = Owner ,
529534 subscription_id = SubscriptionId ,
530- active = false },
535+ active = waiting },
531536 G1 = add_to_group (Consumer0 , Group0 ),
532537
533538 case lookup_active_consumer (G1 ) of
@@ -543,7 +548,7 @@ do_register_consumer(VirtualHost,
543548 {update_consumer_state_in_group (G1 ,
544549 ActPid ,
545550 ActSubId ,
546- false ),
551+ deactivating ),
547552 [notify_consumer_effect (ActPid ,
548553 ActSubId ,
549554 Stream ,
@@ -565,7 +570,7 @@ do_register_consumer(VirtualHost,
565570 StreamGroups0 ),
566571 {value , # consumer {active = Active }} =
567572 lookup_consumer (ConnectionPid , SubscriptionId , Group1 ),
568- {State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects }.
573+ {State #? MODULE {groups = StreamGroups1 }, {ok , is_active ( Active ) }, Effects }.
569574
570575handle_consumer_removal (# group {consumers = []} = G , _ , _ , _ ) ->
571576 {G , []};
@@ -601,7 +606,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
601606 {update_consumer_state_in_group (Group0 ,
602607 ActPid ,
603608 ActSubId ,
604- false ),
609+ deactivating ),
605610 [notify_consumer_effect (ActPid , ActSubId ,
606611 Stream , ConsumerName , false , true )]}
607612 end ;
@@ -611,7 +616,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
611616 % % the active one is going away, picking a new one
612617 # consumer {pid = P , subscription_id = SID } =
613618 evaluate_active_consumer (Group0 ),
614- {update_consumer_state_in_group (Group0 , P , SID , true ),
619+ {update_consumer_state_in_group (Group0 , P , SID , active ),
615620 [notify_consumer_effect (P , SID ,
616621 Stream , ConsumerName , true )]};
617622 false ->
@@ -678,13 +683,13 @@ compute_active_consumer(#group{consumers = Crs,
678683compute_active_consumer (# group {partition_index = - 1 ,
679684 consumers = [Consumer0 ]} =
680685 Group0 ) ->
681- Consumer1 = Consumer0 # consumer {active = true },
686+ Consumer1 = Consumer0 # consumer {active = active },
682687 Group0 # group {consumers = [Consumer1 ]};
683688compute_active_consumer (# group {partition_index = - 1 ,
684689 consumers = [Consumer0 | T ]} =
685690 Group0 ) ->
686- Consumer1 = Consumer0 # consumer {active = true },
687- Consumers = lists :map (fun (C ) -> C # consumer {active = false } end , T ),
691+ Consumer1 = Consumer0 # consumer {active = active },
692+ Consumers = lists :map (fun (C ) -> C # consumer {active = waiting } end , T ),
688693 Group0 # group {consumers = [Consumer1 ] ++ Consumers }.
689694
690695evaluate_active_consumer (# group {partition_index = PartitionIndex ,
@@ -701,7 +706,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
701706 Consumers ).
702707
703708lookup_active_consumer (# group {consumers = Consumers }) ->
704- lists :search (fun (# consumer {active = Active }) -> Active end ,
709+ lists :search (fun (# consumer {active = Active }) -> is_active ( Active ) end ,
705710 Consumers ).
706711
707712update_groups (_VirtualHost ,
@@ -738,6 +743,12 @@ update_consumer_state_in_group(#group{consumers = Consumers0} = G,
738743 Consumers0 ),
739744 G # group {consumers = CS1 }.
740745
746+ update_consumers (# group {consumers = Consumers0 } = G , NewState ) ->
747+ Consumers1 = lists :map (fun (C ) ->
748+ C # consumer {active = NewState }
749+ end , Consumers0 ),
750+ G # group {consumers = Consumers1 }.
751+
741752mod_call_effect (Pid , Msg ) ->
742753 {mod_call , rabbit_stream_sac_coordinator , send_message , [Pid , Msg ]}.
743754
0 commit comments