Skip to content

Commit db98d6e

Browse files
kjnilssonmergify[bot]
authored andcommitted
QQ: fix resend issues after network partition.
A queue client that send a message during a network partition that later caused a distribution disconnection would in some cases never resend the lost message, even if kept in the pending buffer. Subsequent sends would be accepted by the state machine but would never be enqueued as there would be a missing sequence. In the case of publishers that use pre-settled sends the pending messages would have also been incorrectly removed from the pending map. To fix we removed timer resend aapproach and instead have the leader send leader_change messages on node up to prompt any queue clients to resend their pending buffer. (cherry picked from commit b66dc40)
1 parent d6755e4 commit db98d6e

File tree

4 files changed

+89
-20
lines changed

4 files changed

+89
-20
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,14 @@ apply(#{machine_version := Vsn} = Meta,
629629
E#enqueuer{status = up};
630630
(_, E) -> E
631631
end, Enqs0),
632+
%% send leader change events to all disconnected enqueuers to prompt them
633+
%% to resend any messages stuck during disconnection,
634+
%% ofc it may not be a leader change per se
635+
Effects0 = maps:fold(fun(P, _E, Acc) when node(P) =:= Node ->
636+
[{send_msg, P, leader_change, ra_event} | Acc];
637+
(_, _E, Acc) -> Acc
638+
end, Monitors, Enqs0),
639+
632640
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
633641
%% mark all consumers as up
634642
{State1, Effects1} =
@@ -643,7 +651,7 @@ apply(#{machine_version := Vsn} = Meta,
643651
SAcc), EAcc1};
644652
(_, _, Acc) ->
645653
Acc
646-
end, {State0, Monitors}, Cons0, Vsn),
654+
end, {State0, Effects0}, Cons0, Vsn),
647655
Waiting = update_waiting_consumer_status(Node, State1, up),
648656
State2 = State1#?STATE{enqueuers = Enqs1,
649657
waiting_consumers = Waiting},

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg,
160160
#state{queue_status = reject_publish,
161161
cfg = #cfg{}} = State) ->
162162
{reject_publish, State};
163-
enqueue(QName, Correlation, Msg,
163+
enqueue(_QName, Correlation, Msg,
164164
#state{slow = WasSlow,
165165
pending = Pending,
166166
queue_status = go,
@@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg,
176176
next_seq = Seq + 1,
177177
next_enqueue_seq = EnqueueSeq + 1,
178178
slow = IsSlow},
179+
179180
if IsSlow andalso not WasSlow ->
180-
{ok, set_timer(QName, State), [{block, cluster_name(State)}]};
181+
{ok, State, [{block, cluster_name(State)}]};
181182
true ->
182183
{ok, State, []}
183184
end.
@@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs},
632633
when ActualLeader =/= OldLeader ->
633634
%% there is a new leader
634635
?LOG_DEBUG("~ts: Detected QQ leader change (applied) "
635-
"from ~w to ~w, "
636-
"resending ~b pending commands",
637-
[?MODULE, OldLeader, ActualLeader,
638-
maps:size(State1#state.pending)]),
636+
"from ~w to ~w, "
637+
"resending ~b pending commands",
638+
[?MODULE, OldLeader, ActualLeader,
639+
maps:size(State1#state.pending)]),
639640
resend_all_pending(State1#state{leader = ActualLeader});
640641
_ ->
641642
State1
@@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change},
702703
%% we need to update leader
703704
%% and resend any pending commands
704705
?LOG_DEBUG("~ts: ~s Detected QQ leader change from ~w to ~w, "
705-
"resending ~b pending commands",
706-
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
707-
Leader, maps:size(Pending)]),
706+
"resending ~b pending commands",
707+
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
708+
Leader, maps:size(Pending)]),
708709
State = resend_all_pending(State0#state{leader = Leader}),
709710
{ok, State, []};
710711
handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}},
@@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
714715
#state{leader = OldLeader,
715716
pending = Pending} = State0) ->
716717
?LOG_DEBUG("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w, "
717-
"resending ~b pending commands",
718-
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
719-
Leader, maps:size(Pending)]),
718+
"resending ~b pending commands",
719+
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
720+
Leader, maps:size(Pending)]),
720721
State = resend_all_pending(State0#state{leader = Leader}),
721722
{ok, cancel_timer(State), []};
722723
handle_ra_event(_QName, _From,
723724
{rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
724725
% TODO: how should these be handled? re-sent on timer or try random
725726
{ok, State0, []};
726-
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
727+
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers},
728+
leader = OldLeader,
729+
pending = Pending} = State0) ->
727730
case find_leader(Servers) of
728731
undefined ->
729732
%% still no leader, set the timer again
730733
{ok, set_timer(QName, State0), []};
731734
Leader ->
735+
?LOG_DEBUG("~ts: ~s Pending applied Timeout ~w to ~w, "
736+
"resending ~b pending commands",
737+
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
738+
Leader, maps:size(Pending)]),
732739
State = resend_all_pending(State0#state{leader = Leader}),
733740
{ok, State, []}
734741
end;
@@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments,
743750
case now_ms() > Last + ?CACHE_SEG_TIMEOUT of
744751
true ->
745752
?LOG_DEBUG("~ts: closing_cached_segments",
746-
[rabbit_misc:rs(QName)]),
753+
[rabbit_misc:rs(QName)]),
747754
%% its been long enough, evict all
748755
_ = ra_flru:evict_all(Cache),
749756
State#state{cached_segments = undefined};
@@ -804,12 +811,16 @@ seq_applied({Seq, Response},
804811
{Corrs, Actions0, #state{} = State0}) ->
805812
%% sequences aren't guaranteed to be applied in order as enqueues are
806813
%% low priority commands and may be overtaken by others with a normal priority.
814+
%%
815+
%% if the response is 'not_enqueued' we need to still keep the pending
816+
%% command for a later resend
807817
{Actions, State} = maybe_add_action(Response, Actions0, State0),
808818
case maps:take(Seq, State#state.pending) of
809-
{{undefined, _}, Pending} ->
819+
{{undefined, _}, Pending}
820+
when Response =/= not_enqueued ->
810821
{Corrs, Actions, State#state{pending = Pending}};
811822
{{Corr, _}, Pending}
812-
when Response /= not_enqueued ->
823+
when Response =/= not_enqueued ->
813824
{[Corr | Corrs], Actions, State#state{pending = Pending}};
814825
_ ->
815826
{Corrs, Actions, State}

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ groups() ->
121121
]},
122122
{clustered_with_partitions, [],
123123
[
124+
partitioned_publisher,
124125
reconnect_consumer_and_publish,
125126
reconnect_consumer_and_wait,
126127
reconnect_consumer_and_wait_channel_down
@@ -285,7 +286,8 @@ end_per_group(_, Config) ->
285286
rabbit_ct_helpers:run_steps(Config,
286287
rabbit_ct_broker_helpers:teardown_steps()).
287288

288-
init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
289+
init_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
290+
Testcase == reconnect_consumer_and_publish;
289291
Testcase == reconnect_consumer_and_wait;
290292
Testcase == reconnect_consumer_and_wait_channel_down ->
291293
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
@@ -383,7 +385,8 @@ merge_app_env(Config) ->
383385
{rabbit, [{core_metrics_gc_interval, 100}]}),
384386
{ra, [{min_wal_roll_over_interval, 30000}]}).
385387

386-
end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
388+
end_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
389+
Testcase == reconnect_consumer_and_publish;
387390
Testcase == reconnect_consumer_and_wait;
388391
Testcase == reconnect_consumer_and_wait_channel_down ->
389392
Config1 = rabbit_ct_helpers:run_steps(Config,
@@ -3026,6 +3029,51 @@ cleanup_data_dir(Config) ->
30263029
?awaitMatch(false, filelib:is_dir(DataDir2), 30000),
30273030
ok.
30283031

3032+
partitioned_publisher(Config) ->
3033+
[Node0, Node1, Node2] = Nodes =
3034+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
3035+
3036+
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Node0),
3037+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Node1),
3038+
QQ = ?config(queue_name, Config),
3039+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
3040+
declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
3041+
RaName = ra_name(QQ),
3042+
{ok, _, {_, Node1}} = ra:members({RaName, Node1}),
3043+
3044+
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
3045+
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
3046+
%% first publish with confirm
3047+
publish_confirm(Ch0, QQ),
3048+
3049+
%% then partition
3050+
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node1),
3051+
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node2),
3052+
3053+
%% check that we can still publish from another channel that is on the
3054+
%% majority side
3055+
publish_confirm(Ch1, QQ),
3056+
3057+
%% publish one from partitioned node that will not go through
3058+
publish(Ch0, QQ),
3059+
3060+
%% wait for disconnections
3061+
rabbit_ct_helpers:await_condition(
3062+
fun() ->
3063+
ConnectedNodes = erpc:call(Node0, erlang, nodes, []),
3064+
not lists:member(Node1, ConnectedNodes)
3065+
end, 30000),
3066+
3067+
flush(10),
3068+
3069+
%% then heal the partition
3070+
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node1),
3071+
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node2),
3072+
3073+
publish(Ch0, QQ),
3074+
wait_for_messages_ready(Nodes, RaName, 4),
3075+
ok.
3076+
30293077
reconnect_consumer_and_publish(Config) ->
30303078
[Server | _] = Servers =
30313079
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ lost_return_is_resent_on_applied_after_leader_change(Config) ->
219219
RaEvt, F5),
220220
%% this should resend the never applied enqueue
221221
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
222-
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
222+
{_, _, F8} = process_ra_events(receive_ra_events(1, 0), ClusterName, F7),
223+
224+
?assertEqual(0, rabbit_fifo_client:pending_size(F8)),
223225

224226
flush(),
225227
ok.

0 commit comments

Comments
 (0)