Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ overview(#?STATE{consumers = Cons,
Conf = #{name => Cfg#cfg.name,
resource => Cfg#cfg.resource,
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
dead_letter_handler => Cfg#cfg.dead_letter_handler,
overflow_strategy => Cfg#cfg.overflow_strategy,
max_length => Cfg#cfg.max_length,
max_bytes => Cfg#cfg.max_bytes,
consumer_strategy => Cfg#cfg.consumer_strategy,
Expand Down
53 changes: 42 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,8 @@ declare_queue_error(Error, Queue, Leader, ActingUser) ->
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
gather_policy_config(Q, IsQueueDeclaration) ->
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q),
Expand All @@ -327,28 +326,42 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>,
fun resolve_delivery_limit/2, Q) of
undefined ->
rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
case IsQueueDeclaration of
true ->
rabbit_log:info(
"~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]);
false ->
ok
end,
?DEFAULT_DELIVERY_LIMIT;
DL ->
DL
end,
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dead_letter_handler(Q, Overflow),
become_leader_handler => {?MODULE, become_leader, [QName]},
DeadLetterHandler = dead_letter_handler(Q, Overflow),
#{dead_letter_handler => DeadLetterHandler,
max_length => MaxLength,
max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit,
overflow_strategy => Overflow,
created => erlang:system_time(millisecond),
expires => Expires,
msg_ttl => MsgTTL
}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
PolicyConfig = gather_policy_config(Q, true),
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
PolicyConfig#{
name => Name,
queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
single_active_consumer_on => single_active_consumer_on(Q),
created => erlang:system_time(millisecond)
}.

resolve_delivery_limit(PolVal, ArgVal)
when PolVal < 0 orelse ArgVal < 0 ->
max(PolVal, ArgVal);
Expand Down Expand Up @@ -624,7 +637,9 @@ handle_tick(QName,
ok;
_ ->
ok
end
end,
maybe_apply_policies(Q, Overview),
ok
catch
_:Err ->
rabbit_log:debug("~ts: handle tick failed with ~p",
Expand Down Expand Up @@ -708,6 +723,21 @@ system_recover(quorum_queues) ->
ok
end.

maybe_apply_policies(Q, #{config := CurrentConfig}) ->
NewPolicyConfig = gather_policy_config(Q, false),

RelevantKeys = maps:keys(NewPolicyConfig),
CurrentPolicyConfig = maps:with(RelevantKeys, CurrentConfig),

ShouldUpdate = NewPolicyConfig =/= CurrentPolicyConfig,
case ShouldUpdate of
true ->
rabbit_log:debug("Re-applying policies to ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]),
policy_changed(Q),
ok;
false -> ok
end.

-spec recover(binary(), [amqqueue:amqqueue()]) ->
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
recover(_Vhost, Queues) ->
Expand Down Expand Up @@ -2064,3 +2094,4 @@ file_handle_other_reservation() ->

file_handle_release_reservation() ->
ok.

181 changes: 180 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ groups() ->
single_active_consumer_priority,
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member
force_vhost_queues_shrink_member_to_current_member,
policy_repair
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
Expand Down Expand Up @@ -1300,6 +1301,159 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)
policy_repair(Config) ->
[Server0, _Server1, _Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),

QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
ExpectedMaxLength1 = 10,
Priority1 = 1,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_1">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}],
Priority1,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
QueryFun = fun rabbit_fifo:overview/1,
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength1 + some messages but after consuming all messages only
% MaxLength1 are retrieved.
% Checking twice to ensure consistency
publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1),
% +1 because QQs let one pass
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Set higher priority policy, allowing more messages
ExpectedMaxLength2 = 20,
Priority2 = 2,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_2">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}],
Priority2,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength2 + some messages but after consuming all messages only
% MaxLength2 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Ensure the queue process is unavailable
lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers),

% Add policy with higher priority, allowing even more messages.
ExpectedMaxLength3 = 30,
Priority3 = 3,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_3">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}],
Priority3,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Restart the queue process.
{ok, Queue} =
rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_amqqueue,
lookup,
[{resource, <<"/">>, queue, QQ}]),
lists:foreach(
fun(Srv) ->
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
rabbit_quorum_queue,
recover,
[foo, [Queue]]
)
end,
Servers),

% Wait for the queue to be available again.
lists:foreach(fun(Srv) ->
rabbit_ct_helpers:await_condition(
fun () ->
is_pid(
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName]))
end)
end,
Servers),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength3 + some messages but after consuming all messages only
% MaxLength3 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ).

priority_queue_fifo(Config) ->
%% testing: if hi priority messages are published before lo priority
%% messages they are always consumed first (fifo)
Expand Down Expand Up @@ -4333,3 +4487,28 @@ lists_interleave([Item | Items], List)
{Left, Right} = lists:split(2, List),
Left ++ [Item | lists_interleave(Items, Right)].

publish_confirm_many(Ch, Queue, Count) ->
lists:foreach(fun(_) -> publish_confirm(Ch, Queue) end, lists:seq(1, Count)).

consume_all(Ch, QQ) ->
Consume = fun C(Acc) ->
case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of
{#'basic.get_ok'{}, Msg} ->
C([Msg | Acc]);
_ ->
Acc
end
end,
Consume([]).

ensure_qq_proc_dead(Config, Server, RaName) ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the target process recovers in fewer than 500ms, this function will loop forever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ra supervisor has a max restart intensity of 2 restarts per 5 seconds https://github.com/rabbitmq/ra/blob/main/src/ra_server_sup.erl#L36-L37. So supervisor will give up eventually.
Otoh if the process restart takes more than 500ms then this loop would stop before the process is dead completely. But I think this is highly unlikely for a test queue.

case rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [RaName]) of
undefined ->
ok;
Pid ->
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, exit, [Pid, kill]),
%% Give some time for the supervisor to restart the process
timer:sleep(500),
ensure_qq_proc_dead(Config, Server, RaName)
end.

Loading