Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
-export([is_server_named_allowed/1]).

-export([check_max_age/1]).
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).

-export([deactivate_limit_all/2]).

Expand Down Expand Up @@ -220,8 +220,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->
ok = check_declare_arguments(QueueName, Args),
Type = get_queue_type(Args),
%% note: this is a module name, not a shortcut such as <<"quorum">>
DQT = rabbit_vhost:default_queue_type(VHost, rabbit_queue_type:fallback()),
ok = check_declare_arguments(QueueName, Args, DQT),
Type = get_queue_type(Args, DQT),
case rabbit_queue_type:is_enabled(Type) of
true ->
Q = amqqueue:new(QueueName,
Expand All @@ -248,10 +250,25 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
[rabbit_misc:rs(QueueName), Type, Node]}
end.

-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
%% This version is not virtual host metadata-aware but will use
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
get_queue_type([]) ->
rabbit_queue_type:default();
get_queue_type(Args) ->
get_queue_type(Args, rabbit_queue_type:default()).

%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
get_queue_type([], DefaultQueueType) ->
rabbit_queue_type:discover(DefaultQueueType);
get_queue_type(Args, DefaultQueueType) ->
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
undefined ->
rabbit_queue_type:default();
rabbit_queue_type:discover(DefaultQueueType);
{longstr, undefined} ->
rabbit_queue_type:discover(DefaultQueueType);
{longstr, <<"undefined">>} ->
rabbit_queue_type:discover(DefaultQueueType);
{_, V} ->
rabbit_queue_type:discover(V)
end.
Expand Down Expand Up @@ -733,15 +750,20 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
case IsPermitted andalso IsCompatible of
true ->
%% patch up declare arguments with x-queue-type if there
%% is a vhost default set the queue is druable and not exclusive
%% is a vhost default set the queue is durable and not exclusive
%% and there is no queue type argument
%% present
rabbit_misc:set_table_value(Args0,
<<"x-queue-type">>,
longstr,
DefaultQueueType);
false ->
Args0
%% if the properties are incompatible with the declared
%% DQT, use the fall back type
rabbit_misc:set_table_value(Args0,
<<"x-queue-type">>,
longstr,
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
end;
_ ->
Args0
Expand Down Expand Up @@ -783,7 +805,33 @@ assert_args_equivalence(Q, NewArgs) ->
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
rabbit_misc:assert_args_equivalence(ExistingArgs, NewArgs, QueueName, QueueTypeArgs).

check_declare_arguments(QueueName, Args) ->
-spec maybe_inject_default_queue_type_shortcut_into_args(
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType) ->
case rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) of
undefined ->
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
{longstr, undefined} ->
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
{longstr, <<"undefined">>} ->
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
_ValueIsAlreadySet ->
Args0
end.

-spec inject_default_queue_type_shortcut_into_args(
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
inject_default_queue_type_shortcut_into_args(Args0, QueueType) ->
Shortcut = rabbit_queue_type:short_alias_of(QueueType),
NewVal = rabbit_data_coercion:to_binary(Shortcut),
rabbit_misc:set_table_value(Args0, <<"x-queue-type">>, longstr, NewVal).

check_declare_arguments(QueueName, Args0, DefaultQueueType) ->
%% If the x-queue-type was not provided by the client, inject the
%% (virtual host, global or fallback) default before performing validation. MK.
Args = maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType),
check_arguments_type_and_value(QueueName, Args, [{<<"x-queue-type">>, fun check_queue_type/2}]),
Type = get_queue_type(Args),
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ priorities(Q) when ?is_amqqueue(Q) ->
case lists:member(Type, Ints) of
false -> none;
true ->
Max = min(RequestedMax, ?MAX_SUPPORTED_PRIORITY),
%% make sure the value is no greater than ?MAX_SUPPORTED_PRIORITY but
%% also is not negative
Max = max(1, min(RequestedMax, ?MAX_SUPPORTED_PRIORITY)),
lists:reverse(lists:seq(0, Max))
end;
_ -> none
Expand Down
48 changes: 44 additions & 4 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
init/0,
close/1,
discover/1,
short_alias_of/1,
feature_flag_name/1,
to_binary/1,
default/0,
fallback/0,
is_enabled/1,
is_compatible/4,
declare/2,
Expand Down Expand Up @@ -70,7 +72,7 @@
%% sequence number typically
-type correlation() :: term().
-type arguments() :: queue_arguments | consumer_arguments.
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
%% see AMQP 1.0 §2.6.7
-type delivery_count() :: sequence_no().
-type credit() :: uint().
Expand Down Expand Up @@ -253,20 +255,49 @@
-callback notify_decorators(amqqueue:amqqueue()) ->
ok.

-spec discover(binary() | atom()) -> queue_type().
discover(<<"undefined">>) ->
fallback();
discover(undefined) ->
fallback();
%% TODO: should this use a registry that's populated on boot?
discover(<<"quorum">>) ->
rabbit_quorum_queue;
discover(rabbit_quorum_queue) ->
rabbit_quorum_queue;
discover(<<"classic">>) ->
rabbit_classic_queue;
discover(rabbit_classic_queue) ->
rabbit_classic_queue;
discover(rabbit_stream_queue) ->
rabbit_stream_queue;
discover(<<"stream">>) ->
rabbit_stream_queue;
discover(Other) when is_atom(Other) ->
discover(rabbit_data_coercion:to_binary(Other));
discover(Other) when is_binary(Other) ->
T = rabbit_registry:binary_to_type(Other),
rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
Mod.

-spec short_alias_of(queue_type()) -> binary().
%% The opposite of discover/1: returns a short alias given a module name
short_alias_of(<<"rabbit_quorum_queue">>) ->
<<"quorum">>;
short_alias_of(rabbit_quorum_queue) ->
<<"quorum">>;
short_alias_of(<<"rabbit_classic_queue">>) ->
<<"classic">>;
short_alias_of(rabbit_classic_queue) ->
<<"classic">>;
short_alias_of(<<"rabbit_stream_queue">>) ->
<<"stream">>;
short_alias_of(rabbit_stream_queue) ->
<<"stream">>;
short_alias_of(_Other) ->
undefined.

feature_flag_name(<<"quorum">>) ->
quorum_queue;
feature_flag_name(<<"classic">>) ->
Expand All @@ -276,10 +307,19 @@ feature_flag_name(<<"stream">>) ->
feature_flag_name(_) ->
undefined.

%% If the client does not specify the type, the virtual host does not have any
%% metadata default, and rabbit.default_queue_type is not set in the application env,
%% use this type as the last resort.
-spec fallback() -> queue_type().
fallback() ->
rabbit_classic_queue.

-spec default() -> queue_type().
default() ->
rabbit_misc:get_env(rabbit,
default_queue_type,
rabbit_classic_queue).
V = rabbit_misc:get_env(rabbit,
default_queue_type,
fallback()),
rabbit_data_coercion:to_atom(V).

-spec to_binary(module()) -> binary().
to_binary(rabbit_classic_queue) ->
Expand Down
17 changes: 17 additions & 0 deletions deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
-export([delete_storage/1]).
-export([vhost_down/1]).
-export([put_vhost/6]).
-export([default_queue_type/1, default_queue_type/2]).

%%
%% API
Expand Down Expand Up @@ -481,6 +482,22 @@ default_name() ->
undefined -> <<"/">>
end.

-spec default_queue_type(VirtualHost :: vhost:name()) -> rabbit_queue_type:queue_type().
default_queue_type(VirtualHost) ->
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
default_queue_type(VirtualHost, FallbackQueueType) ->
case exists(VirtualHost) of
false -> FallbackQueueType;
true ->
Record = lookup(VirtualHost),
case vhost:get_default_queue_type(Record) of
undefined -> FallbackQueueType;
<<"undefined">> -> FallbackQueueType;
Type -> Type
end
end.

-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
lookup(VHostName) ->
case rabbit_db_vhost:get(VHostName) of
Expand Down
16 changes: 8 additions & 8 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ all_definitions(ReqData, Context) ->
{rabbitmq_version, rabbit_data_coercion:to_binary(Vsn)},
{product_name, rabbit_data_coercion:to_binary(ProductName)},
{product_version, rabbit_data_coercion:to_binary(ProductVersion)}] ++
filter(
retain_whitelisted(
[{users, rabbit_mgmt_wm_users:users(all)},
{vhosts, rabbit_mgmt_wm_vhosts:basic()},
{permissions, rabbit_mgmt_wm_permissions:permissions()},
Expand Down Expand Up @@ -112,7 +112,7 @@ vhost_definitions(ReqData, VHost, Context) ->
|| P <- rabbit_runtime_parameters:list(VHost)],
rabbit_mgmt_util:reply(
[{rabbit_version, rabbit_data_coercion:to_binary(Vsn)}] ++
filter(
retain_whitelisted(
[{parameters, Parameters},
{policies, [strip_vhost(P) || P <- rabbit_mgmt_wm_policies:basic(ReqData)]},
{queues, Qs},
Expand Down Expand Up @@ -266,14 +266,14 @@ rw_state() ->
{bindings, [source, vhost, destination, destination_type, routing_key,
arguments]}].

filter(Items) ->
[filter_items(N, V, proplists:get_value(N, rw_state())) || {N, V} <- Items].
retain_whitelisted(Items) ->
[retain_whitelisted_items(N, V, proplists:get_value(N, rw_state())) || {N, V} <- Items].

filter_items(Name, List, Allowed) ->
{Name, [filter_item(I, Allowed) || I <- List]}.
retain_whitelisted_items(Name, List, Allowed) ->
{Name, [only_whitelisted_for_item(I, Allowed) || I <- List]}.

filter_item(Item, Allowed) ->
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed)].
only_whitelisted_for_item(Item, Allowed) ->
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed), Fact =/= undefined].

strip_vhost(Item) ->
lists:keydelete(vhost, 1, Item).
3 changes: 2 additions & 1 deletion deps/rabbitmq_management/test/stats_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-module(stats_SUITE).

-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").

-compile(export_all).
Expand Down Expand Up @@ -175,4 +176,4 @@ format_range_constant(_Config) ->
SamplesFun),
5 = proplists:get_value(publish, Got),
PD = proplists:get_value(publish_details, Got),
0.0 = proplists:get_value(rate, PD).
?assertEqual(0.0, proplists:get_value(rate, PD)).