Skip to content

Commit e26b0e6

Browse files
committed
QQ: resend pending commands when new leader detected on applied notification.
When a leader changes all enqueuer and consumer processes are notified from the `state_enter(leader,` callback. However a new leader may not yet have applied all commands that the old leader had. If any of those commands is a checkout or a register_enqueuer command these processes will not be notified of the new leader and thus may never resend their pending commands. The new leader will however send an applied notification when it does apply these entries and these are always sent from the leader process so can also be used to trigger pending resends. This commit implements that.
1 parent a51d8a5 commit e26b0e6

File tree

2 files changed

+93
-11
lines changed

2 files changed

+93
-11
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -586,34 +586,58 @@ update_machine_state(Server, Conf) ->
586586
ra_server_proc:ra_event_body(), state()) ->
587587
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
588588
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
589-
handle_ra_event(QName, From, {applied, Seqs},
590-
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
589+
handle_ra_event(QName, Leader, {applied, Seqs},
590+
#state{leader = OldLeader,
591+
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
591592

592593
{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
593-
{[], [], State0#state{leader = From}},
594+
{[], [], State0#state{leader = Leader}},
594595
Seqs),
596+
597+
%% if the leader has changed we need to resend any pending commands remaining
598+
%% after the applied processing
599+
State2 = if OldLeader =/= Leader ->
600+
%% double check before resending as applied notifications
601+
%% can arrive from old leaders in any order
602+
case ra:members(Leader) of
603+
{ok, _, ActualLeader}
604+
when ActualLeader =/= OldLeader ->
605+
%% there is a new leader
606+
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
607+
"from ~w to ~w, "
608+
"resending ~b pending commands",
609+
[?MODULE, OldLeader, Leader,
610+
maps:size(State1#state.pending)]),
611+
resend_all_pending(State1#state{leader = ActualLeader});
612+
_ ->
613+
State1
614+
end;
615+
true ->
616+
State1
617+
end,
618+
595619
Actions0 = lists:reverse(ActionsRev),
596620
Actions = case Corrs of
597621
[] ->
598622
Actions0;
599623
_ ->
600-
%%TODO consider using lists:foldr/3 above because
624+
%%TODO: consider using lists:foldr/3 above because
601625
%% Corrs is returned in the wrong order here.
602626
%% The wrong order does not matter much because the channel sorts the
603627
%% sequence numbers before confirming to the client. But rabbit_fifo_client
604628
%% is sequence numer agnostic: it handles any correlation terms.
605629
[{settled, QName, Corrs} | Actions0]
606630
end,
607-
case map_size(State1#state.pending) < SftLmt of
608-
true when State1#state.slow == true ->
631+
case map_size(State2#state.pending) < SftLmt of
632+
true when State2#state.slow == true ->
609633
% we have exited soft limit state
610634
% send any unsent commands and cancel the time as
611635
% TODO: really the timer should only be cancelled when the channel
612636
% exits flow state (which depends on the state of all queues the
613637
% channel is interacting with)
614638
% but the fact the queue has just applied suggests
615639
% it's ok to cancel here anyway
616-
State2 = cancel_timer(State1#state{slow = false,
640+
State3 = cancel_timer(State2#state{slow = false,
617641
unsent_commands = #{}}),
618642
% build up a list of commands to issue
619643
Commands = maps:fold(
@@ -622,16 +646,16 @@ handle_ra_event(QName, From, {applied, Seqs},
622646
add_command(Cid, return, Returns,
623647
add_command(Cid, discard,
624648
Discards, Acc)))
625-
end, [], State1#state.unsent_commands),
626-
ServerId = pick_server(State2),
649+
end, [], State2#state.unsent_commands),
650+
ServerId = pick_server(State3),
627651
%% send all the settlements and returns
628652
State = lists:foldl(fun (C, S0) ->
629653
send_command(ServerId, undefined, C,
630654
normal, S0)
631-
end, State2, Commands),
655+
end, State3, Commands),
632656
{ok, State, [{unblock, cluster_name(State)} | Actions]};
633657
_ ->
634-
{ok, State1, Actions}
658+
{ok, State2, Actions}
635659
end;
636660
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637661
handle_delivery(QName, From, Del, State0);

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ all_tests() ->
2323
[
2424
basics,
2525
return,
26+
lost_return_is_resent_on_applied_after_leader_change,
2627
rabbit_fifo_returns_correlation,
2728
resends_lost_command,
2829
returns,
@@ -162,6 +163,63 @@ return(Config) ->
162163
rabbit_quorum_queue:stop_server(ServerId),
163164
ok.
164165

166+
lost_return_is_resent_on_applied_after_leader_change(Config) ->
167+
%% this test handles a case where a combination of a lost/overwritten
168+
%% command and a leader change could result in a client never detecting
169+
%% a new leader and thus never resends whatever command was overwritten
170+
%% in the prior term. The fix is to handle leader changes when processing
171+
%% the {appliekd, _} ra event.
172+
ClusterName = ?config(cluster_name, Config),
173+
ServerId = ?config(node_id, Config),
174+
ServerId2 = ?config(node_id2, Config),
175+
ServerId3 = ?config(node_id3, Config),
176+
Members = [ServerId, ServerId2, ServerId3],
177+
178+
ok = meck:new(ra, [passthrough]),
179+
ok = start_cluster(ClusterName, Members),
180+
181+
{ok, _, Leader} = ra:members(ServerId),
182+
Followers = lists:delete(Leader, Members),
183+
184+
F00 = rabbit_fifo_client:init(Members),
185+
{ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
186+
F1 = F0,
187+
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
188+
{ok, _, {_, _, MsgId, _, _}, F3} =
189+
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
190+
{F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
191+
RaEvt = receive
192+
{ra_event, Leader, {applied, _} = Evt} ->
193+
Evt
194+
after 5000 ->
195+
ct:fail("no ra event")
196+
end,
197+
NextLeader = hd(Followers),
198+
timer:sleep(100),
199+
ok = ra:transfer_leadership(Leader, NextLeader),
200+
%% get rid of leader change event
201+
receive
202+
{ra_event, _, {machine, leader_change}} ->
203+
ok
204+
after 5000 ->
205+
ct:fail("no machine leader_change event")
206+
end,
207+
%% client will "send" to the old leader
208+
meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
209+
{ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
210+
?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
211+
meck:unload(ra),
212+
%% pass the ra event with the new leader as if the entry was applied
213+
%% by the new leader, not the old
214+
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
215+
RaEvt, F5),
216+
%% this should resend the never applied enqueue
217+
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
218+
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
219+
220+
flush(),
221+
ok.
222+
165223
rabbit_fifo_returns_correlation(Config) ->
166224
ClusterName = ?config(cluster_name, Config),
167225
ServerId = ?config(node_id, Config),

0 commit comments

Comments
 (0)