Skip to content

Commit f476750

Browse files
committed
rabbit_fifo: state_enter fix
when a ra server becomes leader rabbit_fifo should re-issue monitor effects for both consumers and enqueuers.
1 parent 9a2ea74 commit f476750

File tree

1 file changed

+26
-7
lines changed

1 file changed

+26
-7
lines changed

src/rabbit_fifo.erl

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -413,29 +413,35 @@ apply(_, {down, Pid, _Info}, Effects0,
413413
apply(_, {nodeup, Node}, Effects0,
414414
#state{consumers = Cons0,
415415
enqueuers = Enqs0} = State0) ->
416+
%% A node we are monitoring has come back.
417+
%% If we have suspected any processes of being
418+
%% down we should now re-issue the monitors for them to detect if they're
419+
%% actually down or not
416420
Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc)
417-
when node(P) =:= Node ->
418-
[P | Acc];
419-
(_, _, Acc) -> Acc
420-
end, [], Cons0),
421+
when node(P) =:= Node ->
422+
[P | Acc];
423+
(_, _, Acc) -> Acc
424+
end, [], Cons0),
421425
Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc)
422426
when node(P) =:= Node ->
423427
[P | Acc];
424428
(_, _, Acc) -> Acc
425429
end, [], Enqs0),
426430
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
427-
% TODO: should we unsuspect these processes here?
428431
% TODO: avoid list concat
429432
{State0, Monitors ++ Effects0, ok};
430433
apply(_, {nodedown, _Node}, Effects, State) ->
431434
{State, Effects, ok}.
432435

433436
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
434437
state_enter(leader, #state{consumers = Custs,
438+
enqueuers = Enqs,
435439
name = Name,
436440
become_leader_handler = BLH}) ->
437-
% return effects to monitor all current consumerss
438-
Effects = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
441+
% return effects to monitor all current consumers and enqueuers
442+
ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
443+
EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)],
444+
Effects = ConMons ++ EnqMons,
439445
case BLH of
440446
undefined ->
441447
Effects;
@@ -1509,6 +1515,19 @@ state_enter_test() ->
15091515
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
15101516
ok.
15111517

1518+
leader_monitors_on_state_enter_test() ->
1519+
Cid = {<<"cid">>, self()},
1520+
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
1521+
{State1, _} = check_auto(Cid, 2, State0),
1522+
Self = self(),
1523+
%% as we have an enqueuer _and_ a consumer we chould
1524+
%% get two monitor effects in total, even if they are for the same
1525+
%% processs
1526+
[{monitor, process, Self},
1527+
{monitor, process, Self}] = state_enter(leader, State1),
1528+
ok.
1529+
1530+
15121531
purge_test() ->
15131532
Cid = {<<"purge_test">>, self()},
15141533
{State1, _} = enq(1, 1, first, test_init(test)),

0 commit comments

Comments
 (0)