Skip to content

Commit d31b9aa

Browse files
committed
QQ: resend pending commands when new leader detected on applied notification.
When the leader changes, all enqueuer and consumer processes are notified from the `state_enter(leader, ...)` callback. However, a new leader may not yet have applied all of the commands that the old leader had. If any of those commands is a `checkout` or a `register_enqueuer` command, the corresponding processes will not be notified of the new leader and thus may never resend their pending commands. The new leader will, however, send an applied notification when it does apply these entries; because applied notifications are always sent from the leader process, they can also be used to trigger the resending of pending commands. This commit implements that.
1 parent a51d8a5 commit d31b9aa

File tree

2 files changed

+96
-11
lines changed

2 files changed

+96
-11
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -586,34 +586,58 @@ update_machine_state(Server, Conf) ->
586586
ra_server_proc:ra_event_body(), state()) ->
587587
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
588588
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
589-
handle_ra_event(QName, From, {applied, Seqs},
590-
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
589+
handle_ra_event(QName, Leader, {applied, Seqs},
590+
#state{leader = OldLeader,
591+
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
591592

592593
{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
593-
{[], [], State0#state{leader = From}},
594+
{[], [], State0#state{leader = Leader}},
594595
Seqs),
596+
597+
%% if the leader has changed we need to resend any pending commands remaining
598+
%% after the applied processing
599+
State2 = if OldLeader =/= Leader ->
600+
%% double check before resending as applied notifications
601+
%% can arrive from old leaders in any order
602+
case ra:members(Leader) of
603+
{ok, _, ActualLeader}
604+
when ActualLeader =/= OldLeader ->
605+
%% there is a new leader
606+
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
607+
"from ~w to ~w, "
608+
"resending ~b pending commands",
609+
[?MODULE, OldLeader, ActualLeader,
610+
maps:size(State1#state.pending)]),
611+
resend_all_pending(State1#state{leader = ActualLeader});
612+
_ ->
613+
State1
614+
end;
615+
true ->
616+
State1
617+
end,
618+
595619
Actions0 = lists:reverse(ActionsRev),
596620
Actions = case Corrs of
597621
[] ->
598622
Actions0;
599623
_ ->
600-
%%TODO consider using lists:foldr/3 above because
624+
%%TODO: consider using lists:foldr/3 above because
601625
%% Corrs is returned in the wrong order here.
602626
%% The wrong order does not matter much because the channel sorts the
603627
%% sequence numbers before confirming to the client. But rabbit_fifo_client
604628
%% is sequence number agnostic: it handles any correlation terms.
605629
[{settled, QName, Corrs} | Actions0]
606630
end,
607-
case map_size(State1#state.pending) < SftLmt of
608-
true when State1#state.slow == true ->
631+
case map_size(State2#state.pending) < SftLmt of
632+
true when State2#state.slow == true ->
609633
% we have exited soft limit state
610634
% send any unsent commands and cancel the timer as
611635
% TODO: really the timer should only be cancelled when the channel
612636
% exits flow state (which depends on the state of all queues the
613637
% channel is interacting with)
614638
% but the fact the queue has just applied suggests
615639
% it's ok to cancel here anyway
616-
State2 = cancel_timer(State1#state{slow = false,
640+
State3 = cancel_timer(State2#state{slow = false,
617641
unsent_commands = #{}}),
618642
% build up a list of commands to issue
619643
Commands = maps:fold(
@@ -622,16 +646,16 @@ handle_ra_event(QName, From, {applied, Seqs},
622646
add_command(Cid, return, Returns,
623647
add_command(Cid, discard,
624648
Discards, Acc)))
625-
end, [], State1#state.unsent_commands),
626-
ServerId = pick_server(State2),
649+
end, [], State2#state.unsent_commands),
650+
ServerId = pick_server(State3),
627651
%% send all the settlements and returns
628652
State = lists:foldl(fun (C, S0) ->
629653
send_command(ServerId, undefined, C,
630654
normal, S0)
631-
end, State2, Commands),
655+
end, State3, Commands),
632656
{ok, State, [{unblock, cluster_name(State)} | Actions]};
633657
_ ->
634-
{ok, State1, Actions}
658+
{ok, State2, Actions}
635659
end;
636660
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637661
handle_delivery(QName, From, Del, State0);

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ all_tests() ->
2323
[
2424
basics,
2525
return,
26+
lost_return_is_resent_on_applied_after_leader_change,
2627
rabbit_fifo_returns_correlation,
2728
resends_lost_command,
2829
returns,
@@ -56,9 +57,11 @@ init_per_group(_, Config) ->
5657
PrivDir = ?config(priv_dir, Config),
5758
_ = application:load(ra),
5859
ok = application:set_env(ra, data_dir, PrivDir),
60+
application:ensure_all_started(logger),
5961
application:ensure_all_started(ra),
6062
application:ensure_all_started(lg),
6163
SysCfg = ra_system:default_config(),
64+
ra_env:configure_logger(logger),
6265
ra_system:start(SysCfg#{name => ?RA_SYSTEM}),
6366
Config.
6467

@@ -67,6 +70,7 @@ end_per_group(_, Config) ->
6770
Config.
6871

6972
init_per_testcase(TestCase, Config) ->
73+
ok = logger:set_primary_config(level, all),
7074
meck:new(rabbit_quorum_queue, [passthrough]),
7175
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
7276
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
@@ -162,6 +166,63 @@ return(Config) ->
162166
rabbit_quorum_queue:stop_server(ServerId),
163167
ok.
164168

169+
lost_return_is_resent_on_applied_after_leader_change(Config) ->
170+
%% this test handles a case where a combination of a lost/overwritten
171+
%% command and a leader change could result in a client never detecting
172+
%% a new leader and thus never resending whatever command was overwritten
173+
%% in the prior term. The fix is to handle leader changes when processing
174+
%% the {applied, _} ra event.
175+
ClusterName = ?config(cluster_name, Config),
176+
ServerId = ?config(node_id, Config),
177+
ServerId2 = ?config(node_id2, Config),
178+
ServerId3 = ?config(node_id3, Config),
179+
Members = [ServerId, ServerId2, ServerId3],
180+
181+
ok = meck:new(ra, [passthrough]),
182+
ok = start_cluster(ClusterName, Members),
183+
184+
{ok, _, Leader} = ra:members(ServerId),
185+
Followers = lists:delete(Leader, Members),
186+
187+
F00 = rabbit_fifo_client:init(Members),
188+
{ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
189+
F1 = F0,
190+
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
191+
{ok, _, {_, _, MsgId, _, _}, F3} =
192+
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
193+
{F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
194+
RaEvt = receive
195+
{ra_event, Leader, {applied, _} = Evt} ->
196+
Evt
197+
after 5000 ->
198+
ct:fail("no ra event")
199+
end,
200+
NextLeader = hd(Followers),
201+
timer:sleep(100),
202+
ok = ra:transfer_leadership(Leader, NextLeader),
203+
%% get rid of leader change event
204+
receive
205+
{ra_event, _, {machine, leader_change}} ->
206+
ok
207+
after 5000 ->
208+
ct:fail("no machine leader_change event")
209+
end,
210+
%% client will "send" to the old leader
211+
meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
212+
{ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
213+
?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
214+
meck:unload(ra),
215+
%% pass the ra event with the new leader as if the entry was applied
216+
%% by the new leader, not the old
217+
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
218+
RaEvt, F5),
219+
%% this should resend the never applied enqueue
220+
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
221+
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
222+
223+
flush(),
224+
ok.
225+
165226
rabbit_fifo_returns_correlation(Config) ->
166227
ClusterName = ?config(cluster_name, Config),
167228
ServerId = ?config(node_id, Config),

0 commit comments

Comments
 (0)