@@ -231,7 +231,9 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
231231 of
232232 {value , Consumer } ->
233233 G1 = remove_from_group (Consumer , Group0 ),
234- handle_consumer_removal (G1 , Stream , ConsumerName , Consumer # consumer .active );
234+ handle_consumer_removal (
235+ G1 , Stream , ConsumerName ,
236+ is_active (Consumer # consumer .active ));
235237 false ->
236238 {Group0 , []}
237239 end ,
@@ -254,11 +256,12 @@ apply(#command_activate_consumer{vhost = VirtualHost,
254256 " the group does not longer exist" ,
255257 [{VirtualHost , Stream , ConsumerName }]),
256258 {undefined , []};
257- Group ->
259+ Group0 ->
260+ Group1 = update_consumers (Group0 , waiting ),
258261 # consumer {pid = Pid , subscription_id = SubId } =
259- evaluate_active_consumer (Group ),
260- Group1 = update_consumer_state_in_group (Group , Pid , SubId , true ),
261- {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
262+ evaluate_active_consumer (Group1 ),
263+ Group2 = update_consumer_state_in_group (Group1 , Pid , SubId , active ),
264+ {Group2 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
262265 end ,
263266 StreamGroups1 =
264267 update_groups (VirtualHost , Stream , ConsumerName , G , StreamGroups0 ),
@@ -324,12 +327,8 @@ group_consumers(VirtualHost,
324327 [{connection_name ,
325328 Owner }
326329 | RecAcc ];
327- (state , RecAcc )
328- when Active ->
329- [{state , active }
330- | RecAcc ];
331330 (state , RecAcc ) ->
332- [{state , inactive }
331+ [{state , Active }
333332 | RecAcc ];
334333 (Unknown , RecAcc ) ->
335334 [{Unknown ,
@@ -434,12 +433,13 @@ handle_group_after_connection_down(Pid,
434433 % % remove the connection consumers from the group state
435434 % % keep flags to know what happened
436435 {Consumers1 , ActiveRemoved , AnyRemoved } =
437- lists :foldl (
438- fun (# consumer {pid = P , active = S }, {L , ActiveFlag , _ }) when P == Pid ->
439- {L , S or ActiveFlag , true };
440- (C , {L , ActiveFlag , AnyFlag }) ->
441- {L ++ [C ], ActiveFlag , AnyFlag }
442- end , {[], false , false }, Consumers0 ),
436+ lists :foldl (
437+ fun (# consumer {pid = P , active = S }, {L , ActiveFlag , _ })
438+ when P == Pid ->
439+ {L , is_active (S ) or ActiveFlag , true };
440+ (C , {L , ActiveFlag , AnyFlag }) ->
441+ {L ++ [C ], ActiveFlag , AnyFlag }
442+ end , {[], false , false }, Consumers0 ),
443443
444444 case AnyRemoved of
445445 true ->
@@ -456,6 +456,11 @@ handle_group_after_connection_down(Pid,
456456 end
457457 end .
458458
459+ is_active (waiting ) ->
460+ false ;
461+ is_active (_ ) ->
462+ true .
463+
459464do_register_consumer (VirtualHost ,
460465 Stream ,
461466 - 1 = _PartitionIndex ,
@@ -473,12 +478,12 @@ do_register_consumer(VirtualHost,
473478 # consumer {pid = ConnectionPid ,
474479 owner = Owner ,
475480 subscription_id = SubscriptionId ,
476- active = false };
481+ active = waiting };
477482 false ->
478483 # consumer {pid = ConnectionPid ,
479484 subscription_id = SubscriptionId ,
480485 owner = Owner ,
481- active = true }
486+ active = active }
482487 end ,
483488 Group1 = add_to_group (Consumer , Group0 ),
484489 StreamGroups1 =
@@ -491,14 +496,14 @@ do_register_consumer(VirtualHost,
491496 # consumer {active = Active } = Consumer ,
492497 Effects =
493498 case Active of
494- true ->
499+ active ->
495500 [notify_consumer_effect (ConnectionPid , SubscriptionId ,
496- Stream , ConsumerName , Active )];
501+ Stream , ConsumerName , is_active ( Active ) )];
497502 _ ->
498503 []
499504 end ,
500505
501- {State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects };
506+ {State #? MODULE {groups = StreamGroups1 }, {ok , is_active ( Active ) }, Effects };
502507do_register_consumer (VirtualHost ,
503508 Stream ,
504509 _PartitionIndex ,
@@ -518,7 +523,7 @@ do_register_consumer(VirtualHost,
518523 # consumer {pid = ConnectionPid ,
519524 owner = Owner ,
520525 subscription_id = SubscriptionId ,
521- active = true },
526+ active = active },
522527 G1 = add_to_group (Consumer0 , Group0 ),
523528 {G1 ,
524529 [notify_consumer_effect (ConnectionPid , SubscriptionId ,
@@ -529,7 +534,7 @@ do_register_consumer(VirtualHost,
529534 # consumer {pid = ConnectionPid ,
530535 owner = Owner ,
531536 subscription_id = SubscriptionId ,
532- active = false },
537+ active = waiting },
533538 G1 = add_to_group (Consumer0 , Group0 ),
534539
535540 case lookup_active_consumer (G1 ) of
@@ -545,7 +550,7 @@ do_register_consumer(VirtualHost,
545550 {update_consumer_state_in_group (G1 ,
546551 ActPid ,
547552 ActSubId ,
548- false ),
553+ deactivating ),
549554 [notify_consumer_effect (ActPid ,
550555 ActSubId ,
551556 Stream ,
@@ -567,7 +572,7 @@ do_register_consumer(VirtualHost,
567572 StreamGroups0 ),
568573 {value , # consumer {active = Active }} =
569574 lookup_consumer (ConnectionPid , SubscriptionId , Group1 ),
570- {State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects }.
575+ {State #? MODULE {groups = StreamGroups1 }, {ok , is_active ( Active ) }, Effects }.
571576
572577handle_consumer_removal (# group {consumers = []} = G , _ , _ , _ ) ->
573578 {G , []};
@@ -603,7 +608,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
603608 {update_consumer_state_in_group (Group0 ,
604609 ActPid ,
605610 ActSubId ,
606- false ),
611+ deactivating ),
607612 [notify_consumer_effect (ActPid , ActSubId ,
608613 Stream , ConsumerName , false , true )]}
609614 end ;
@@ -613,7 +618,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
613618 % % the active one is going away, picking a new one
614619 # consumer {pid = P , subscription_id = SID } =
615620 evaluate_active_consumer (Group0 ),
616- {update_consumer_state_in_group (Group0 , P , SID , true ),
621+ {update_consumer_state_in_group (Group0 , P , SID , active ),
617622 [notify_consumer_effect (P , SID ,
618623 Stream , ConsumerName , true )]};
619624 false ->
@@ -683,13 +688,13 @@ compute_active_consumer(#group{consumers = Crs,
683688compute_active_consumer (# group {partition_index = - 1 ,
684689 consumers = [Consumer0 ]} =
685690 Group0 ) ->
686- Consumer1 = Consumer0 # consumer {active = true },
691+ Consumer1 = Consumer0 # consumer {active = active },
687692 Group0 # group {consumers = [Consumer1 ]};
688693compute_active_consumer (# group {partition_index = - 1 ,
689694 consumers = [Consumer0 | T ]} =
690695 Group0 ) ->
691- Consumer1 = Consumer0 # consumer {active = true },
692- Consumers = lists :map (fun (C ) -> C # consumer {active = false } end , T ),
696+ Consumer1 = Consumer0 # consumer {active = active },
697+ Consumers = lists :map (fun (C ) -> C # consumer {active = waiting } end , T ),
693698 Group0 # group {consumers = [Consumer1 ] ++ Consumers }.
694699
695700evaluate_active_consumer (# group {partition_index = PartitionIndex ,
@@ -706,7 +711,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
706711 Consumers ).
707712
708713lookup_active_consumer (# group {consumers = Consumers }) ->
709- lists :search (fun (# consumer {active = Active }) -> Active end ,
714+ lists :search (fun (# consumer {active = Active }) -> is_active ( Active ) end ,
710715 Consumers ).
711716
712717update_groups (_VirtualHost ,
@@ -743,6 +748,12 @@ update_consumer_state_in_group(#group{consumers = Consumers0} = G,
743748 Consumers0 ),
744749 G # group {consumers = CS1 }.
745750
751+ update_consumers (# group {consumers = Consumers0 } = G , NewState ) ->
752+ Consumers1 = lists :map (fun (C ) ->
753+ C # consumer {active = NewState }
754+ end , Consumers0 ),
755+ G # group {consumers = Consumers1 }.
756+
746757mod_call_effect (Pid , Msg ) ->
747758 {mod_call , rabbit_stream_sac_coordinator , send_message , [Pid , Msg ]}.
748759
0 commit comments