Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
_ ->
{State, ok}
end;
apply(#{index := Idx} = Meta,
apply(#{index := Idx,
machine_version := MacVer} = Meta,
#requeue{consumer_key = ConsumerKey,
msg_id = MsgId,
index = OldIdx,
Expand All @@ -344,7 +345,13 @@ apply(#{index := Idx} = Meta,
Messages),
enqueue_count = EnqCount + 1},
State2 = update_or_remove_con(Meta, ConsumerKey, Con, State1),
checkout(Meta, State0, State2, []);
{State3, Effects} = case MacVer >= 7 of
true ->
activate_next_consumer({State2, []});
false ->
{State2, []}
end,
checkout(Meta, State0, State3, Effects);
_ ->
{State00, ok, []}
end;
Expand Down Expand Up @@ -923,15 +930,16 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
end.

-spec version() -> pos_integer().
version() -> 6.
version() -> 7.

which_module(0) -> rabbit_fifo_v0;
which_module(1) -> rabbit_fifo_v1;
which_module(2) -> rabbit_fifo_v3;
which_module(3) -> rabbit_fifo_v3;
which_module(4) -> ?MODULE;
which_module(5) -> ?MODULE;
which_module(6) -> ?MODULE.
which_module(6) -> ?MODULE;
which_module(7) -> ?MODULE.

-define(AUX, aux_v3).

Expand Down Expand Up @@ -1747,8 +1755,8 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
{duplicate, State0, Effects0}
end.

return(#{} = Meta, ConsumerKey, MsgIds, IncrDelCount, Anns,
Checked, Effects0, State0)
return(#{machine_version := MacVer} = Meta, ConsumerKey,
MsgIds, IncrDelCount, Anns, Checked, Effects0, State0)
when is_map(Anns) ->
%% We requeue in the same order as messages got returned by the client.
{State1, Effects1} =
Expand All @@ -1768,7 +1776,13 @@ return(#{} = Meta, ConsumerKey, MsgIds, IncrDelCount, Anns,
_ ->
State1
end,
checkout(Meta, State0, State2, Effects1).
{State3, Effects2} = case MacVer >= 7 of
true ->
activate_next_consumer({State2, Effects1});
false ->
{State2, Effects1}
end,
checkout(Meta, State0, State3, Effects2).

% used to process messages that are finished
complete(Meta, ConsumerKey, [MsgId],
Expand Down Expand Up @@ -2798,7 +2812,10 @@ convert(Meta, 4, To, State) ->
convert(Meta, 5, To, State);
convert(Meta, 5, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 6, To, State).
convert(Meta, 6, To, State);
convert(Meta, 6, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 7, To, State).

smallest_raft_index(#?STATE{messages = Messages,
ra_indexes = Indexes,
Expand Down
68 changes: 68 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ groups() ->
format,
add_member_2,
single_active_consumer_priority_take_over,
single_active_consumer_priority_take_over_return,
single_active_consumer_priority_take_over_requeue,
single_active_consumer_priority,
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
Expand Down Expand Up @@ -1139,6 +1141,72 @@ single_active_consumer_priority_take_over(Config) ->
?DEFAULT_AWAIT),
ok.

single_active_consumer_priority_take_over_return(Config) ->
single_active_consumer_priority_take_over_base(20, Config).

single_active_consumer_priority_take_over_requeue(Config) ->
single_active_consumer_priority_take_over_base(-1, Config).

single_active_consumer_priority_take_over_base(DelLimit, Config) ->
check_quorum_queues_v4_compat(Config),

[Server0, Server1, _Server2] = Nodes =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

MinMacVers = lists:min([V || {ok, V} <-
erpc:multicall(Nodes, rabbit_fifo, version, [])]),
if MinMacVers < 7 ->
throw({skip, "single_active_consumer_priority_take_over_base needs a higher machine verison"});
true ->
ok
end,

Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
QName = ?config(queue_name, Config),
Q1 = <<QName/binary, "_1">>,
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-delivery-limit">>, long, DelLimit},
{<<"x-single-active-consumer">>, bool, true}],
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 1}]),
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
#'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
publish_confirm(Ch2, Q1),
%% higher priority consumer attaches
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 3}]),

%% Q1 should still have Ch1 as consumer as it has pending messages
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query,
[RaNameQ1, QueryFun])),

%% ack the message
receive
{#'basic.deliver'{consumer_tag = <<"ch1-ctag1">>,
delivery_tag = DeliveryTag}, _} ->
amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag})
after ?TIMEOUT ->
flush(1),
exit(basic_deliver_timeout)
end,

?awaitMatch({ok, {_, {value, {<<"ch2-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun]),
?DEFAULT_AWAIT),
receive
{#'basic.deliver'{consumer_tag = <<"ch2-ctag1">>,
delivery_tag = DeliveryTag2}, _} ->
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag2})
after ?TIMEOUT ->
flush(1),
exit(basic_deliver_timeout_2)
end,
ok.

single_active_consumer_priority(Config) ->
check_quorum_queues_v4_compat(Config),
[Server0, Server1, Server2] =
Expand Down
84 changes: 80 additions & 4 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ groups() ->
].

init_per_group(tests, Config) ->
[{machine_version, 5} | Config];
[{machine_version, rabbit_fifo:version()} | Config];
init_per_group(machine_version_conversion, Config) ->
Config.

init_per_testcase(_Testcase, Config) ->
FF = ?config(machine_version, Config) == 5,
FF = ?config(machine_version, Config) == rabbit_fifo:version(),
ok = meck:new(rabbit_feature_flags, [passthrough]),
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end),
Config.
Expand Down Expand Up @@ -1932,6 +1932,83 @@ single_active_consumer_higher_waiting_disconnected_test(Config) ->

ok.

single_active_consumer_higher_waiting_return_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
single_active_consumer_on => true}),

Pid1 = test_util:fake_pid(node()),
C1Pid = test_util:fake_pid(n1@banana),
C2Pid = test_util:fake_pid(n2@banana),
% % adding some consumers
{CK1, C1} = {?LINE, {?LINE_B, C1Pid}},
{CK2, C2} = {?LINE, {?LINE_B, C2Pid}},
Entries =
[
%% add a consumer
{CK1, make_checkout(C1, {auto, {simple_prefetch, 1}}, #{priority => 1})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = up}},
waiting_consumers = []}),

%% enqueue a message
{?LINE , rabbit_fifo:make_enqueue(Pid1, 1, msg1)},

%% add a consumer with a higher priority, current is quiescing
{CK2, make_checkout(C2, {auto, {simple_prefetch, 1}}, #{priority => 2})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = quiescing}},
waiting_consumers = [{CK2, _}]}),
%% C1 returns message
{?LINE, rabbit_fifo:make_return(CK1, [0])},
%% C2 should activated
?ASSERT(#rabbit_fifo{consumers = #{CK2 := #consumer{status = up,
checked_out = Ch,
credit = 0}},
waiting_consumers = [_]} when map_size(Ch) == 1)
],
{_S1, _} = run_log(Config, S0, Entries, fun single_active_invariant/1),

ok.

single_active_consumer_higher_waiting_requeue_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
single_active_consumer_on => true}),

Pid1 = test_util:fake_pid(node()),
C1Pid = test_util:fake_pid(n1@banana),
C2Pid = test_util:fake_pid(n2@banana),
% % adding some consumers
{CK1, C1} = {?LINE, {?LINE_B, C1Pid}},
EnqIdx = ?LINE,
RequeueIdx = ?LINE,
{CK2, C2} = {?LINE, {?LINE_B, C2Pid}},
Entries =
[
%% add a consumer
{CK1, make_checkout(C1, {auto, {simple_prefetch, 1}}, #{priority => 1})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = up}},
waiting_consumers = []}),

%% enqueue a message
{EnqIdx , rabbit_fifo:make_enqueue(Pid1, 1, msg1)},

%% add a consumer with a higher priority, current is quiescing
{CK2, make_checkout(C2, {auto, {simple_prefetch, 1}}, #{priority => 2})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = quiescing}},
waiting_consumers = [{CK2, _}]}),
%% C1 returns message
% {?LINE, rabbit_fifo:make_requeue(CK1, [0])},
{RequeueIdx , element(2, hd(rabbit_fifo:make_requeue(CK1, {notify, 1, self()},
[{0, EnqIdx, 0, msg1}], [])))},
%% C2 should activated
?ASSERT(#rabbit_fifo{consumers = #{CK2 := #consumer{status = up,
checked_out = Ch,
credit = 0}},
waiting_consumers = [_]} when map_size(Ch) == 1)
],
{_S1, _} = run_log(Config, S0, Entries, fun single_active_invariant/1),

ok.
single_active_consumer_quiescing_disconnected_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
Expand Down Expand Up @@ -2455,8 +2532,7 @@ machine_version_test(C) ->
consumers = #{Cid := #consumer{cfg = #consumer_cfg{priority = 0}}},
service_queue = S,
messages = Msgs}, ok,
[_|_]} = apply(meta(C, Idx),
{machine_version, 0, 2}, S1),
[_|_]} = apply(meta(C, Idx), {machine_version, 0, 2}, S1),
%% validate message conversion to lqueue
?assertEqual(1, lqueue:len(Msgs)),
?assert(priority_queue:is_queue(S)),
Expand Down
Loading