Skip to content

Commit 2907e00

Browse files
committed
Reconcile stream SAC group when forgotten active consumer comes back
1 parent 08ea875 commit 2907e00

File tree

3 files changed

+288
-25
lines changed

3 files changed

+288
-25
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 131 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
-define(CONN_ACT, {?CONNECTED, ?ACTIVE}).
6969
-define(CONN_WAIT, {?CONNECTED, ?WAITING}).
7070
-define(DISCONN_ACT, {?DISCONNECTED, ?ACTIVE}).
71-
71+
-define(FORG_ACT, {?FORGOTTTEN, ?ACTIVE}).
7272

7373
%% Single Active Consumer API
7474
-spec register_consumer(binary(),
@@ -266,6 +266,15 @@ apply(#command_activate_consumer{vhost = VirtualHost,
266266
[{VirtualHost, Stream, ConsumerName}]),
267267
{undefined, []};
268268
G0 ->
269+
%% keep track of the former active, if any
270+
{ActPid, ActSubId} =
271+
case lookup_active_consumer(G0) of
272+
{value, #consumer{pid = ActivePid,
273+
subscription_id = ActiveSubId}} ->
274+
{ActivePid, ActiveSubId};
275+
_ ->
276+
{-1, -1}
277+
end,
269278
G1 = update_connected_consumers(G0, ?CONN_WAIT),
270279
case evaluate_active_consumer(G1) of
271280
undefined ->
@@ -277,8 +286,19 @@ apply(#command_activate_consumer{vhost = VirtualHost,
277286
G2 = update_consumer_state_in_group(G1, Pid,
278287
SubId,
279288
?CONN_ACT),
280-
{G2, [notify_consumer_effect(Pid, SubId, Stream,
281-
ConsumerName, true)]}
289+
%% do we need effects or not?
290+
Effects =
291+
case {Pid, SubId} of
292+
{ActPid, ActSubId} ->
293+
%% it is the same active consumer as before
294+
%% no need to notify it
295+
[];
296+
_ ->
297+
%% new active consumer, need to notify it
298+
[notify_consumer_effect(Pid, SubId, Stream,
299+
ConsumerName, true)]
300+
end,
301+
{G2, Effects}
282302
end
283303
end,
284304
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
@@ -308,31 +328,122 @@ purge_node(Node, #?MODULE{groups = Groups0} = State0) ->
308328

309329
handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
310330
Eff0, {VH, S, Name} = K) ->
311-
%% TODO sac: handle forgotten_active case (reconciliate state with current active)
312331
case lookup_group(VH, S, Name, Groups0) of
313332
undefined ->
314333
{S0, Eff0};
315-
#group{consumers = Consumers0} = G0 ->
316-
{Consumers1, Updated} =
317-
lists:foldr(
318-
fun(#consumer{pid = P, status = {_, St}} = C, {L, _})
319-
when P == Pid ->
320-
{[C#consumer{status = {?CONNECTED, St}} | L], true};
321-
(C, {L, UpdatedFlag}) ->
322-
{[C | L], UpdatedFlag or false}
323-
end, {[], false}, Consumers0),
324-
325-
case Updated of
334+
Group ->
335+
case has_forgotten_active(Group, Pid) of
326336
true ->
327-
G1 = G0#group{consumers = Consumers1},
328-
{G2, Eff} = maybe_rebalance_group(G1, K),
329-
Groups1 = update_groups(VH, S, Name, G2, Groups0),
330-
{S0#?MODULE{groups = Groups1}, Eff ++ Eff0};
337+
%% a forgotten active is coming in the connection
338+
%% we need to reconcile the group,
339+
%% as there may have been 2 active consumers at a time
340+
handle_forgotten_active_reconnected(Pid, S0, Eff0, K);
331341
false ->
332-
{S0, Eff0}
342+
do_handle_group_connection_reconnected(Pid, S0, Eff0, K)
333343
end
334344
end.
335345

346+
%% Flags every consumer of the group that belongs to connection Pid as
%% connected again (its active/waiting part is left untouched).
%% When at least one consumer was touched, the group goes through
%% maybe_rebalance_group/2 and the resulting effects are put in front of Eff0.
%% When Pid owns no consumer in the group, state and effects come back as is.
%% The group is expected to exist: a missing group crashes with badmatch.
do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
                                       Eff0, {VH, S, Name} = K) ->
    #group{consumers = Consumers0} = G0 = lookup_group(VH, S, Name, Groups0),
    Reconnect =
        fun(#consumer{pid = P, status = {_, St}} = C, _Seen) when P == Pid ->
                {C#consumer{status = {?CONNECTED, St}}, true};
           (C, Seen) ->
                {C, Seen}
        end,
    %% right-to-left traversal, consumer order is preserved
    case lists:mapfoldr(Reconnect, false, Consumers0) of
        {_Untouched, false} ->
            {S0, Eff0};
        {Consumers1, true} ->
            G1 = G0#group{consumers = Consumers1},
            {G2, Eff} = maybe_rebalance_group(G1, K),
            Groups1 = update_groups(VH, S, Name, G2, Groups0),
            {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}
    end.
367+
368+
%% Reconciles a group when connection Pid, owner of a forgotten active
%% consumer, is back.
%%
%% Two situations:
%%  - the group contains a disconnected active consumer: only the forgotten
%%    active consumers of Pid are switched to connected/waiting and told to
%%    step down, everything else is left alone;
%%  - otherwise: the forgotten active consumers of Pid are switched to
%%    connected/waiting and told to step down, the other forgotten consumers
%%    of Pid are switched to connected/waiting without notification, and
%%    every connected active consumer of the group (whatever its connection)
%%    is switched to connected/waiting and told to step down.
%%
%% Step-down effects are accumulated in front of Eff0, in consumer order.
%% The group is expected to exist: a missing group crashes with badmatch.
handle_forgotten_active_reconnected(Pid,
                                    #?MODULE{groups = Groups0} = S0,
                                    Eff0, {VH, S, Name}) ->
    #group{consumers = Consumers0} = G0 = lookup_group(VH, S, Name, Groups0),
    StepDown = fun(ConnPid, SubId) ->
                       notify_consumer_effect(ConnPid, SubId, S, Name,
                                              false, true)
               end,
    Reconcile =
        case has_disconnected_active(G0) of
            true ->
                fun(#consumer{status = ?FORG_ACT,
                              pid = P,
                              subscription_id = SubId} = C, {Acc, Effs})
                      when P =:= Pid ->
                        %% forgotten active of the returning connection
                        {[C#consumer{status = ?CONN_WAIT} | Acc],
                         [StepDown(P, SubId) | Effs]};
                   (C, {Acc, Effs}) ->
                        {[C | Acc], Effs}
                end;
            false ->
                fun(#consumer{status = ?FORG_ACT,
                              pid = P,
                              subscription_id = SubId} = C, {Acc, Effs})
                      when P =:= Pid ->
                        %% forgotten active of the returning connection
                        {[C#consumer{status = ?CONN_WAIT} | Acc],
                         [StepDown(P, SubId) | Effs]};
                   (#consumer{status = {?FORGOTTTEN, _},
                              pid = P} = C, {Acc, Effs})
                      when P =:= Pid ->
                        %% any other forgotten consumer of the connection,
                        %% no notification
                        {[C#consumer{status = ?CONN_WAIT} | Acc], Effs};
                   (#consumer{status = ?CONN_ACT,
                              pid = P,
                              subscription_id = SubId} = C, {Acc, Effs}) ->
                        %% connected active, from any connection
                        {[C#consumer{status = ?CONN_WAIT} | Acc],
                         [StepDown(P, SubId) | Effs]};
                   (C, {Acc, Effs}) ->
                        {[C | Acc], Effs}
                end
        end,
    {Consumers1, Eff1} = lists:foldr(Reconcile, {[], Eff0}, Consumers0),
    G1 = G0#group{consumers = Consumers1},
    Groups1 = update_groups(VH, S, Name, G1, Groups0),
    {S0#?MODULE{groups = Groups1}, Eff1}.
418+
419+
%% Returns true when the group holds at least one consumer owned by Pid
%% whose status is forgotten/active, false otherwise.
has_forgotten_active(#group{consumers = Consumers}, Pid) ->
    lists:any(fun(#consumer{status = ?FORG_ACT, pid = P}) ->
                      P =:= Pid;
                 (_) ->
                      false
              end, Consumers).
430+
431+
%% Returns true when at least one consumer of the group has the
%% disconnected/active status.
has_disconnected_active(Group) ->
    DisconnectedActive = ?DISCONN_ACT,
    has_consumer_with_status(Group, DisconnectedActive).
433+
434+
%% Returns true when at least one consumer of the group has exactly the
%% given status, false otherwise.
has_consumer_with_status(#group{consumers = Consumers}, Status) ->
    lists:any(fun(#consumer{status = S}) ->
                      S =:= Status;
                 (_) ->
                      false
              end, Consumers).
444+
445+
446+
336447
maybe_rebalance_group(#group{partition_index = -1, consumers = Consumers0} = G0,
337448
{_VH, S, Name}) ->
338449
case lookup_active_consumer(G0) of

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 156 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,7 @@ handle_connection_node_disconnected_super_stream_disconn_active_block_rebalancin
953953
assertNodeDisconnectedTimerEffect(Pid0, Eff),
954954
ok.
955955

956-
connection_reconnected_simple_disconn_active_block_rebalancing_test(_) ->
956+
connection_reconnected_simple_disconn_active_blocks_rebalancing_test(_) ->
957957
Pid0 = new_process(),
958958
Pid1 = new_process(),
959959
Pid2 = new_process(),
@@ -974,7 +974,73 @@ connection_reconnected_simple_disconn_active_block_rebalancing_test(_) ->
974974
assertEmpty(Eff),
975975
ok.
976976

977-
connection_reconnected_super_stream_disconn_active_block_rebalancing_test(_) ->
977+
%% Simple stream group: the connection of a forgotten active consumer comes
%% back while another consumer is disconnected and active.
%% Expected: the returning consumer becomes connected/waiting and gets a
%% stepping-down message; the other consumers are left unchanged.
connection_reconnected_simple_forg_act_disconn_active_blocks_rebalancing_test(_) ->
    ReturningPid = new_process(),
    DisconnectedPid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    InitialGroup = cgroup([consumer(ReturningPid, 0, {forgotten, active}),
                           consumer(DisconnectedPid, 0, {disconnected, active}),
                           consumer(WaitingPid, 0, {connected, waiting})]),
    InitialState = state(#{GroupId => InitialGroup}),

    Reconnected = connection_reconnected_command(ReturningPid),
    {#?STATE{groups = Groups}, ok, Effects} = ?MOD:apply(Reconnected,
                                                         InitialState),

    ExpectedGroup = cgroup([consumer(ReturningPid, 0, {connected, waiting}),
                            consumer(DisconnectedPid, 0, {disconnected, active}),
                            consumer(WaitingPid, 0, {connected, waiting})]),
    assertHasGroup(GroupId, ExpectedGroup, Groups),
    assertSize(1, Effects),
    assertContainsSendMessageSteppingDownEffect(ReturningPid, Effects),
    ok.
998+
999+
%% Simple stream group: the connection of a forgotten active consumer comes
%% back while another consumer is connected and active.
%% Expected: both are demoted to connected/waiting and told to step down;
%% the first activation command then elects the first consumer, and the
%% second activation command changes nothing and emits no effect.
connection_reconnected_simple_forg_act_should_trigger_rebalancing_test(_) ->
    ReturningPid = new_process(),
    FormerActivePid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    InitialGroup = cgroup([consumer(ReturningPid, {forgotten, active}),
                           consumer(FormerActivePid, {connected, active}),
                           consumer(WaitingPid, {connected, waiting})]),
    InitialState = state(#{GroupId => InitialGroup}),

    %% the forgotten active is back
    Reconnected = connection_reconnected_command(ReturningPid),
    {#?STATE{groups = GroupsAfterReconnect} = StateAfterReconnect,
     ok, ReconnectEffects} = ?MOD:apply(Reconnected, InitialState),

    AllWaiting = cgroup([consumer(ReturningPid, {connected, waiting}),
                         consumer(FormerActivePid, {connected, waiting}),
                         consumer(WaitingPid, {connected, waiting})]),
    assertHasGroup(GroupId, AllWaiting, GroupsAfterReconnect),
    assertSize(2, ReconnectEffects),
    assertContainsSendMessageSteppingDownEffect(ReturningPid, 0, stream(),
                                                name(), ReconnectEffects),
    assertContainsSendMessageSteppingDownEffect(FormerActivePid, 0, stream(),
                                                name(), ReconnectEffects),

    %% activation triggered by the first consumer stepping down
    FirstActivation = activate_consumer_command(stream(), name()),
    {#?STATE{groups = GroupsAfterFirst} = StateAfterFirst,
     ok, FirstEffects} = ?MOD:apply(FirstActivation, StateAfterReconnect),
    assertHasGroup(GroupId,
                   cgroup([consumer(ReturningPid, {connected, active}),
                           consumer(FormerActivePid, {connected, waiting}),
                           consumer(WaitingPid, {connected, waiting})]),
                   GroupsAfterFirst),
    assertSize(1, FirstEffects),
    assertContainsActivateMessage(ReturningPid, FirstEffects),

    %% activation triggered by the second consumer stepping down:
    %% expected to happen, but it must not change the state
    SecondActivation = activate_consumer_command(stream(), name()),
    {#?STATE{groups = GroupsAfterSecond}, ok, SecondEffects} =
        ?MOD:apply(SecondActivation, StateAfterFirst),
    assertHasGroup(GroupId,
                   cgroup([consumer(ReturningPid, {connected, active}),
                           consumer(FormerActivePid, {connected, waiting}),
                           consumer(WaitingPid, {connected, waiting})]),
                   GroupsAfterSecond),
    assertEmpty(SecondEffects),

    ok.
1042+
1043+
connection_reconnected_super_stream_disconn_active_blocks_rebalancing_test(_) ->
9781044
Pid0 = new_process(),
9791045
Pid1 = new_process(),
9801046
Pid2 = new_process(),
@@ -995,7 +1061,73 @@ connection_reconnected_super_stream_disconn_active_block_rebalancing_test(_) ->
9951061
assertEmpty(Eff),
9961062
ok.
9971063

998-
forget_connection_simple_disconn_active_block_rebalancing_test(_) ->
1064+
%% Group built with cgroup(1, ...) (super stream variant): the connection of
%% a forgotten active consumer comes back while another consumer is
%% disconnected and active.
%% Expected: the returning consumer becomes connected/waiting and gets a
%% stepping-down message; the other consumers are left unchanged.
connection_reconnected_super_stream_forg_act_disconn_active_blocks_rebalancing_test(_) ->
    ReturningPid = new_process(),
    DisconnectedPid = new_process(),
    WaitingPid = new_process(),
    GroupId = group_id(),
    InitialGroup = cgroup(1, [consumer(ReturningPid, {forgotten, active}),
                              consumer(DisconnectedPid, {disconnected, active}),
                              consumer(WaitingPid, {connected, waiting})]),
    InitialState = state(#{GroupId => InitialGroup}),

    Reconnected = connection_reconnected_command(ReturningPid),
    {#?STATE{groups = Groups}, ok, Effects} = ?MOD:apply(Reconnected,
                                                         InitialState),

    ExpectedGroup = cgroup(1, [consumer(ReturningPid, {connected, waiting}),
                               consumer(DisconnectedPid, {disconnected, active}),
                               consumer(WaitingPid, {connected, waiting})]),
    assertHasGroup(GroupId, ExpectedGroup, Groups),
    assertSize(1, Effects),
    assertContainsSendMessageSteppingDownEffect(ReturningPid, Effects),
    ok.
1085+
1086+
%% Group built with cgroup(1, ...) (super stream variant): the connection of
%% a forgotten active consumer comes back while the third consumer is
%% connected and active.
%% Expected: both are demoted to connected/waiting and told to step down;
%% the first activation command then elects the second consumer, and the
%% second activation command changes nothing and emits no effect.
connection_reconnected_super_stream_forg_act_should_trigger_rebalancing_test(_) ->
    ReturningPid = new_process(),
    ElectedPid = new_process(),
    FormerActivePid = new_process(),
    GroupId = group_id(),
    InitialGroup = cgroup(1, [consumer(ReturningPid, {forgotten, active}),
                              consumer(ElectedPid, {connected, waiting}),
                              consumer(FormerActivePid, {connected, active})]),
    InitialState = state(#{GroupId => InitialGroup}),

    %% the forgotten active is back
    Reconnected = connection_reconnected_command(ReturningPid),
    {#?STATE{groups = GroupsAfterReconnect} = StateAfterReconnect,
     ok, ReconnectEffects} = ?MOD:apply(Reconnected, InitialState),

    AllWaiting = cgroup(1, [consumer(ReturningPid, {connected, waiting}),
                            consumer(ElectedPid, {connected, waiting}),
                            consumer(FormerActivePid, {connected, waiting})]),
    assertHasGroup(GroupId, AllWaiting, GroupsAfterReconnect),
    assertSize(2, ReconnectEffects),
    assertContainsSendMessageSteppingDownEffect(ReturningPid, 0, stream(),
                                                name(), ReconnectEffects),
    assertContainsSendMessageSteppingDownEffect(FormerActivePid, 0, stream(),
                                                name(), ReconnectEffects),

    %% activation triggered by the first consumer stepping down
    FirstActivation = activate_consumer_command(stream(), name()),
    {#?STATE{groups = GroupsAfterFirst} = StateAfterFirst,
     ok, FirstEffects} = ?MOD:apply(FirstActivation, StateAfterReconnect),
    assertHasGroup(GroupId,
                   cgroup(1, [consumer(ReturningPid, {connected, waiting}),
                              consumer(ElectedPid, {connected, active}),
                              consumer(FormerActivePid, {connected, waiting})]),
                   GroupsAfterFirst),
    assertSize(1, FirstEffects),
    assertContainsActivateMessage(ElectedPid, FirstEffects),

    %% activation triggered by the second consumer stepping down:
    %% expected to happen, but it must not change the state
    SecondActivation = activate_consumer_command(stream(), name()),
    {#?STATE{groups = GroupsAfterSecond}, ok, SecondEffects} =
        ?MOD:apply(SecondActivation, StateAfterFirst),
    assertHasGroup(GroupId,
                   cgroup(1, [consumer(ReturningPid, {connected, waiting}),
                              consumer(ElectedPid, {connected, active}),
                              consumer(FormerActivePid, {connected, waiting})]),
                   GroupsAfterSecond),
    assertEmpty(SecondEffects),

    ok.
1129+
1130+
forget_connection_simple_disconn_active_blocks_rebalancing_test(_) ->
9991131
Pid0 = new_process(),
10001132
Pid1 = new_process(),
10011133
Pid2 = new_process(),
@@ -1323,6 +1455,9 @@ stream() ->
13231455
name() ->
13241456
<<"app">>.
13251457

1458+
%% Default subscription id used by the test helpers of this suite.
sub_id() -> 0.
1460+
13261461
apply_ensure_monitors(Mod, Cmd, State0) ->
13271462
{State1, _, _} = Mod:apply(Cmd, State0),
13281463
{State2, _, _} = Mod:ensure_monitors(Cmd, State1, #{}, []),
@@ -1343,7 +1478,7 @@ assertHasGroup(GroupId, Group, Groups) ->
13431478
?assertEqual(Group, G).
13441479

13451480
consumer(Pid, Status) ->
1346-
consumer(Pid, 0, Status).
1481+
consumer(Pid, sub_id(), Status).
13471482

13481483
consumer(Pid, SubId, {Connectivity, Status}) ->
13491484
#consumer{pid = Pid,
@@ -1408,6 +1543,10 @@ assertContainsCheckConnectionEffect(Pid, Effects) ->
14081543
%% Shortcut for assertContainsSendMessageEffect/6 using subscription id 0
%% and the default consumer name.
assertContainsSendMessageEffect(Pid, Stream, Active, Effects) ->
    SubId = 0,
    assertContainsSendMessageEffect(Pid, SubId, Stream, name(), Active,
                                    Effects).
14101545

1546+
%% Asserts that Effects contains the message sent to Pid with active set to
%% true, for the default subscription id, stream and consumer name.
assertContainsActivateMessage(Pid, Effects) ->
    Active = true,
    assertContainsSendMessageEffect(Pid, sub_id(), stream(), name(),
                                    Active, Effects).
1549+
14111550
assertContainsSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active,
14121551
Effects) ->
14131552
assertContainsSendMessageEffect(Pid, {sac,
@@ -1417,6 +1556,19 @@ assertContainsSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active,
14171556
active => Active}},
14181557
Effects).
14191558

1559+
%% Shortcut for assertContainsSendMessageSteppingDownEffect/5 using the
%% default subscription id, stream and consumer name.
assertContainsSendMessageSteppingDownEffect(Pid, Effects) ->
    SubId = sub_id(),
    Stream = stream(),
    ConsumerName = name(),
    assertContainsSendMessageSteppingDownEffect(Pid, SubId, Stream,
                                                ConsumerName, Effects).
1562+
1563+
%% Asserts that Effects contains the SAC message sent to Pid for the given
%% subscription, stream and consumer name, with active set to false and
%% stepping_down set to true.
assertContainsSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName,
                                            Effects) ->
    Payload = #{subscription_id => SubId,
                stream => Stream,
                consumer_name => ConsumerName,
                active => false,
                stepping_down => true},
    assertContainsSendMessageEffect(Pid, {sac, Payload}, Effects).
1571+
14201572
assertContainsSendMessageEffect(Pid, Msg, Effects) ->
14211573
assertContainsEffect({mod_call,
14221574
rabbit_stream_sac_coordinator,

0 commit comments

Comments
 (0)