Skip to content

Commit 0fa8633

Browse files
committed
Add new machine module for SAC coordinator v5
1 parent 1d9314b commit 0fa8633

11 files changed

+1669
-144
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ PARALLEL_CT_SET_2_B = clustering_recovery crashing_queues deprecated_features di
268268
PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy
269269
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator
270270

271-
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_prop rabbit_fifo_v0 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
271+
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_prop rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
272272
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
273273
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
274274
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor

deps/rabbit/ct.test.spec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
, rabbit_local_random_exchange_SUITE
118118
, rabbit_msg_interceptor_SUITE
119119
, rabbit_stream_coordinator_SUITE
120+
, rabbit_stream_sac_coordinator_v4_SUITE
120121
, rabbit_stream_sac_coordinator_SUITE
121122
, rabbitmq_4_0_deprecations_SUITE
122123
, rabbitmq_queues_cli_integration_SUITE

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -520,13 +520,13 @@ reachable_coord_members() ->
520520
Nodes = rabbit_nodes:list_reachable(),
521521
[{?MODULE, Node} || Node <- Nodes].
522522

523-
version() -> 4.
523+
version() -> 5.
524524

525525
which_module(_) ->
526526
?MODULE.
527527

528528
init(_Conf) ->
529-
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}.
529+
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator_v4:init_state()}.
530530

531531
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
532532
{state(), term(), ra_machine:effects()}.
@@ -564,11 +564,12 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
564564
end;
565565
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
566566
monitors = Monitors0} = State0) ->
567-
{SacState1, Reply, Effects0} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0),
567+
Mod = sac_module(Meta),
568+
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
568569
{SacState2, Monitors1, Effects1} =
569-
rabbit_stream_sac_coordinator:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
570+
Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
570571
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
571-
monitors = Monitors1}, Reply, Effects1);
572+
monitors = Monitors1}, Reply, Effects1);
572573
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
573574
#?MODULE{streams = Streams0,
574575
monitors = Monitors0,
@@ -629,7 +630,8 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
629630
monitors = Monitors1}, ok, Effects0)
630631
end;
631632
{sac, Monitors1} ->
632-
{SacState1, Effects} = rabbit_stream_sac_coordinator:handle_connection_down(Pid, SacState0),
633+
Mod = sac_module(Meta),
634+
{SacState1, Effects} = Mod:handle_connection_down(Pid, SacState0),
633635
return(Meta, State#?MODULE{single_active_consumer = SacState1,
634636
monitors = Monitors1}, ok, Effects);
635637
error ->
@@ -747,6 +749,11 @@ state_enter(leader, #?MODULE{streams = Streams,
747749
state_enter(_S, _) ->
748750
[].
749751

752+
sac_module(#{machine_version := MachineVersion}) when MachineVersion =< 4 ->
753+
rabbit_stream_sac_coordinator_v4;
754+
sac_module(_) ->
755+
rabbit_stream_sac_coordinator.
756+
750757
all_member_nodes(Streams) ->
751758
maps:keys(
752759
maps:fold(
@@ -2214,6 +2221,11 @@ machine_version(3, 4, #?MODULE{streams = Streams0} = State) ->
22142221
end, Members)}
22152222
end, Streams0),
22162223
{State#?MODULE{streams = Streams}, []};
2224+
machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
2225+
rabbit_log:info("Stream coordinator machine version changes from 4 to 5, updating state."),
2226+
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
2227+
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
2228+
{State#?MODULE{single_active_consumer = Sac1}, []};
22172229
machine_version(From, To, State) ->
22182230
rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
22192231
[From, To]),

deps/rabbit/src/rabbit_stream_coordinator.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
listeners = #{} :: undefined | #{stream_id() =>
6969
#{pid() := queue_ref()}},
7070
single_active_consumer = undefined :: undefined |
71+
rabbit_stream_sac_coordinator_v4:state() |
7172
rabbit_stream_sac_coordinator:state(),
7273
%% future extensibility
7374
reserved_2}).

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,15 @@
3939
handle_connection_down/2,
4040
consumer_groups/3,
4141
group_consumers/5,
42-
overview/1]).
42+
overview/1,
43+
import_state/2]).
4344

4445
-import(rabbit_stream_coordinator, [ra_local_query/1]).
4546

47+
-define(ACTIVE, active).
48+
-define(WAITING, waiting).
49+
-define(DEACTIVATING, deactivating).
50+
4651
%% Single Active Consumer API
4752
-spec register_consumer(binary(),
4853
binary(),
@@ -233,7 +238,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
233238
G1 = remove_from_group(Consumer, Group0),
234239
handle_consumer_removal(
235240
G1, Stream, ConsumerName,
236-
is_active(Consumer#consumer.active));
241+
is_active(Consumer#consumer.status));
237242
false ->
238243
{Group0, []}
239244
end,
@@ -257,10 +262,10 @@ apply(#command_activate_consumer{vhost = VirtualHost,
257262
[{VirtualHost, Stream, ConsumerName}]),
258263
{undefined, []};
259264
Group0 ->
260-
Group1 = update_consumers(Group0, waiting),
265+
Group1 = update_consumers(Group0, ?WAITING),
261266
#consumer{pid = Pid, subscription_id = SubId} =
262267
evaluate_active_consumer(Group1),
263-
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, active),
268+
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, ?ACTIVE),
264269
{Group2, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
265270
end,
266271
StreamGroups1 =
@@ -316,7 +321,7 @@ group_consumers(VirtualHost,
316321
#{GroupId := #group{consumers = Consumers}} ->
317322
Cs = lists:foldr(fun(#consumer{subscription_id = SubId,
318323
owner = Owner,
319-
active = Active},
324+
status = Status},
320325
Acc) ->
321326
Record =
322327
lists:foldr(fun (subscription_id, RecAcc) ->
@@ -328,7 +333,7 @@ group_consumers(VirtualHost,
328333
Owner}
329334
| RecAcc];
330335
(state, RecAcc) ->
331-
[{state, Active}
336+
[{state, cli_consumer_status_label(Status)}
332337
| RecAcc];
333338
(Unknown, RecAcc) ->
334339
[{Unknown,
@@ -344,6 +349,11 @@ group_consumers(VirtualHost,
344349
{error, not_found}
345350
end.
346351

352+
cli_consumer_status_label(?ACTIVE) ->
353+
active;
354+
cli_consumer_status_label(_) ->
355+
inactive.
356+
347357
-spec ensure_monitors(command(),
348358
state(),
349359
map(),
@@ -434,7 +444,7 @@ handle_group_after_connection_down(Pid,
434444
%% keep flags to know what happened
435445
{Consumers1, ActiveRemoved, AnyRemoved} =
436446
lists:foldl(
437-
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _})
447+
fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _})
438448
when P == Pid ->
439449
{L, is_active(S) or ActiveFlag, true};
440450
(C, {L, ActiveFlag, AnyFlag}) ->
@@ -456,6 +466,42 @@ handle_group_after_connection_down(Pid,
456466
end
457467
end.
458468

469+
-spec import_state(ra_machine:version(), map()) -> state().
470+
import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
471+
#?MODULE{groups = map_to_groups(Groups),
472+
pids_groups = map_to_pids_groups(PidsGroups)}.
473+
474+
map_to_groups(Groups) when is_map(Groups) ->
475+
maps:fold(fun(K, V, Acc) ->
476+
Acc#{K => map_to_group(V)}
477+
end, #{}, Groups);
478+
map_to_groups(_) ->
479+
#{}.
480+
481+
map_to_pids_groups(PidsGroups) when is_map(PidsGroups) ->
482+
PidsGroups;
483+
map_to_pids_groups(_) ->
484+
#{}.
485+
486+
map_to_group(#{<<"consumers">> := Consumers, <<"partition_index">> := Index}) ->
487+
C = lists:foldl(fun(V, Acc) ->
488+
Acc ++ [map_to_consumer(V)]
489+
end, [], Consumers),
490+
#group{consumers = C,
491+
partition_index = Index}.
492+
493+
map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId,
494+
<<"owner">> := Owner, <<"active">> := Active}) ->
495+
#consumer{pid = Pid,
496+
subscription_id = SubId,
497+
owner = Owner,
498+
status = active_to_status(Active)}.
499+
500+
active_to_status(true) ->
501+
?ACTIVE;
502+
active_to_status(false) ->
503+
?WAITING.
504+
459505
is_active(waiting) ->
460506
false;
461507
is_active(_) ->
@@ -478,12 +524,12 @@ do_register_consumer(VirtualHost,
478524
#consumer{pid = ConnectionPid,
479525
owner = Owner,
480526
subscription_id = SubscriptionId,
481-
active = waiting};
527+
status = ?WAITING};
482528
false ->
483529
#consumer{pid = ConnectionPid,
484530
subscription_id = SubscriptionId,
485531
owner = Owner,
486-
active = active}
532+
status = ?ACTIVE}
487533
end,
488534
Group1 = add_to_group(Consumer, Group0),
489535
StreamGroups1 =
@@ -493,17 +539,17 @@ do_register_consumer(VirtualHost,
493539
Group1,
494540
StreamGroups0),
495541

496-
#consumer{active = Active} = Consumer,
542+
#consumer{status = Status} = Consumer,
497543
Effects =
498-
case Active of
499-
active ->
544+
case Status of
545+
?ACTIVE ->
500546
[notify_consumer_effect(ConnectionPid, SubscriptionId,
501-
Stream, ConsumerName, is_active(Active))];
547+
Stream, ConsumerName, is_active(Status))];
502548
_ ->
503549
[]
504550
end,
505551

506-
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects};
552+
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects};
507553
do_register_consumer(VirtualHost,
508554
Stream,
509555
_PartitionIndex,
@@ -523,7 +569,7 @@ do_register_consumer(VirtualHost,
523569
#consumer{pid = ConnectionPid,
524570
owner = Owner,
525571
subscription_id = SubscriptionId,
526-
active = active},
572+
status = ?ACTIVE},
527573
G1 = add_to_group(Consumer0, Group0),
528574
{G1,
529575
[notify_consumer_effect(ConnectionPid, SubscriptionId,
@@ -534,7 +580,7 @@ do_register_consumer(VirtualHost,
534580
#consumer{pid = ConnectionPid,
535581
owner = Owner,
536582
subscription_id = SubscriptionId,
537-
active = waiting},
583+
status = ?WAITING},
538584
G1 = add_to_group(Consumer0, Group0),
539585

540586
case lookup_active_consumer(G1) of
@@ -550,7 +596,7 @@ do_register_consumer(VirtualHost,
550596
{update_consumer_state_in_group(G1,
551597
ActPid,
552598
ActSubId,
553-
deactivating),
599+
?DEACTIVATING),
554600
[notify_consumer_effect(ActPid,
555601
ActSubId,
556602
Stream,
@@ -570,9 +616,9 @@ do_register_consumer(VirtualHost,
570616
ConsumerName,
571617
Group1,
572618
StreamGroups0),
573-
{value, #consumer{active = Active}} =
619+
{value, #consumer{status = Status}} =
574620
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
575-
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects}.
621+
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}.
576622

577623
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
578624
{G, []};
@@ -608,7 +654,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
608654
{update_consumer_state_in_group(Group0,
609655
ActPid,
610656
ActSubId,
611-
deactivating),
657+
?DEACTIVATING),
612658
[notify_consumer_effect(ActPid, ActSubId,
613659
Stream, ConsumerName, false, true)]}
614660
end;
@@ -618,7 +664,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
618664
%% the active one is going away, picking a new one
619665
#consumer{pid = P, subscription_id = SID} =
620666
evaluate_active_consumer(Group0),
621-
{update_consumer_state_in_group(Group0, P, SID, active),
667+
{update_consumer_state_in_group(Group0, P, SID, ?ACTIVE),
622668
[notify_consumer_effect(P, SID,
623669
Stream, ConsumerName, true)]};
624670
false ->
@@ -688,13 +734,13 @@ compute_active_consumer(#group{consumers = Crs,
688734
compute_active_consumer(#group{partition_index = -1,
689735
consumers = [Consumer0]} =
690736
Group0) ->
691-
Consumer1 = Consumer0#consumer{active = active},
737+
Consumer1 = Consumer0#consumer{status = ?ACTIVE},
692738
Group0#group{consumers = [Consumer1]};
693739
compute_active_consumer(#group{partition_index = -1,
694740
consumers = [Consumer0 | T]} =
695741
Group0) ->
696-
Consumer1 = Consumer0#consumer{active = active},
697-
Consumers = lists:map(fun(C) -> C#consumer{active = waiting} end, T),
742+
Consumer1 = Consumer0#consumer{status = ?ACTIVE},
743+
Consumers = lists:map(fun(C) -> C#consumer{status = ?WAITING} end, T),
698744
Group0#group{consumers = [Consumer1] ++ Consumers}.
699745

700746
evaluate_active_consumer(#group{partition_index = PartitionIndex,
@@ -711,7 +757,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
711757
Consumers).
712758

713759
lookup_active_consumer(#group{consumers = Consumers}) ->
714-
lists:search(fun(#consumer{active = Active}) -> is_active(Active) end,
760+
lists:search(fun(#consumer{status = Status}) -> is_active(Status) end,
715761
Consumers).
716762

717763
update_groups(_VirtualHost,
@@ -737,20 +783,20 @@ update_groups(VirtualHost,
737783
update_consumer_state_in_group(#group{consumers = Consumers0} = G,
738784
Pid,
739785
SubId,
740-
NewState) ->
786+
NewStatus) ->
741787
CS1 = lists:map(fun(C0) ->
742788
case C0 of
743789
#consumer{pid = Pid, subscription_id = SubId} ->
744-
C0#consumer{active = NewState};
790+
C0#consumer{status = NewStatus};
745791
C -> C
746792
end
747793
end,
748794
Consumers0),
749795
G#group{consumers = CS1}.
750796

751-
update_consumers(#group{consumers = Consumers0} = G, NewState) ->
797+
update_consumers(#group{consumers = Consumers0} = G, NewStatus) ->
752798
Consumers1 = lists:map(fun(C) ->
753-
C#consumer{active = NewState}
799+
C#consumer{status = NewStatus}
754800
end, Consumers0),
755801
G#group{consumers = Consumers1}.
756802

deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
-type subscription_id() :: byte().
2323
-type group_id() :: {vhost(), stream(), consumer_name()}.
2424
-type owner() :: binary().
25+
-type consumer_status() :: active | waiting | deactivating.
2526

2627
-record(consumer,
2728
{pid :: pid(),
2829
subscription_id :: subscription_id(),
2930
owner :: owner(), %% just a label
30-
active :: active | waiting | deactivating}).
31+
status :: consumer_status()}).
3132
-record(group,
3233
{consumers :: [#consumer{}], partition_index :: integer()}).
3334
-record(rabbit_stream_sac_coordinator,

0 commit comments

Comments
 (0)