Skip to content

Commit 4aba0d2

Browse files
committed
QQ: resend pending commands when new leader detected on applied notification.
When a leader changes, all enqueuer and consumer processes are notified from the `state_enter(leader, ...)` callback. However, a new leader may not yet have applied all commands that the old leader had. If any of those commands is a checkout or a register_enqueuer command, these processes will not be notified of the new leader and thus may never resend their pending commands. The new leader will, however, send an applied notification when it does apply these entries; these notifications are always sent from the leader process, so they can also be used to trigger pending resends. This commit implements that.
1 parent a8d848f commit 4aba0d2

File tree

1 file changed

+26
-11
lines changed

1 file changed

+26
-11
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -586,34 +586,49 @@ update_machine_state(Server, Conf) ->
586586
ra_server_proc:ra_event_body(), state()) ->
587587
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
588588
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
589-
handle_ra_event(QName, From, {applied, Seqs},
590-
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
589+
handle_ra_event(QName, Leader, {applied, Seqs},
590+
#state{leader = OldLeader,
591+
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
591592

592593
{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
593-
{[], [], State0#state{leader = From}},
594+
{[], [], State0#state{leader = Leader}},
594595
Seqs),
596+
597+
%% if the leader has changed we need to resend any pending commands remaining
598+
%% after the applied processing
599+
State2 = if OldLeader =/= Leader ->
600+
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
601+
"from ~w to ~w, "
602+
"resending ~b pending commands",
603+
[?MODULE, OldLeader, Leader,
604+
maps:size(State1#state.pending)]),
605+
resend_all_pending(State1);
606+
true ->
607+
State1
608+
end,
609+
595610
Actions0 = lists:reverse(ActionsRev),
596611
Actions = case Corrs of
597612
[] ->
598613
Actions0;
599614
_ ->
600-
%%TODO consider using lists:foldr/3 above because
615+
%%TODO: consider using lists:foldr/3 above because
601616
%% Corrs is returned in the wrong order here.
602617
%% The wrong order does not matter much because the channel sorts the
603618
%% sequence numbers before confirming to the client. But rabbit_fifo_client
604619
%% is sequence number agnostic: it handles any correlation terms.
605620
[{settled, QName, Corrs} | Actions0]
606621
end,
607-
case map_size(State1#state.pending) < SftLmt of
608-
true when State1#state.slow == true ->
622+
case map_size(State2#state.pending) < SftLmt of
623+
true when State2#state.slow == true ->
609624
% we have exited soft limit state
610625
% send any unsent commands and cancel the timer as
611626
% TODO: really the timer should only be cancelled when the channel
612627
% exits flow state (which depends on the state of all queues the
613628
% channel is interacting with)
614629
% but the fact the queue has just applied suggests
615630
% it's ok to cancel here anyway
616-
State2 = cancel_timer(State1#state{slow = false,
631+
State3 = cancel_timer(State2#state{slow = false,
617632
unsent_commands = #{}}),
618633
% build up a list of commands to issue
619634
Commands = maps:fold(
@@ -622,16 +637,16 @@ handle_ra_event(QName, From, {applied, Seqs},
622637
add_command(Cid, return, Returns,
623638
add_command(Cid, discard,
624639
Discards, Acc)))
625-
end, [], State1#state.unsent_commands),
626-
ServerId = pick_server(State2),
640+
end, [], State2#state.unsent_commands),
641+
ServerId = pick_server(State3),
627642
%% send all the settlements and returns
628643
State = lists:foldl(fun (C, S0) ->
629644
send_command(ServerId, undefined, C,
630645
normal, S0)
631-
end, State2, Commands),
646+
end, State3, Commands),
632647
{ok, State, [{unblock, cluster_name(State)} | Actions]};
633648
_ ->
634-
{ok, State1, Actions}
649+
{ok, State2, Actions}
635650
end;
636651
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637652
handle_delivery(QName, From, Del, State0);

0 commit comments

Comments
 (0)