2121-opaque command () :: # command_register_consumer {} |
2222 # command_unregister_consumer {} |
2323 # command_activate_consumer {} |
24- # command_connection_reconnected {}.
24+ # command_connection_reconnected {} |
25+ # command_purge_nodes {}.
2526
2627-opaque state () :: #? MODULE {}.
2728
4748 group_consumers /5 ,
4849 overview /1 ,
4950 import_state /2 ]).
51+ -export ([make_purge_nodes /1 ]).
5052
5153% % exported for unit tests only
5254-ifdef (TEST ).
@@ -84,25 +86,13 @@ register_consumer(VirtualHost,
8486 ConnectionPid ,
8587 Owner ,
8688 SubscriptionId ) ->
87- process_command ({sac ,
88- # command_register_consumer {vhost =
89- VirtualHost ,
90- stream =
91- Stream ,
92- partition_index
93- =
94- PartitionIndex ,
95- consumer_name
96- =
97- ConsumerName ,
98- connection_pid
99- =
100- ConnectionPid ,
101- owner =
102- Owner ,
103- subscription_id
104- =
105- SubscriptionId }}).
89+ process_command (# command_register_consumer {vhost = VirtualHost ,
90+ stream = Stream ,
91+ partition_index = PartitionIndex ,
92+ consumer_name = ConsumerName ,
93+ connection_pid = ConnectionPid ,
94+ owner = Owner ,
95+ subscription_id = SubscriptionId }).
10696
10797-spec unregister_consumer (binary (),
10898 binary (),
@@ -115,35 +105,24 @@ unregister_consumer(VirtualHost,
115105 ConsumerName ,
116106 ConnectionPid ,
117107 SubscriptionId ) ->
118- process_command ({sac ,
119- # command_unregister_consumer {vhost =
120- VirtualHost ,
121- stream =
122- Stream ,
123- consumer_name
124- =
125- ConsumerName ,
126- connection_pid
127- =
128- ConnectionPid ,
129- subscription_id
130- =
131- SubscriptionId }}).
108+ process_command (# command_unregister_consumer {vhost = VirtualHost ,
109+ stream = Stream ,
110+ consumer_name = ConsumerName ,
111+ connection_pid = ConnectionPid ,
112+ subscription_id = SubscriptionId }).
132113
133114-spec activate_consumer (binary (), binary (), binary ()) -> ok .
134115activate_consumer (VH , Stream , Name ) ->
135- process_command ({sac ,
136- # command_activate_consumer {vhost = VH ,
137- stream = Stream ,
138- consumer_name = Name }}).
116+ process_command (# command_activate_consumer {vhost = VH ,
117+ stream = Stream ,
118+ consumer_name = Name }).
139119
140120-spec connection_reconnected (connection_pid ()) -> ok .
141121connection_reconnected (Pid ) ->
142- process_command ({sac ,
143- # command_connection_reconnected {pid = Pid }}).
122+ process_command (# command_connection_reconnected {pid = Pid }).
144123
145124process_command (Cmd ) ->
146- case rabbit_stream_coordinator :process_command (Cmd ) of
125+ case rabbit_stream_coordinator :process_command (wrap_cmd ( Cmd ) ) of
147126 {ok , Res , _ } ->
148127 Res ;
149128 {error , _ } = Err ->
@@ -152,6 +131,10 @@ process_command(Cmd) ->
152131 Err
153132 end .
154133
134+ -spec wrap_cmd (command ()) -> {sac , command ()}.
135+ wrap_cmd (Cmd ) ->
136+ {sac , Cmd }.
137+
155138% % return the current groups for a given virtual host
156139-spec consumer_groups (binary (), [atom ()]) ->
157140 {ok ,
@@ -306,8 +289,31 @@ apply(#command_connection_reconnected{pid = Pid},
306289 handle_group_connection_reconnected (Pid , St , Eff , G )
307290 end , {State0 , []}, Groups0 ),
308291
292+ {State1 , ok , Eff };
293+ apply (# command_purge_nodes {nodes = Nodes }, State0 ) ->
294+ {State1 , Eff } = lists :foldl (fun (N , {S0 , Eff0 }) ->
295+ {S1 , Eff1 } = purge_node (N , S0 ),
296+ {S1 , Eff1 ++ Eff0 }
297+ end , {State0 , []}, Nodes ),
309298 {State1 , ok , Eff }.
310299
300+ purge_node (Node , #? MODULE {groups = Groups0 } = State0 ) ->
301+ PidsGroups =
302+ maps :fold (fun (K , # group {consumers = Consumers }, Acc ) ->
303+ lists :foldl (fun (# consumer {pid = Pid }, AccIn )
304+ when node (Pid ) =:= Node ->
305+ PG0 = maps :get (Pid , AccIn , #{}),
306+ PG1 = PG0 #{K => true },
307+ AccIn #{Pid => PG1 };
308+ (_ , AccIn ) ->
309+ AccIn
310+ end , Acc , Consumers )
311+ end , #{}, Groups0 ),
312+ maps :fold (fun (Pid , Groups , {S0 , Eff0 }) ->
313+ {S1 , Eff1 } = handle_connection_down0 (Pid , S0 , Groups ),
314+ {S1 , Eff1 ++ Eff0 }
315+ end , {State0 , []}, PidsGroups ).
316+
311317handle_group_connection_reconnected (Pid , #? MODULE {groups = Groups0 } = S0 ,
312318 Eff0 , {VH , S , Name } = K ) ->
313319 % % TODO sac: handle forgotten_active case (reconciliate state with current active)
@@ -573,6 +579,7 @@ ensure_monitors(#command_connection_reconnected{pid = Pid},
573579 Monitors #{Pid => sac },
574580 [{monitor , process , Pid }, {monitor , node , node (Pid )} | Effects ]};
575581ensure_monitors (_ , #? MODULE {} = State0 , Monitors , Effects ) ->
582+ % % TODO sac: ensure the pid-group mapping after purge_nodes?
576583 {State0 , Monitors , Effects }.
577584
578585-spec handle_connection_down (connection_pid (), state ()) ->
@@ -584,11 +591,14 @@ handle_connection_down(Pid,
584591 {State0 , []};
585592 {Groups , PidsGroups1 } ->
586593 State1 = State0 #? MODULE {pids_groups = PidsGroups1 },
587- maps :fold (fun (G , _ , Acc ) ->
588- handle_group_after_connection_down (Pid , Acc , G )
589- end , {State1 , []}, Groups )
594+ handle_connection_down0 (Pid , State1 , Groups )
590595 end .
591596
597+ handle_connection_down0 (Pid , State , Groups ) ->
598+ maps :fold (fun (G , _ , Acc ) ->
599+ handle_group_after_connection_down (Pid , Acc , G )
600+ end , {State , []}, Groups ).
601+
592602-spec handle_connection_node_disconnected (connection_pid (), state ()) ->
593603 {state (), ra_machine :effects ()}.
594604handle_connection_node_disconnected (ConnPid ,
@@ -730,6 +740,10 @@ import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
730740 #? MODULE {groups = map_to_groups (Groups ),
731741 pids_groups = map_to_pids_groups (PidsGroups )}.
732742
743+ - spec make_purge_nodes ([node ()]) -> command ().
744+ make_purge_nodes (Nodes ) ->
745+ wrap_cmd (# command_purge_nodes {nodes = Nodes }).
746+
733747map_to_groups (Groups ) when is_map (Groups ) ->
734748 maps :fold (fun (K , V , Acc ) ->
735749 Acc #{K => map_to_group (V )}
0 commit comments