Skip to content

Commit 2f47fcb

Browse files
Merge pull request #14621 from rabbitmq/mergify/bp/v4.1.x/pr-14605
Quorum queues: fix resend issues after network partition (backport #14589) (backport #14605)
2 parents e2e9288 + 408fe4c commit 2f47fcb

File tree

4 files changed

+89
-20
lines changed

4 files changed

+89
-20
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,14 @@ apply(#{machine_version := Vsn} = Meta,
629629
E#enqueuer{status = up};
630630
(_, E) -> E
631631
end, Enqs0),
632+
%% send leader change events to all disconnected enqueuers to prompt them
633+
%% to resend any messages stuck during disconnection,
634+
%% ofc it may not be a leader change per se
635+
Effects0 = maps:fold(fun(P, _E, Acc) when node(P) =:= Node ->
636+
[{send_msg, P, leader_change, ra_event} | Acc];
637+
(_, _E, Acc) -> Acc
638+
end, Monitors, Enqs0),
639+
632640
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
633641
%% mark all consumers as up
634642
{State1, Effects1} =
@@ -643,7 +651,7 @@ apply(#{machine_version := Vsn} = Meta,
643651
SAcc), EAcc1};
644652
(_, _, Acc) ->
645653
Acc
646-
end, {State0, Monitors}, Cons0, Vsn),
654+
end, {State0, Effects0}, Cons0, Vsn),
647655
Waiting = update_waiting_consumer_status(Node, State1, up),
648656
State2 = State1#?STATE{enqueuers = Enqs1,
649657
waiting_consumers = Waiting},

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg,
160160
#state{queue_status = reject_publish,
161161
cfg = #cfg{}} = State) ->
162162
{reject_publish, State};
163-
enqueue(QName, Correlation, Msg,
163+
enqueue(_QName, Correlation, Msg,
164164
#state{slow = WasSlow,
165165
pending = Pending,
166166
queue_status = go,
@@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg,
176176
next_seq = Seq + 1,
177177
next_enqueue_seq = EnqueueSeq + 1,
178178
slow = IsSlow},
179+
179180
if IsSlow andalso not WasSlow ->
180-
{ok, set_timer(QName, State), [{block, cluster_name(State)}]};
181+
{ok, State, [{block, cluster_name(State)}]};
181182
true ->
182183
{ok, State, []}
183184
end.
@@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs},
632633
when ActualLeader =/= OldLeader ->
633634
%% there is a new leader
634635
?LOG_DEBUG("~ts: Detected QQ leader change (applied) "
635-
"from ~w to ~w, "
636-
"resending ~b pending commands",
637-
[?MODULE, OldLeader, ActualLeader,
638-
maps:size(State1#state.pending)]),
636+
"from ~w to ~w, "
637+
"resending ~b pending commands",
638+
[?MODULE, OldLeader, ActualLeader,
639+
maps:size(State1#state.pending)]),
639640
resend_all_pending(State1#state{leader = ActualLeader});
640641
_ ->
641642
State1
@@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change},
702703
%% we need to update leader
703704
%% and resend any pending commands
704705
?LOG_DEBUG("~ts: ~s Detected QQ leader change from ~w to ~w, "
705-
"resending ~b pending commands",
706-
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
707-
Leader, maps:size(Pending)]),
706+
"resending ~b pending commands",
707+
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
708+
Leader, maps:size(Pending)]),
708709
State = resend_all_pending(State0#state{leader = Leader}),
709710
{ok, State, []};
710711
handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}},
@@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
714715
#state{leader = OldLeader,
715716
pending = Pending} = State0) ->
716717
?LOG_DEBUG("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w, "
717-
"resending ~b pending commands",
718-
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
719-
Leader, maps:size(Pending)]),
718+
"resending ~b pending commands",
719+
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
720+
Leader, maps:size(Pending)]),
720721
State = resend_all_pending(State0#state{leader = Leader}),
721722
{ok, cancel_timer(State), []};
722723
handle_ra_event(_QName, _From,
723724
{rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
724725
% TODO: how should these be handled? re-sent on timer or try random
725726
{ok, State0, []};
726-
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
727+
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers},
728+
leader = OldLeader,
729+
pending = Pending} = State0) ->
727730
case find_leader(Servers) of
728731
undefined ->
729732
%% still no leader, set the timer again
730733
{ok, set_timer(QName, State0), []};
731734
Leader ->
735+
?LOG_DEBUG("~ts: ~s Pending applied Timeout ~w to ~w, "
736+
"resending ~b pending commands",
737+
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
738+
Leader, maps:size(Pending)]),
732739
State = resend_all_pending(State0#state{leader = Leader}),
733740
{ok, State, []}
734741
end;
@@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments,
743750
case now_ms() > Last + ?CACHE_SEG_TIMEOUT of
744751
true ->
745752
?LOG_DEBUG("~ts: closing_cached_segments",
746-
[rabbit_misc:rs(QName)]),
753+
[rabbit_misc:rs(QName)]),
747754
%% its been long enough, evict all
748755
_ = ra_flru:evict_all(Cache),
749756
State#state{cached_segments = undefined};
@@ -804,12 +811,16 @@ seq_applied({Seq, Response},
804811
{Corrs, Actions0, #state{} = State0}) ->
805812
%% sequences aren't guaranteed to be applied in order as enqueues are
806813
%% low priority commands and may be overtaken by others with a normal priority.
814+
%%
815+
%% if the response is 'not_enqueued' we need to still keep the pending
816+
%% command for a later resend
807817
{Actions, State} = maybe_add_action(Response, Actions0, State0),
808818
case maps:take(Seq, State#state.pending) of
809-
{{undefined, _}, Pending} ->
819+
{{undefined, _}, Pending}
820+
when Response =/= not_enqueued ->
810821
{Corrs, Actions, State#state{pending = Pending}};
811822
{{Corr, _}, Pending}
812-
when Response /= not_enqueued ->
823+
when Response =/= not_enqueued ->
813824
{[Corr | Corrs], Actions, State#state{pending = Pending}};
814825
_ ->
815826
{Corrs, Actions, State}

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ groups() ->
121121
]},
122122
{clustered_with_partitions, [],
123123
[
124+
partitioned_publisher,
124125
reconnect_consumer_and_publish,
125126
reconnect_consumer_and_wait,
126127
reconnect_consumer_and_wait_channel_down
@@ -285,7 +286,8 @@ end_per_group(_, Config) ->
285286
rabbit_ct_helpers:run_steps(Config,
286287
rabbit_ct_broker_helpers:teardown_steps()).
287288

288-
init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
289+
init_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
290+
Testcase == reconnect_consumer_and_publish;
289291
Testcase == reconnect_consumer_and_wait;
290292
Testcase == reconnect_consumer_and_wait_channel_down ->
291293
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
@@ -377,7 +379,8 @@ merge_app_env(Config) ->
377379
{rabbit, [{core_metrics_gc_interval, 100}]}),
378380
{ra, [{min_wal_roll_over_interval, 30000}]}).
379381

380-
end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
382+
end_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
383+
Testcase == reconnect_consumer_and_publish;
381384
Testcase == reconnect_consumer_and_wait;
382385
Testcase == reconnect_consumer_and_wait_channel_down ->
383386
Config1 = rabbit_ct_helpers:run_steps(Config,
@@ -3011,6 +3014,51 @@ cleanup_data_dir(Config) ->
30113014
?awaitMatch(false, filelib:is_dir(DataDir2), 30000),
30123015
ok.
30133016

3017+
partitioned_publisher(Config) ->
3018+
[Node0, Node1, Node2] = Nodes =
3019+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
3020+
3021+
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Node0),
3022+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Node1),
3023+
QQ = ?config(queue_name, Config),
3024+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
3025+
declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
3026+
RaName = ra_name(QQ),
3027+
{ok, _, {_, Node1}} = ra:members({RaName, Node1}),
3028+
3029+
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
3030+
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
3031+
%% first publish with confirm
3032+
publish_confirm(Ch0, QQ),
3033+
3034+
%% then partition
3035+
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node1),
3036+
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node2),
3037+
3038+
%% check that we can still publish from another channel that is on the
3039+
%% majority side
3040+
publish_confirm(Ch1, QQ),
3041+
3042+
%% publish one from partitioned node that will not go through
3043+
publish(Ch0, QQ),
3044+
3045+
%% wait for disconnections
3046+
rabbit_ct_helpers:await_condition(
3047+
fun() ->
3048+
ConnectedNodes = erpc:call(Node0, erlang, nodes, []),
3049+
not lists:member(Node1, ConnectedNodes)
3050+
end, 30000),
3051+
3052+
flush(10),
3053+
3054+
%% then heal the partition
3055+
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node1),
3056+
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node2),
3057+
3058+
publish(Ch0, QQ),
3059+
wait_for_messages_ready(Nodes, RaName, 4),
3060+
ok.
3061+
30143062
reconnect_consumer_and_publish(Config) ->
30153063
[Server | _] = Servers =
30163064
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ lost_return_is_resent_on_applied_after_leader_change(Config) ->
219219
RaEvt, F5),
220220
%% this should resend the never applied enqueue
221221
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
222-
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
222+
{_, _, F8} = process_ra_events(receive_ra_events(1, 0), ClusterName, F7),
223+
224+
?assertEqual(0, rabbit_fifo_client:pending_size(F8)),
223225

224226
flush(),
225227
ok.

0 commit comments

Comments
 (0)