Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,14 @@ apply(#{machine_version := Vsn} = Meta,
E#enqueuer{status = up};
(_, E) -> E
end, Enqs0),
%% send leader change events to all disconnected enqueuers to prompt them
%% to resend any messages stuck during disconnection,
%% ofc it may not be a leader change per se
Effects0 = maps:fold(fun(P, _E, Acc) when node(P) =:= Node ->
[{send_msg, P, leader_change, ra_event} | Acc];
(_, _E, Acc) -> Acc
end, Monitors, Enqs0),

ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
%% mark all consumers as up
{State1, Effects1} =
Expand All @@ -643,7 +651,7 @@ apply(#{machine_version := Vsn} = Meta,
SAcc), EAcc1};
(_, _, Acc) ->
Acc
end, {State0, Monitors}, Cons0, Vsn),
end, {State0, Effects0}, Cons0, Vsn),
Waiting = update_waiting_consumer_status(Node, State1, up),
State2 = State1#?STATE{enqueuers = Enqs1,
waiting_consumers = Waiting},
Expand Down
43 changes: 27 additions & 16 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg,
#state{queue_status = reject_publish,
cfg = #cfg{}} = State) ->
{reject_publish, State};
enqueue(QName, Correlation, Msg,
enqueue(_QName, Correlation, Msg,
#state{slow = WasSlow,
pending = Pending,
queue_status = go,
Expand All @@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg,
next_seq = Seq + 1,
next_enqueue_seq = EnqueueSeq + 1,
slow = IsSlow},

if IsSlow andalso not WasSlow ->
{ok, set_timer(QName, State), [{block, cluster_name(State)}]};
{ok, State, [{block, cluster_name(State)}]};
true ->
{ok, State, []}
end.
Expand Down Expand Up @@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs},
when ActualLeader =/= OldLeader ->
%% there is a new leader
?LOG_DEBUG("~ts: Detected QQ leader change (applied) "
"from ~w to ~w, "
"resending ~b pending commands",
[?MODULE, OldLeader, ActualLeader,
maps:size(State1#state.pending)]),
"from ~w to ~w, "
"resending ~b pending commands",
[?MODULE, OldLeader, ActualLeader,
maps:size(State1#state.pending)]),
resend_all_pending(State1#state{leader = ActualLeader});
_ ->
State1
Expand Down Expand Up @@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change},
%% we need to update leader
%% and resend any pending commands
?LOG_DEBUG("~ts: ~s Detected QQ leader change from ~w to ~w, "
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []};
handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}},
Expand All @@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
#state{leader = OldLeader,
pending = Pending} = State0) ->
?LOG_DEBUG("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w, "
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, cancel_timer(State), []};
handle_ra_event(_QName, _From,
{rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{ok, State0, []};
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers},
leader = OldLeader,
pending = Pending} = State0) ->
case find_leader(Servers) of
undefined ->
%% still no leader, set the timer again
{ok, set_timer(QName, State0), []};
Leader ->
?LOG_DEBUG("~ts: ~s Pending applied Timeout ~w to ~w, "
"resending ~b pending commands",
[rabbit_misc:rs(QName), ?MODULE, OldLeader,
Leader, maps:size(Pending)]),
State = resend_all_pending(State0#state{leader = Leader}),
{ok, State, []}
end;
Expand All @@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments,
case now_ms() > Last + ?CACHE_SEG_TIMEOUT of
true ->
?LOG_DEBUG("~ts: closing_cached_segments",
[rabbit_misc:rs(QName)]),
[rabbit_misc:rs(QName)]),
%% its been long enough, evict all
_ = ra_flru:evict_all(Cache),
State#state{cached_segments = undefined};
Expand Down Expand Up @@ -804,12 +811,16 @@ seq_applied({Seq, Response},
{Corrs, Actions0, #state{} = State0}) ->
%% sequences aren't guaranteed to be applied in order as enqueues are
%% low priority commands and may be overtaken by others with a normal priority.
%%
%% if the response is 'not_enqueued' we need to still keep the pending
%% command for a later resend
{Actions, State} = maybe_add_action(Response, Actions0, State0),
case maps:take(Seq, State#state.pending) of
{{undefined, _}, Pending} ->
{{undefined, _}, Pending}
when Response =/= not_enqueued ->
{Corrs, Actions, State#state{pending = Pending}};
{{Corr, _}, Pending}
when Response /= not_enqueued ->
when Response =/= not_enqueued ->
{[Corr | Corrs], Actions, State#state{pending = Pending}};
_ ->
{Corrs, Actions, State}
Expand Down
52 changes: 50 additions & 2 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ groups() ->
]},
{clustered_with_partitions, [],
[
partitioned_publisher,
reconnect_consumer_and_publish,
reconnect_consumer_and_wait,
reconnect_consumer_and_wait_channel_down
Expand Down Expand Up @@ -285,7 +286,8 @@ end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
init_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
Testcase == reconnect_consumer_and_publish;
Testcase == reconnect_consumer_and_wait;
Testcase == reconnect_consumer_and_wait_channel_down ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
Expand Down Expand Up @@ -383,7 +385,8 @@ merge_app_env(Config) ->
{rabbit, [{core_metrics_gc_interval, 100}]}),
{ra, [{min_wal_roll_over_interval, 30000}]}).

end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
end_per_testcase(Testcase, Config) when Testcase == partitioned_publisher;
Testcase == reconnect_consumer_and_publish;
Testcase == reconnect_consumer_and_wait;
Testcase == reconnect_consumer_and_wait_channel_down ->
Config1 = rabbit_ct_helpers:run_steps(Config,
Expand Down Expand Up @@ -3026,6 +3029,51 @@ cleanup_data_dir(Config) ->
?awaitMatch(false, filelib:is_dir(DataDir2), 30000),
ok.

partitioned_publisher(Config) ->
[Node0, Node1, Node2] = Nodes =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch0 = rabbit_ct_client_helpers:open_channel(Config, Node0),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Node1),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
{ok, _, {_, Node1}} = ra:members({RaName, Node1}),

#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
%% first publish with confirm
publish_confirm(Ch0, QQ),

%% then partition
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node1),
rabbit_ct_broker_helpers:block_traffic_between(Node0, Node2),

%% check that we can still publish from another channel that is on the
%% majority side
publish_confirm(Ch1, QQ),

%% publish one from partitioned node that will not go through
publish(Ch0, QQ),

%% wait for disconnections
rabbit_ct_helpers:await_condition(
fun() ->
ConnectedNodes = erpc:call(Node0, erlang, nodes, []),
not lists:member(Node1, ConnectedNodes)
end, 30000),

flush(10),

%% then heal the partition
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node1),
rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node2),

publish(Ch0, QQ),
wait_for_messages_ready(Nodes, RaName, 4),
ok.

reconnect_consumer_and_publish(Config) ->
[Server | _] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/test/rabbit_fifo_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ lost_return_is_resent_on_applied_after_leader_change(Config) ->
RaEvt, F5),
%% this should resend the never applied enqueue
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
{_, _, F8} = process_ra_events(receive_ra_events(1, 0), ClusterName, F7),

?assertEqual(0, rabbit_fifo_client:pending_size(F8)),

flush(),
ok.
Expand Down
Loading