Skip to content

Commit 5de87aa

Browse files
Merge pull request #11541 from rabbitmq/mk-rabbit_queue-virtual-host-default-aware
Make 'queue.declare' aware of virtual host DQT at validation time
2 parents f5cb65b + 31dc3b8 commit 5de87aa

File tree

6 files changed

+129
-21
lines changed

6 files changed

+129
-21
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
-export([is_server_named_allowed/1]).
6464

6565
-export([check_max_age/1]).
66-
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
66+
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
6767

6868
-export([deactivate_limit_all/2]).
6969

@@ -220,8 +220,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
220220
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
221221
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
222222
Owner, ActingUser, Node) ->
223-
ok = check_declare_arguments(QueueName, Args),
224-
Type = get_queue_type(Args),
223+
%% note: this is a module name, not a shortcut such as <<"quorum">>
224+
DQT = rabbit_vhost:default_queue_type(VHost, rabbit_queue_type:fallback()),
225+
ok = check_declare_arguments(QueueName, Args, DQT),
226+
Type = get_queue_type(Args, DQT),
225227
case rabbit_queue_type:is_enabled(Type) of
226228
true ->
227229
Q = amqqueue:new(QueueName,
@@ -248,10 +250,25 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
248250
[rabbit_misc:rs(QueueName), Type, Node]}
249251
end.
250252

253+
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
254+
%% This version is not virtual host metadata-aware but will use
255+
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
256+
get_queue_type([]) ->
257+
rabbit_queue_type:default();
251258
get_queue_type(Args) ->
259+
get_queue_type(Args, rabbit_queue_type:default()).
260+
261+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
262+
get_queue_type([], DefaultQueueType) ->
263+
rabbit_queue_type:discover(DefaultQueueType);
264+
get_queue_type(Args, DefaultQueueType) ->
252265
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
253266
undefined ->
254-
rabbit_queue_type:default();
267+
rabbit_queue_type:discover(DefaultQueueType);
268+
{longstr, undefined} ->
269+
rabbit_queue_type:discover(DefaultQueueType);
270+
{longstr, <<"undefined">>} ->
271+
rabbit_queue_type:discover(DefaultQueueType);
255272
{_, V} ->
256273
rabbit_queue_type:discover(V)
257274
end.
@@ -733,15 +750,20 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
733750
case IsPermitted andalso IsCompatible of
734751
true ->
735752
%% patch up declare arguments with x-queue-type if there
736-
%% is a vhost default set the queue is druable and not exclusive
753+
%% is a vhost default set the queue is durable and not exclusive
737754
%% and there is no queue type argument
738755
%% present
739756
rabbit_misc:set_table_value(Args0,
740757
<<"x-queue-type">>,
741758
longstr,
742759
DefaultQueueType);
743760
false ->
744-
Args0
761+
%% if the properties are incompatible with the declared
762+
%% DQT, use the fall back type
763+
rabbit_misc:set_table_value(Args0,
764+
<<"x-queue-type">>,
765+
longstr,
766+
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
745767
end;
746768
_ ->
747769
Args0
@@ -783,7 +805,33 @@ assert_args_equivalence(Q, NewArgs) ->
783805
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
784806
rabbit_misc:assert_args_equivalence(ExistingArgs, NewArgs, QueueName, QueueTypeArgs).
785807

786-
check_declare_arguments(QueueName, Args) ->
808+
-spec maybe_inject_default_queue_type_shortcut_into_args(
809+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
810+
maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType) ->
811+
case rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) of
812+
undefined ->
813+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
814+
{longstr, undefined} ->
815+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
816+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
817+
{longstr, <<"undefined">>} ->
818+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
819+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
820+
_ValueIsAlreadySet ->
821+
Args0
822+
end.
823+
824+
-spec inject_default_queue_type_shortcut_into_args(
825+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
826+
inject_default_queue_type_shortcut_into_args(Args0, QueueType) ->
827+
Shortcut = rabbit_queue_type:short_alias_of(QueueType),
828+
NewVal = rabbit_data_coercion:to_binary(Shortcut),
829+
rabbit_misc:set_table_value(Args0, <<"x-queue-type">>, longstr, NewVal).
830+
831+
check_declare_arguments(QueueName, Args0, DefaultQueueType) ->
832+
%% If the x-queue-type was not provided by the client, inject the
833+
%% (virtual host, global or fallback) default before performing validation. MK.
834+
Args = maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType),
787835
check_arguments_type_and_value(QueueName, Args, [{<<"x-queue-type">>, fun check_queue_type/2}]),
788836
Type = get_queue_type(Args),
789837
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ priorities(Q) when ?is_amqqueue(Q) ->
129129
case lists:member(Type, Ints) of
130130
false -> none;
131131
true ->
132-
Max = min(RequestedMax, ?MAX_SUPPORTED_PRIORITY),
132+
%% make sure the value is no greater than ?MAX_SUPPORTED_PRIORITY but
133+
%% also is not negative
134+
Max = max(1, min(RequestedMax, ?MAX_SUPPORTED_PRIORITY)),
133135
lists:reverse(lists:seq(0, Max))
134136
end;
135137
_ -> none

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
init/0,
1919
close/1,
2020
discover/1,
21+
short_alias_of/1,
2122
feature_flag_name/1,
2223
to_binary/1,
2324
default/0,
25+
fallback/0,
2426
is_enabled/1,
2527
is_compatible/4,
2628
declare/2,
@@ -70,7 +72,7 @@
7072
%% sequence number typically
7173
-type correlation() :: term().
7274
-type arguments() :: queue_arguments | consumer_arguments.
73-
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
75+
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
7476
%% see AMQP 1.0 §2.6.7
7577
-type delivery_count() :: sequence_no().
7678
-type credit() :: uint().
@@ -253,20 +255,49 @@
253255
-callback notify_decorators(amqqueue:amqqueue()) ->
254256
ok.
255257

258+
-spec discover(binary() | atom()) -> queue_type().
259+
discover(<<"undefined">>) ->
260+
fallback();
261+
discover(undefined) ->
262+
fallback();
256263
%% TODO: should this use a registry that's populated on boot?
257264
discover(<<"quorum">>) ->
258265
rabbit_quorum_queue;
266+
discover(rabbit_quorum_queue) ->
267+
rabbit_quorum_queue;
259268
discover(<<"classic">>) ->
260269
rabbit_classic_queue;
270+
discover(rabbit_classic_queue) ->
271+
rabbit_classic_queue;
272+
discover(rabbit_stream_queue) ->
273+
rabbit_stream_queue;
261274
discover(<<"stream">>) ->
262275
rabbit_stream_queue;
263276
discover(Other) when is_atom(Other) ->
264277
discover(rabbit_data_coercion:to_binary(Other));
265278
discover(Other) when is_binary(Other) ->
266279
T = rabbit_registry:binary_to_type(Other),
280+
rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
267281
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
268282
Mod.
269283

284+
-spec short_alias_of(queue_type()) -> binary().
285+
%% The opposite of discover/1: returns a short alias given a module name
286+
short_alias_of(<<"rabbit_quorum_queue">>) ->
287+
<<"quorum">>;
288+
short_alias_of(rabbit_quorum_queue) ->
289+
<<"quorum">>;
290+
short_alias_of(<<"rabbit_classic_queue">>) ->
291+
<<"classic">>;
292+
short_alias_of(rabbit_classic_queue) ->
293+
<<"classic">>;
294+
short_alias_of(<<"rabbit_stream_queue">>) ->
295+
<<"stream">>;
296+
short_alias_of(rabbit_stream_queue) ->
297+
<<"stream">>;
298+
short_alias_of(_Other) ->
299+
undefined.
300+
270301
feature_flag_name(<<"quorum">>) ->
271302
quorum_queue;
272303
feature_flag_name(<<"classic">>) ->
@@ -276,10 +307,19 @@ feature_flag_name(<<"stream">>) ->
276307
feature_flag_name(_) ->
277308
undefined.
278309

310+
%% If the client does not specify the type, the virtual host does not have any
311+
%% metadata default, and rabbit.default_queue_type is not set in the application env,
312+
%% use this type as the last resort.
313+
-spec fallback() -> queue_type().
314+
fallback() ->
315+
rabbit_classic_queue.
316+
317+
-spec default() -> queue_type().
279318
default() ->
280-
rabbit_misc:get_env(rabbit,
281-
default_queue_type,
282-
rabbit_classic_queue).
319+
V = rabbit_misc:get_env(rabbit,
320+
default_queue_type,
321+
fallback()),
322+
rabbit_data_coercion:to_atom(V).
283323

284324
-spec to_binary(module()) -> binary().
285325
to_binary(rabbit_classic_queue) ->

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
-export([delete_storage/1]).
2323
-export([vhost_down/1]).
2424
-export([put_vhost/6]).
25+
-export([default_queue_type/1, default_queue_type/2]).
2526

2627
%%
2728
%% API
@@ -481,6 +482,22 @@ default_name() ->
481482
undefined -> <<"/">>
482483
end.
483484

485+
-spec default_queue_type(VirtualHost :: vhost:name()) -> rabbit_queue_type:queue_type().
486+
default_queue_type(VirtualHost) ->
487+
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
488+
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
489+
default_queue_type(VirtualHost, FallbackQueueType) ->
490+
case exists(VirtualHost) of
491+
false -> FallbackQueueType;
492+
true ->
493+
Record = lookup(VirtualHost),
494+
case vhost:get_default_queue_type(Record) of
495+
undefined -> FallbackQueueType;
496+
<<"undefined">> -> FallbackQueueType;
497+
Type -> Type
498+
end
499+
end.
500+
484501
-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
485502
lookup(VHostName) ->
486503
case rabbit_db_vhost:get(VHostName) of

deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ all_definitions(ReqData, Context) ->
6363
{rabbitmq_version, rabbit_data_coercion:to_binary(Vsn)},
6464
{product_name, rabbit_data_coercion:to_binary(ProductName)},
6565
{product_version, rabbit_data_coercion:to_binary(ProductVersion)}] ++
66-
filter(
66+
retain_whitelisted(
6767
[{users, rabbit_mgmt_wm_users:users(all)},
6868
{vhosts, rabbit_mgmt_wm_vhosts:basic()},
6969
{permissions, rabbit_mgmt_wm_permissions:permissions()},
@@ -112,7 +112,7 @@ vhost_definitions(ReqData, VHost, Context) ->
112112
|| P <- rabbit_runtime_parameters:list(VHost)],
113113
rabbit_mgmt_util:reply(
114114
[{rabbit_version, rabbit_data_coercion:to_binary(Vsn)}] ++
115-
filter(
115+
retain_whitelisted(
116116
[{parameters, Parameters},
117117
{policies, [strip_vhost(P) || P <- rabbit_mgmt_wm_policies:basic(ReqData)]},
118118
{queues, Qs},
@@ -266,14 +266,14 @@ rw_state() ->
266266
{bindings, [source, vhost, destination, destination_type, routing_key,
267267
arguments]}].
268268

269-
filter(Items) ->
270-
[filter_items(N, V, proplists:get_value(N, rw_state())) || {N, V} <- Items].
269+
retain_whitelisted(Items) ->
270+
[retain_whitelisted_items(N, V, proplists:get_value(N, rw_state())) || {N, V} <- Items].
271271

272-
filter_items(Name, List, Allowed) ->
273-
{Name, [filter_item(I, Allowed) || I <- List]}.
272+
retain_whitelisted_items(Name, List, Allowed) ->
273+
{Name, [only_whitelisted_for_item(I, Allowed) || I <- List]}.
274274

275-
filter_item(Item, Allowed) ->
276-
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed)].
275+
only_whitelisted_for_item(Item, Allowed) ->
276+
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed), Fact =/= undefined].
277277

278278
strip_vhost(Item) ->
279279
lists:keydelete(vhost, 1, Item).

deps/rabbitmq_management/test/stats_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(stats_SUITE).
99

1010
-include_lib("proper/include/proper.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
1112
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
1213

1314
-compile(export_all).
@@ -175,4 +176,4 @@ format_range_constant(_Config) ->
175176
SamplesFun),
176177
5 = proplists:get_value(publish, Got),
177178
PD = proplists:get_value(publish_details, Got),
178-
0.0 = proplists:get_value(rate, PD).
179+
?assertEqual(0.0, proplists:get_value(rate, PD)).

0 commit comments

Comments
 (0)