Skip to content

Commit 6d417bf

Browse files
committed
QQ: resend pending commands when new leader detected on applied notification.
When a leader changes all enqueuer and consumer processes are notified from the `state_enter(leader,` callback. However a new leader may not yet have applied all commands that the old leader had. If any of those commands is a checkout or a register_enqueuer command these processes will not be notified of the new leader and thus may never resend their pending commands. The new leader will however send an applied notification when it does apply these entries and these are always sent from the leader process so can also be used to trigger pending resends. This commit implements that.
1 parent a51d8a5 commit 6d417bf

File tree

2 files changed

+97
-11
lines changed

2 files changed

+97
-11
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -586,34 +586,58 @@ update_machine_state(Server, Conf) ->
586586
ra_server_proc:ra_event_body(), state()) ->
587587
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
588588
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
589-
handle_ra_event(QName, From, {applied, Seqs},
590-
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
589+
handle_ra_event(QName, Leader, {applied, Seqs},
590+
#state{leader = OldLeader,
591+
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
591592

592593
{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
593-
{[], [], State0#state{leader = From}},
594+
{[], [], State0#state{leader = Leader}},
594595
Seqs),
596+
597+
%% if the leader has changed we need to resend any pending commands remaining
598+
%% after the applied processing
599+
State2 = if OldLeader =/= Leader ->
600+
%% double check before resending as applied notifications
601+
%% can arrive from old leaders in any order
602+
case ra:members(Leader) of
603+
{ok, _, ActualLeader}
604+
when ActualLeader =/= OldLeader ->
605+
%% there is a new leader
606+
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
607+
"from ~w to ~w, "
608+
"resending ~b pending commands",
609+
[?MODULE, OldLeader, Leader,
610+
maps:size(State1#state.pending)]),
611+
resend_all_pending(State1#state{leader = ActualLeader});
612+
_ ->
613+
State1
614+
end;
615+
true ->
616+
State1
617+
end,
618+
595619
Actions0 = lists:reverse(ActionsRev),
596620
Actions = case Corrs of
597621
[] ->
598622
Actions0;
599623
_ ->
600-
%%TODO consider using lists:foldr/3 above because
624+
%%TODO: consider using lists:foldr/3 above because
601625
%% Corrs is returned in the wrong order here.
602626
%% The wrong order does not matter much because the channel sorts the
603627
%% sequence numbers before confirming to the client. But rabbit_fifo_client
604628
%% is sequence numer agnostic: it handles any correlation terms.
605629
[{settled, QName, Corrs} | Actions0]
606630
end,
607-
case map_size(State1#state.pending) < SftLmt of
608-
true when State1#state.slow == true ->
631+
case map_size(State2#state.pending) < SftLmt of
632+
true when State2#state.slow == true ->
609633
% we have exited soft limit state
610634
% send any unsent commands and cancel the time as
611635
% TODO: really the timer should only be cancelled when the channel
612636
% exits flow state (which depends on the state of all queues the
613637
% channel is interacting with)
614638
% but the fact the queue has just applied suggests
615639
% it's ok to cancel here anyway
616-
State2 = cancel_timer(State1#state{slow = false,
640+
State3 = cancel_timer(State2#state{slow = false,
617641
unsent_commands = #{}}),
618642
% build up a list of commands to issue
619643
Commands = maps:fold(
@@ -622,16 +646,16 @@ handle_ra_event(QName, From, {applied, Seqs},
622646
add_command(Cid, return, Returns,
623647
add_command(Cid, discard,
624648
Discards, Acc)))
625-
end, [], State1#state.unsent_commands),
626-
ServerId = pick_server(State2),
649+
end, [], State2#state.unsent_commands),
650+
ServerId = pick_server(State3),
627651
%% send all the settlements and returns
628652
State = lists:foldl(fun (C, S0) ->
629653
send_command(ServerId, undefined, C,
630654
normal, S0)
631-
end, State2, Commands),
655+
end, State3, Commands),
632656
{ok, State, [{unblock, cluster_name(State)} | Actions]};
633657
_ ->
634-
{ok, State1, Actions}
658+
{ok, State2, Actions}
635659
end;
636660
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637661
handle_delivery(QName, From, Del, State0);

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ all_tests() ->
2323
[
2424
basics,
2525
return,
26+
lost_return_is_resent_on_applied_after_leader_change,
2627
rabbit_fifo_returns_correlation,
2728
resends_lost_command,
2829
returns,
@@ -56,9 +57,11 @@ init_per_group(_, Config) ->
5657
PrivDir = ?config(priv_dir, Config),
5758
_ = application:load(ra),
5859
ok = application:set_env(ra, data_dir, PrivDir),
60+
application:ensure_all_started(logger),
5961
application:ensure_all_started(ra),
6062
application:ensure_all_started(lg),
6163
SysCfg = ra_system:default_config(),
64+
ra_env:configure_logger(logger),
6265
ra_system:start(SysCfg#{name => ?RA_SYSTEM}),
6366
Config.
6467

@@ -67,6 +70,8 @@ end_per_group(_, Config) ->
6770
Config.
6871

6972
init_per_testcase(TestCase, Config) ->
73+
rabbit_ct_helpers:redirect_logger_to_ct_logs(Config),
74+
ok = logger:set_primary_config(level, all),
7075
meck:new(rabbit_quorum_queue, [passthrough]),
7176
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
7277
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
@@ -162,6 +167,63 @@ return(Config) ->
162167
rabbit_quorum_queue:stop_server(ServerId),
163168
ok.
164169

170+
lost_return_is_resent_on_applied_after_leader_change(Config) ->
171+
%% this test handles a case where a combination of a lost/overwritten
172+
%% command and a leader change could result in a client never detecting
173+
%% a new leader and thus never resends whatever command was overwritten
174+
%% in the prior term. The fix is to handle leader changes when processing
175+
%% the {appliekd, _} ra event.
176+
ClusterName = ?config(cluster_name, Config),
177+
ServerId = ?config(node_id, Config),
178+
ServerId2 = ?config(node_id2, Config),
179+
ServerId3 = ?config(node_id3, Config),
180+
Members = [ServerId, ServerId2, ServerId3],
181+
182+
ok = meck:new(ra, [passthrough]),
183+
ok = start_cluster(ClusterName, Members),
184+
185+
{ok, _, Leader} = ra:members(ServerId),
186+
Followers = lists:delete(Leader, Members),
187+
188+
F00 = rabbit_fifo_client:init(Members),
189+
{ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
190+
F1 = F0,
191+
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
192+
{ok, _, {_, _, MsgId, _, _}, F3} =
193+
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
194+
{F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
195+
RaEvt = receive
196+
{ra_event, Leader, {applied, _} = Evt} ->
197+
Evt
198+
after 5000 ->
199+
ct:fail("no ra event")
200+
end,
201+
NextLeader = hd(Followers),
202+
timer:sleep(100),
203+
ok = ra:transfer_leadership(Leader, NextLeader),
204+
%% get rid of leader change event
205+
receive
206+
{ra_event, _, {machine, leader_change}} ->
207+
ok
208+
after 5000 ->
209+
ct:fail("no machine leader_change event")
210+
end,
211+
%% client will "send" to the old leader
212+
meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
213+
{ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
214+
?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
215+
meck:unload(ra),
216+
%% pass the ra event with the new leader as if the entry was applied
217+
%% by the new leader, not the old
218+
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
219+
RaEvt, F5),
220+
%% this should resend the never applied enqueue
221+
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
222+
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
223+
224+
flush(),
225+
ok.
226+
165227
rabbit_fifo_returns_correlation(Config) ->
166228
ClusterName = ?config(cluster_name, Config),
167229
ServerId = ?config(node_id, Config),

0 commit comments

Comments
 (0)