diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 28fb854fde83..68972b6b4880 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -586,26 +586,56 @@ update_machine_state(Server, Conf) ->
                       ra_server_proc:ra_event_body(), state()) ->
     {internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
     {rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
-handle_ra_event(QName, From, {applied, Seqs},
-                #state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
+handle_ra_event(QName, Leader, {applied, Seqs},
+                #state{leader = OldLeader,
+                       cfg = #cfg{soft_limit = SftLmt}} = State0) ->
 
     {Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
-                                              {[], [], State0#state{leader = From}},
+                                              {[], [], State0#state{leader = Leader}},
                                               Seqs),
+
+    %% if the leader has changed we need to resend any pending commands remaining
+    %% after the applied processing
+    State2 = if OldLeader =/= Leader ->
+                    %% double check before resending as applied notifications
+                    %% can arrive from old leaders in any order
+                    case ra:members(Leader) of
+                        {ok, _, ActualLeader}
+                          when ActualLeader =/= OldLeader ->
+                            %% there is a new leader
+                            rabbit_log:debug("~ts: Detected QQ leader change (applied) "
+                                             "from ~w to ~w, "
+                                             "resending ~b pending commands",
+                                             [?MODULE, OldLeader, ActualLeader,
+                                              maps:size(State1#state.pending)]),
+                            resend_all_pending(State1#state{leader = ActualLeader});
+                        {ok, _, OldLeader} when OldLeader =/= undefined ->
+                            %% the event came from a former leader while the
+                            %% previously recorded leader is still current:
+                            %% keep it, otherwise the next applied event from
+                            %% the real leader would trigger a needless resend
+                            State1#state{leader = OldLeader};
+                        _ ->
+                            State1
+                    end;
+                true ->
+                    State1
+             end,
+
     Actions0 = lists:reverse(ActionsRev),
     Actions = case Corrs of
                   [] ->
                       Actions0;
                   _ ->
-                      %%TODO consider using lists:foldr/3 above because
+                      %%TODO: consider using lists:foldr/3 above because
                       %% Corrs is returned in the wrong order here.
                       %% The wrong order does not matter much because the channel sorts the
                       %% sequence numbers before confirming to the client. But rabbit_fifo_client
                       %% is sequence numer agnostic: it handles any correlation terms.
                       [{settled, QName, Corrs} | Actions0]
               end,
-    case map_size(State1#state.pending) < SftLmt of
-        true when State1#state.slow == true ->
+    case map_size(State2#state.pending) < SftLmt of
+        true when State2#state.slow == true ->
             % we have exited soft limit state
             % send any unsent commands and cancel the time as
             % TODO: really the timer should only be cancelled when the channel
@@ -613,7 +643,7 @@ handle_ra_event(QName, From, {applied, Seqs},
             % channel is interacting with)
             % but the fact the queue has just applied suggests
             % it's ok to cancel here anyway
-            State2 = cancel_timer(State1#state{slow = false,
+            State3 = cancel_timer(State2#state{slow = false,
                                                unsent_commands = #{}}),
             % build up a list of commands to issue
             Commands = maps:fold(
@@ -622,16 +652,16 @@ handle_ra_event(QName, From, {applied, Seqs},
                                              add_command(Cid, return, Returns,
                                                          add_command(Cid, discard,
                                                                      Discards, Acc)))
-                         end, [], State1#state.unsent_commands),
-            ServerId = pick_server(State2),
+                         end, [], State2#state.unsent_commands),
+            ServerId = pick_server(State3),
             %% send all the settlements and returns
             State = lists:foldl(fun (C, S0) ->
                                         send_command(ServerId, undefined, C,
                                                      normal, S0)
-                                end, State2, Commands),
+                                end, State3, Commands),
             {ok, State, [{unblock, cluster_name(State)} | Actions]};
         _ ->
-            {ok, State1, Actions}
+            {ok, State2, Actions}
     end;
 handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
     handle_delivery(QName, From, Del, State0);
diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
index 202a729b6447..21836792831c 100644
--- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
@@ -23,6 +23,7 @@ all_tests() ->
     [
      basics,
      return,
+     lost_return_is_resent_on_applied_after_leader_change,
      rabbit_fifo_returns_correlation,
      resends_lost_command,
      returns,
@@ -56,9 +57,11 @@ init_per_group(_, Config) ->
     PrivDir = ?config(priv_dir, Config),
     _ = application:load(ra),
     ok = application:set_env(ra, data_dir, PrivDir),
+    application:ensure_all_started(logger),
     application:ensure_all_started(ra),
     application:ensure_all_started(lg),
     SysCfg = ra_system:default_config(),
+    ra_env:configure_logger(logger),
     ra_system:start(SysCfg#{name => ?RA_SYSTEM}),
     Config.
 
@@ -67,6 +70,7 @@ end_per_group(_, Config) ->
     Config.
 
 init_per_testcase(TestCase, Config) ->
+    ok = logger:set_primary_config(level, all),
     meck:new(rabbit_quorum_queue, [passthrough]),
     meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
     meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
@@ -162,6 +166,63 @@ return(Config) ->
     rabbit_quorum_queue:stop_server(ServerId),
     ok.
 
+lost_return_is_resent_on_applied_after_leader_change(Config) ->
+    %% this test handles a case where a combination of a lost/overwritten
+    %% command and a leader change could result in a client never detecting
+    %% a new leader and thus never resends whatever command was overwritten
+    %% in the prior term. The fix is to handle leader changes when processing
+    %% the {applied, _} ra event.
+    ClusterName = ?config(cluster_name, Config),
+    ServerId = ?config(node_id, Config),
+    ServerId2 = ?config(node_id2, Config),
+    ServerId3 = ?config(node_id3, Config),
+    Members = [ServerId, ServerId2, ServerId3],
+
+    ok = meck:new(ra, [passthrough]),
+    ok = start_cluster(ClusterName, Members),
+
+    {ok, _, Leader} = ra:members(ServerId),
+    Followers = lists:delete(Leader, Members),
+
+    F00 = rabbit_fifo_client:init(Members),
+    {ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
+    F1 = F0,
+    {_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
+    {ok, _, {_, _, MsgId, _, _}, F3} =
+        rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
+    {F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
+    RaEvt = receive
+                {ra_event, Leader, {applied, _} = Evt} ->
+                    Evt
+            after 5000 ->
+                      ct:fail("no ra event")
+            end,
+    NextLeader = hd(Followers),
+    timer:sleep(100),
+    ok = ra:transfer_leadership(Leader, NextLeader),
+    %% get rid of leader change event
+    receive
+        {ra_event, _, {machine, leader_change}} ->
+            ok
+    after 5000 ->
+              ct:fail("no machine leader_change event")
+    end,
+    %% client will "send" to the old leader
+    meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
+    {ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
+    ?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
+    meck:unload(ra),
+    %% pass the ra event with the new leader as if the entry was applied
+    %% by the new leader, not the old
+    {ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
+                                                     RaEvt, F5),
+    %% this should resend the never applied enqueue
+    {_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
+    ?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
+
+    flush(),
+    ok.
+
 rabbit_fifo_returns_correlation(Config) ->
     ClusterName = ?config(cluster_name, Config),
     ServerId = ?config(node_id, Config),