Skip to content

Commit dd87c32

Browse files
committed
Introduce stream SAC status instead of active flag
1 parent 82480e4 commit dd87c32

File tree

3 files changed

+139
-87
lines changed

3 files changed

+139
-87
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,9 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
229229
of
230230
{value, Consumer} ->
231231
G1 = remove_from_group(Consumer, Group0),
232-
handle_consumer_removal(G1, Stream, ConsumerName, Consumer#consumer.active);
232+
handle_consumer_removal(
233+
G1, Stream, ConsumerName,
234+
is_active(Consumer#consumer.active));
233235
false ->
234236
{Group0, []}
235237
end,
@@ -252,11 +254,12 @@ apply(#command_activate_consumer{vhost = VirtualHost,
252254
"the group does not longer exist",
253255
[{VirtualHost, Stream, ConsumerName}]),
254256
{undefined, []};
255-
Group ->
257+
Group0 ->
258+
Group1 = update_consumers(Group0, waiting),
256259
#consumer{pid = Pid, subscription_id = SubId} =
257-
evaluate_active_consumer(Group),
258-
Group1 = update_consumer_state_in_group(Group, Pid, SubId, true),
259-
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
260+
evaluate_active_consumer(Group1),
261+
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, active),
262+
{Group2, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
260263
end,
261264
StreamGroups1 =
262265
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
@@ -322,12 +325,8 @@ group_consumers(VirtualHost,
322325
[{connection_name,
323326
Owner}
324327
| RecAcc];
325-
(state, RecAcc)
326-
when Active ->
327-
[{state, active}
328-
| RecAcc];
329328
(state, RecAcc) ->
330-
[{state, inactive}
329+
[{state, Active}
331330
| RecAcc];
332331
(Unknown, RecAcc) ->
333332
[{Unknown,
@@ -432,12 +431,13 @@ handle_group_after_connection_down(Pid,
432431
%% remove the connection consumers from the group state
433432
%% keep flags to know what happened
434433
{Consumers1, ActiveRemoved, AnyRemoved} =
435-
lists:foldl(
436-
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid ->
437-
{L, S or ActiveFlag, true};
438-
(C, {L, ActiveFlag, AnyFlag}) ->
439-
{L ++ [C], ActiveFlag, AnyFlag}
440-
end, {[], false, false}, Consumers0),
434+
lists:foldl(
435+
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _})
436+
when P == Pid ->
437+
{L, is_active(S) or ActiveFlag, true};
438+
(C, {L, ActiveFlag, AnyFlag}) ->
439+
{L ++ [C], ActiveFlag, AnyFlag}
440+
end, {[], false, false}, Consumers0),
441441

442442
case AnyRemoved of
443443
true ->
@@ -454,6 +454,11 @@ handle_group_after_connection_down(Pid,
454454
end
455455
end.
456456

457+
is_active(waiting) ->
458+
false;
459+
is_active(_) ->
460+
true.
461+
457462
do_register_consumer(VirtualHost,
458463
Stream,
459464
-1 = _PartitionIndex,
@@ -471,12 +476,12 @@ do_register_consumer(VirtualHost,
471476
#consumer{pid = ConnectionPid,
472477
owner = Owner,
473478
subscription_id = SubscriptionId,
474-
active = false};
479+
active = waiting};
475480
false ->
476481
#consumer{pid = ConnectionPid,
477482
subscription_id = SubscriptionId,
478483
owner = Owner,
479-
active = true}
484+
active = active}
480485
end,
481486
Group1 = add_to_group(Consumer, Group0),
482487
StreamGroups1 =
@@ -489,14 +494,14 @@ do_register_consumer(VirtualHost,
489494
#consumer{active = Active} = Consumer,
490495
Effects =
491496
case Active of
492-
true ->
497+
active ->
493498
[notify_consumer_effect(ConnectionPid, SubscriptionId,
494-
Stream, ConsumerName, Active)];
499+
Stream, ConsumerName, is_active(Active))];
495500
_ ->
496501
[]
497502
end,
498503

499-
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects};
504+
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects};
500505
do_register_consumer(VirtualHost,
501506
Stream,
502507
_PartitionIndex,
@@ -516,7 +521,7 @@ do_register_consumer(VirtualHost,
516521
#consumer{pid = ConnectionPid,
517522
owner = Owner,
518523
subscription_id = SubscriptionId,
519-
active = true},
524+
active = active},
520525
G1 = add_to_group(Consumer0, Group0),
521526
{G1,
522527
[notify_consumer_effect(ConnectionPid, SubscriptionId,
@@ -527,7 +532,7 @@ do_register_consumer(VirtualHost,
527532
#consumer{pid = ConnectionPid,
528533
owner = Owner,
529534
subscription_id = SubscriptionId,
530-
active = false},
535+
active = waiting},
531536
G1 = add_to_group(Consumer0, Group0),
532537

533538
case lookup_active_consumer(G1) of
@@ -543,7 +548,7 @@ do_register_consumer(VirtualHost,
543548
{update_consumer_state_in_group(G1,
544549
ActPid,
545550
ActSubId,
546-
false),
551+
deactivating),
547552
[notify_consumer_effect(ActPid,
548553
ActSubId,
549554
Stream,
@@ -565,7 +570,7 @@ do_register_consumer(VirtualHost,
565570
StreamGroups0),
566571
{value, #consumer{active = Active}} =
567572
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
568-
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.
573+
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects}.
569574

570575
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
571576
{G, []};
@@ -601,7 +606,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
601606
{update_consumer_state_in_group(Group0,
602607
ActPid,
603608
ActSubId,
604-
false),
609+
deactivating),
605610
[notify_consumer_effect(ActPid, ActSubId,
606611
Stream, ConsumerName, false, true)]}
607612
end;
@@ -611,7 +616,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
611616
%% the active one is going away, picking a new one
612617
#consumer{pid = P, subscription_id = SID} =
613618
evaluate_active_consumer(Group0),
614-
{update_consumer_state_in_group(Group0, P, SID, true),
619+
{update_consumer_state_in_group(Group0, P, SID, active),
615620
[notify_consumer_effect(P, SID,
616621
Stream, ConsumerName, true)]};
617622
false ->
@@ -678,13 +683,13 @@ compute_active_consumer(#group{consumers = Crs,
678683
compute_active_consumer(#group{partition_index = -1,
679684
consumers = [Consumer0]} =
680685
Group0) ->
681-
Consumer1 = Consumer0#consumer{active = true},
686+
Consumer1 = Consumer0#consumer{active = active},
682687
Group0#group{consumers = [Consumer1]};
683688
compute_active_consumer(#group{partition_index = -1,
684689
consumers = [Consumer0 | T]} =
685690
Group0) ->
686-
Consumer1 = Consumer0#consumer{active = true},
687-
Consumers = lists:map(fun(C) -> C#consumer{active = false} end, T),
691+
Consumer1 = Consumer0#consumer{active = active},
692+
Consumers = lists:map(fun(C) -> C#consumer{active = waiting} end, T),
688693
Group0#group{consumers = [Consumer1] ++ Consumers}.
689694

690695
evaluate_active_consumer(#group{partition_index = PartitionIndex,
@@ -701,7 +706,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
701706
Consumers).
702707

703708
lookup_active_consumer(#group{consumers = Consumers}) ->
704-
lists:search(fun(#consumer{active = Active}) -> Active end,
709+
lists:search(fun(#consumer{active = Active}) -> is_active(Active) end,
705710
Consumers).
706711

707712
update_groups(_VirtualHost,
@@ -738,6 +743,12 @@ update_consumer_state_in_group(#group{consumers = Consumers0} = G,
738743
Consumers0),
739744
G#group{consumers = CS1}.
740745

746+
update_consumers(#group{consumers = Consumers0} = G, NewState) ->
747+
Consumers1 = lists:map(fun(C) ->
748+
C#consumer{active = NewState}
749+
end, Consumers0),
750+
G#group{consumers = Consumers1}.
751+
741752
mod_call_effect(Pid, Msg) ->
742753
{mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, Msg]}.
743754

deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
{pid :: pid(),
2828
subscription_id :: subscription_id(),
2929
owner :: owner(), %% just a label
30-
active :: boolean()}).
30+
active :: active | waiting | deactivating}).
3131
-record(group,
3232
{consumers :: [#consumer{}], partition_index :: integer()}).
3333
-record(rabbit_stream_sac_coordinator,

0 commit comments

Comments
 (0)