diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index fa9c0d99540a..0c195b04ab29 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -629,6 +629,14 @@ apply(#{machine_version := Vsn} = Meta, E#enqueuer{status = up}; (_, E) -> E end, Enqs0), + %% send leader change events to all disconnected enqueuers to prompt them + %% to resend any messages stuck during disconnection, + %% ofc it may not be a leader change per se + Effects0 = maps:fold(fun(P, _E, Acc) when node(P) =:= Node -> + [{send_msg, P, leader_change, ra_event} | Acc]; + (_, _E, Acc) -> Acc + end, Monitors, Enqs0), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), %% mark all consumers as up {State1, Effects1} = @@ -643,7 +651,7 @@ apply(#{machine_version := Vsn} = Meta, SAcc), EAcc1}; (_, _, Acc) -> Acc - end, {State0, Monitors}, Cons0, Vsn), + end, {State0, Effects0}, Cons0, Vsn), Waiting = update_waiting_consumer_status(Node, State1, up), State2 = State1#?STATE{enqueuers = Enqs1, waiting_consumers = Waiting}, diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index f2f50301da5e..462d80362e93 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg, #state{queue_status = reject_publish, cfg = #cfg{}} = State) -> {reject_publish, State}; -enqueue(QName, Correlation, Msg, +enqueue(_QName, Correlation, Msg, #state{slow = WasSlow, pending = Pending, queue_status = go, @@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg, next_seq = Seq + 1, next_enqueue_seq = EnqueueSeq + 1, slow = IsSlow}, + if IsSlow andalso not WasSlow -> - {ok, set_timer(QName, State), [{block, cluster_name(State)}]}; + {ok, State, [{block, cluster_name(State)}]}; true -> {ok, State, []} end. @@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs}, when ActualLeader =/= OldLeader -> %% there is a new leader ?LOG_DEBUG("~ts: Detected QQ leader change (applied) " - "from ~w to ~w, " - "resending ~b pending commands", - [?MODULE, OldLeader, ActualLeader, - maps:size(State1#state.pending)]), + "from ~w to ~w, " + "resending ~b pending commands", + [?MODULE, OldLeader, ActualLeader, + maps:size(State1#state.pending)]), resend_all_pending(State1#state{leader = ActualLeader}); _ -> State1 @@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change}, %% we need to update leader %% and resend any pending commands ?LOG_DEBUG("~ts: ~s Detected QQ leader change from ~w to ~w, " - "resending ~b pending commands", - [rabbit_misc:rs(QName), ?MODULE, OldLeader, - Leader, maps:size(Pending)]), + "resending ~b pending commands", + [rabbit_misc:rs(QName), ?MODULE, OldLeader, + Leader, maps:size(Pending)]), State = resend_all_pending(State0#state{leader = Leader}), {ok, State, []}; handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}}, @@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}}, #state{leader = OldLeader, pending = Pending} = State0) -> ?LOG_DEBUG("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w, " - "resending ~b pending commands", - [rabbit_misc:rs(QName), ?MODULE, OldLeader, - Leader, maps:size(Pending)]), + "resending ~b pending commands", + [rabbit_misc:rs(QName), ?MODULE, OldLeader, + Leader, maps:size(Pending)]), State = resend_all_pending(State0#state{leader = Leader}), {ok, cancel_timer(State), []}; handle_ra_event(_QName, _From, {rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {ok, State0, []}; -handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> +handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}, + leader = OldLeader, + pending = Pending} = State0) -> case find_leader(Servers) of undefined -> %% still no leader, set the timer again {ok, set_timer(QName, State0), []}; Leader -> + ?LOG_DEBUG("~ts: ~s Pending applied Timeout ~w to ~w, " + "resending ~b pending commands", + [rabbit_misc:rs(QName), ?MODULE, OldLeader, + Leader, maps:size(Pending)]), State = resend_all_pending(State0#state{leader = Leader}), {ok, State, []} end; @@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments, case now_ms() > Last + ?CACHE_SEG_TIMEOUT of true -> ?LOG_DEBUG("~ts: closing_cached_segments", - [rabbit_misc:rs(QName)]), + [rabbit_misc:rs(QName)]), %% its been long enough, evict all _ = ra_flru:evict_all(Cache), State#state{cached_segments = undefined}; @@ -804,12 +811,16 @@ seq_applied({Seq, Response}, {Corrs, Actions0, #state{} = State0}) -> %% sequences aren't guaranteed to be applied in order as enqueues are %% low priority commands and may be overtaken by others with a normal priority. + %% + %% if the response is 'not_enqueued' we need to still keep the pending + %% command for a later resend {Actions, State} = maybe_add_action(Response, Actions0, State0), case maps:take(Seq, State#state.pending) of - {{undefined, _}, Pending} -> + {{undefined, _}, Pending} + when Response =/= not_enqueued -> {Corrs, Actions, State#state{pending = Pending}}; {{Corr, _}, Pending} - when Response /= not_enqueued -> + when Response =/= not_enqueued -> {[Corr | Corrs], Actions, State#state{pending = Pending}}; _ -> {Corrs, Actions, State} diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index dbf6d8a821c6..1a3fed31227a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -121,6 +121,7 @@ groups() -> ]}, {clustered_with_partitions, [], [ + partitioned_publisher, reconnect_consumer_and_publish, reconnect_consumer_and_wait, reconnect_consumer_and_wait_channel_down @@ -285,7 +286,8 @@ end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; +init_per_testcase(Testcase, Config) when Testcase == partitioned_publisher; + Testcase == reconnect_consumer_and_publish; Testcase == reconnect_consumer_and_wait; Testcase == reconnect_consumer_and_wait_channel_down -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), @@ -383,7 +385,8 @@ merge_app_env(Config) -> {rabbit, [{core_metrics_gc_interval, 100}]}), {ra, [{min_wal_roll_over_interval, 30000}]}). -end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; +end_per_testcase(Testcase, Config) when Testcase == partitioned_publisher; + Testcase == reconnect_consumer_and_publish; Testcase == reconnect_consumer_and_wait; Testcase == reconnect_consumer_and_wait_channel_down -> Config1 = rabbit_ct_helpers:run_steps(Config, @@ -3026,6 +3029,51 @@ cleanup_data_dir(Config) -> ?awaitMatch(false, filelib:is_dir(DataDir2), 30000), ok. +partitioned_publisher(Config) -> + [Node0, Node1, Node2] = Nodes = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch0 = rabbit_ct_client_helpers:open_channel(Config, Node0), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Node1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Node1}} = ra:members({RaName, Node1}), + + #'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}), + #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}), + %% first publish with confirm + publish_confirm(Ch0, QQ), + + %% then partition + rabbit_ct_broker_helpers:block_traffic_between(Node0, Node1), + rabbit_ct_broker_helpers:block_traffic_between(Node0, Node2), + + %% check that we can still publish from another channel that is on the + %% majority side + publish_confirm(Ch1, QQ), + + %% publish one from partitioned node that will not go through + publish(Ch0, QQ), + + %% wait for disconnections + rabbit_ct_helpers:await_condition( + fun() -> + ConnectedNodes = erpc:call(Node0, erlang, nodes, []), + not lists:member(Node1, ConnectedNodes) + end, 30000), + + flush(10), + + %% then heal the partition + rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node1), + rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node2), + + publish(Ch0, QQ), + wait_for_messages_ready(Nodes, RaName, 4), + ok. + reconnect_consumer_and_publish(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index 68811230ec0c..c160bd473f5e 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -219,7 +219,9 @@ lost_return_is_resent_on_applied_after_leader_change(Config) -> RaEvt, F5), %% this should resend the never applied enqueue {_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6), - ?assertEqual(0, rabbit_fifo_client:pending_size(F7)), + {_, _, F8} = process_ra_events(receive_ra_events(1, 0), ClusterName, F7), + + ?assertEqual(0, rabbit_fifo_client:pending_size(F8)), flush(), ok.