Skip to content

Commit 2f89bd9

Browse files
authored
Merge pull request #13095 from rabbitmq/qq-resend-pending-commands-on-applied
QQ: resend pending commands when new leader detected on applied notif…
2 parents d4dbb27 + d31b9aa commit 2f89bd9

File tree

2 files changed

+96
-11
lines changed

2 files changed

+96
-11
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -586,34 +586,58 @@ update_machine_state(Server, Conf) ->
586586
ra_server_proc:ra_event_body(), state()) ->
587587
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
588588
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
589-
handle_ra_event(QName, From, {applied, Seqs},
590-
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
589+
handle_ra_event(QName, Leader, {applied, Seqs},
590+
#state{leader = OldLeader,
591+
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
591592

592593
{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
593-
{[], [], State0#state{leader = From}},
594+
{[], [], State0#state{leader = Leader}},
594595
Seqs),
596+
597+
%% if the leader has changed we need to resend any pending commands remaining
598+
%% after the applied processing
599+
State2 = if OldLeader =/= Leader ->
600+
%% double check before resending as applied notifications
601+
%% can arrive from old leaders in any order
602+
case ra:members(Leader) of
603+
{ok, _, ActualLeader}
604+
when ActualLeader =/= OldLeader ->
605+
%% there is a new leader
606+
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
607+
"from ~w to ~w, "
608+
"resending ~b pending commands",
609+
[?MODULE, OldLeader, ActualLeader,
610+
maps:size(State1#state.pending)]),
611+
resend_all_pending(State1#state{leader = ActualLeader});
612+
_ ->
613+
State1
614+
end;
615+
true ->
616+
State1
617+
end,
618+
595619
Actions0 = lists:reverse(ActionsRev),
596620
Actions = case Corrs of
597621
[] ->
598622
Actions0;
599623
_ ->
600-
%%TODO consider using lists:foldr/3 above because
624+
%%TODO: consider using lists:foldr/3 above because
601625
%% Corrs is returned in the wrong order here.
602626
%% The wrong order does not matter much because the channel sorts the
603627
%% sequence numbers before confirming to the client. But rabbit_fifo_client
604628
%% is sequence number agnostic: it handles any correlation terms.
605629
[{settled, QName, Corrs} | Actions0]
606630
end,
607-
case map_size(State1#state.pending) < SftLmt of
608-
true when State1#state.slow == true ->
631+
case map_size(State2#state.pending) < SftLmt of
632+
true when State2#state.slow == true ->
609633
% we have exited soft limit state
610634
% send any unsent commands and cancel the timer as
611635
% TODO: really the timer should only be cancelled when the channel
612636
% exits flow state (which depends on the state of all queues the
613637
% channel is interacting with)
614638
% but the fact the queue has just applied suggests
615639
% it's ok to cancel here anyway
616-
State2 = cancel_timer(State1#state{slow = false,
640+
State3 = cancel_timer(State2#state{slow = false,
617641
unsent_commands = #{}}),
618642
% build up a list of commands to issue
619643
Commands = maps:fold(
@@ -622,16 +646,16 @@ handle_ra_event(QName, From, {applied, Seqs},
622646
add_command(Cid, return, Returns,
623647
add_command(Cid, discard,
624648
Discards, Acc)))
625-
end, [], State1#state.unsent_commands),
626-
ServerId = pick_server(State2),
649+
end, [], State2#state.unsent_commands),
650+
ServerId = pick_server(State3),
627651
%% send all the settlements and returns
628652
State = lists:foldl(fun (C, S0) ->
629653
send_command(ServerId, undefined, C,
630654
normal, S0)
631-
end, State2, Commands),
655+
end, State3, Commands),
632656
{ok, State, [{unblock, cluster_name(State)} | Actions]};
633657
_ ->
634-
{ok, State1, Actions}
658+
{ok, State2, Actions}
635659
end;
636660
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637661
handle_delivery(QName, From, Del, State0);

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ all_tests() ->
2323
[
2424
basics,
2525
return,
26+
lost_return_is_resent_on_applied_after_leader_change,
2627
rabbit_fifo_returns_correlation,
2728
resends_lost_command,
2829
returns,
@@ -56,9 +57,11 @@ init_per_group(_, Config) ->
5657
PrivDir = ?config(priv_dir, Config),
5758
_ = application:load(ra),
5859
ok = application:set_env(ra, data_dir, PrivDir),
60+
application:ensure_all_started(logger),
5961
application:ensure_all_started(ra),
6062
application:ensure_all_started(lg),
6163
SysCfg = ra_system:default_config(),
64+
ra_env:configure_logger(logger),
6265
ra_system:start(SysCfg#{name => ?RA_SYSTEM}),
6366
Config.
6467

@@ -67,6 +70,7 @@ end_per_group(_, Config) ->
6770
Config.
6871

6972
init_per_testcase(TestCase, Config) ->
73+
ok = logger:set_primary_config(level, all),
7074
meck:new(rabbit_quorum_queue, [passthrough]),
7175
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
7276
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
@@ -162,6 +166,63 @@ return(Config) ->
162166
rabbit_quorum_queue:stop_server(ServerId),
163167
ok.
164168

169+
lost_return_is_resent_on_applied_after_leader_change(Config) ->
170+
%% this test handles a case where a combination of a lost/overwritten
171+
%% command and a leader change could result in a client never detecting
172+
%% a new leader and thus never resending whatever command was overwritten
173+
%% in the prior term. The fix is to handle leader changes when processing
174+
%% the {applied, _} ra event.
175+
ClusterName = ?config(cluster_name, Config),
176+
ServerId = ?config(node_id, Config),
177+
ServerId2 = ?config(node_id2, Config),
178+
ServerId3 = ?config(node_id3, Config),
179+
Members = [ServerId, ServerId2, ServerId3],
180+
181+
ok = meck:new(ra, [passthrough]),
182+
ok = start_cluster(ClusterName, Members),
183+
184+
{ok, _, Leader} = ra:members(ServerId),
185+
Followers = lists:delete(Leader, Members),
186+
187+
F00 = rabbit_fifo_client:init(Members),
188+
{ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
189+
F1 = F0,
190+
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
191+
{ok, _, {_, _, MsgId, _, _}, F3} =
192+
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
193+
{F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
194+
RaEvt = receive
195+
{ra_event, Leader, {applied, _} = Evt} ->
196+
Evt
197+
after 5000 ->
198+
ct:fail("no ra event")
199+
end,
200+
NextLeader = hd(Followers),
201+
timer:sleep(100),
202+
ok = ra:transfer_leadership(Leader, NextLeader),
203+
%% get rid of leader change event
204+
receive
205+
{ra_event, _, {machine, leader_change}} ->
206+
ok
207+
after 5000 ->
208+
ct:fail("no machine leader_change event")
209+
end,
210+
%% client will "send" to the old leader
211+
meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
212+
{ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
213+
?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
214+
meck:unload(ra),
215+
%% pass the ra event with the new leader as if the entry was applied
216+
%% by the new leader, not the old
217+
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
218+
RaEvt, F5),
219+
%% this should resend the never applied enqueue
220+
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
221+
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
222+
223+
flush(),
224+
ok.
225+
165226
rabbit_fifo_returns_correlation(Config) ->
166227
ClusterName = ?config(cluster_name, Config),
167228
ServerId = ?config(node_id, Config),

0 commit comments

Comments
 (0)