Skip to content

Commit 26a5301

Browse files
Make 'queue.declare' aware of virtual host DQT
at validation time. DQT = default queue type. When a client provides no queue type, validation should take the defaults (virtual host, global, and the last resort fallback) into account instead of considering the type to be "undefined". References #11457 ##11528
1 parent f5cb65b commit 26a5301

File tree

4 files changed

+99
-11
lines changed

4 files changed

+99
-11
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
-export([is_server_named_allowed/1]).
6464

6565
-export([check_max_age/1]).
66-
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
66+
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
6767

6868
-export([deactivate_limit_all/2]).
6969

@@ -220,8 +220,11 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
220220
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
221221
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
222222
Owner, ActingUser, Node) ->
223-
ok = check_declare_arguments(QueueName, Args),
224-
Type = get_queue_type(Args),
223+
%% note: this is a module name, not a shortcut such as <<"quorum">>
224+
DQT = rabbit_vhost:default_queue_type(VHost, rabbit_queue_type:fallback()),
225+
rabbit_log:debug("Args: ~tp, DQT: ~tp", [Args, DQT]),
226+
ok = check_declare_arguments(QueueName, Args, DQT),
227+
Type = get_queue_type(Args, DQT),
225228
case rabbit_queue_type:is_enabled(Type) of
226229
true ->
227230
Q = amqqueue:new(QueueName,
@@ -248,10 +251,19 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
248251
[rabbit_misc:rs(QueueName), Type, Node]}
249252
end.
250253

254+
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
255+
%% This version is not virtual host metadata-aware but will use
256+
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
251257
get_queue_type(Args) ->
258+
get_queue_type(Args, rabbit_queue_type:default()).
259+
260+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
261+
get_queue_type(Args, DefaultQueueType) ->
252262
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
253-
undefined ->
254-
rabbit_queue_type:default();
263+
{longstr, undefined} ->
264+
rabbit_queue_type:discover(DefaultQueueType);
265+
{longstr, <<"undefined">>} ->
266+
rabbit_queue_type:discover(DefaultQueueType);
255267
{_, V} ->
256268
rabbit_queue_type:discover(V)
257269
end.
@@ -783,7 +795,31 @@ assert_args_equivalence(Q, NewArgs) ->
783795
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
784796
rabbit_misc:assert_args_equivalence(ExistingArgs, NewArgs, QueueName, QueueTypeArgs).
785797

786-
check_declare_arguments(QueueName, Args) ->
798+
-spec maybe_inject_default_queue_type_shortcut_into_args(
799+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
800+
maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType) ->
801+
case rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) of
802+
{longstr, undefined} ->
803+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
804+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
805+
{longstr, <<"undefined">>} ->
806+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
807+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
808+
_ValueIsAlreadySet ->
809+
Args0
810+
end.
811+
812+
-spec inject_default_queue_type_shortcut_into_args(
813+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
814+
inject_default_queue_type_shortcut_into_args(Args0, QueueType) ->
815+
Shortcut = rabbit_queue_type:short_alias_of(QueueType),
816+
NewVal = rabbit_data_coercion:to_binary(Shortcut),
817+
rabbit_misc:set_table_value(Args0, <<"x-queue-type">>, longstr, NewVal).
818+
819+
check_declare_arguments(QueueName, Args0, DefaultQueueType) ->
820+
%% If the x-queue-type was not provided by the client, inject the
821+
%% (virtual host, global or fallback) default before performing validation. MK.
822+
Args = maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType),
787823
check_arguments_type_and_value(QueueName, Args, [{<<"x-queue-type">>, fun check_queue_type/2}]),
788824
Type = get_queue_type(Args),
789825
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
init/0,
1919
close/1,
2020
discover/1,
21+
short_alias_of/1,
2122
feature_flag_name/1,
2223
to_binary/1,
2324
default/0,
25+
fallback/0,
2426
is_enabled/1,
2527
is_compatible/4,
2628
declare/2,
@@ -70,7 +72,7 @@
7072
%% sequence number typically
7173
-type correlation() :: term().
7274
-type arguments() :: queue_arguments | consumer_arguments.
73-
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
75+
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
7476
%% see AMQP 1.0 §2.6.7
7577
-type delivery_count() :: sequence_no().
7678
-type credit() :: uint().
@@ -253,6 +255,11 @@
253255
-callback notify_decorators(amqqueue:amqqueue()) ->
254256
ok.
255257

258+
-spec discover(binary() | atom()) -> queue_type().
259+
discover(<<"undefined">>) ->
260+
fallback();
261+
discover(undefined) ->
262+
fallback();
256263
%% TODO: should this use a registry that's populated on boot?
257264
discover(<<"quorum">>) ->
258265
rabbit_quorum_queue;
@@ -264,9 +271,27 @@ discover(Other) when is_atom(Other) ->
264271
discover(rabbit_data_coercion:to_binary(Other));
265272
discover(Other) when is_binary(Other) ->
266273
T = rabbit_registry:binary_to_type(Other),
274+
rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
267275
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
268276
Mod.
269277

278+
-spec short_alias_of(queue_type()) -> binary().
279+
%% The opposite of discover/1: returns a short alias given a module name
280+
short_alias_of(<<"rabbit_quorum_queue">>) ->
281+
<<"quorum">>;
282+
short_alias_of(rabbit_quorum_queue) ->
283+
<<"quorum">>;
284+
short_alias_of(<<"rabbit_classic_queue">>) ->
285+
<<"classic">>;
286+
short_alias_of(rabbit_classic_queue) ->
287+
<<"classic">>;
288+
short_alias_of(<<"rabbit_stream_queue">>) ->
289+
<<"stream">>;
290+
short_alias_of(rabbit_stream_queue) ->
291+
<<"stream">>;
292+
short_alias_of(_Other) ->
293+
undefined.
294+
270295
feature_flag_name(<<"quorum">>) ->
271296
quorum_queue;
272297
feature_flag_name(<<"classic">>) ->
@@ -276,10 +301,19 @@ feature_flag_name(<<"stream">>) ->
276301
feature_flag_name(_) ->
277302
undefined.
278303

304+
%% If the client does not specify the type, the virtual host does not have any
305+
%% metadata default, and rabbit.default_queue_type is not set in the application env,
306+
%% use this type as the last resort.
307+
-spec fallback() -> queue_type().
308+
fallback() ->
309+
rabbit_classic_queue.
310+
311+
-spec default() -> queue_type().
279312
default() ->
280-
rabbit_misc:get_env(rabbit,
281-
default_queue_type,
282-
rabbit_classic_queue).
313+
V = rabbit_misc:get_env(rabbit,
314+
default_queue_type,
315+
fallback()),
316+
rabbit_data_coercion:to_atom(V).
283317

284318
-spec to_binary(module()) -> binary().
285319
to_binary(rabbit_classic_queue) ->

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
-export([delete_storage/1]).
2323
-export([vhost_down/1]).
2424
-export([put_vhost/6]).
25+
-export([default_queue_type/1, default_queue_type/2]).
2526

2627
%%
2728
%% API
@@ -481,6 +482,22 @@ default_name() ->
481482
undefined -> <<"/">>
482483
end.
483484

485+
-spec default_queue_type(VirtualHost :: vhost:name()) -> rabbit_queue_type:queue_type().
486+
default_queue_type(VirtualHost) ->
487+
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
488+
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
489+
default_queue_type(VirtualHost, FallbackQueueType) ->
490+
case exists(VirtualHost) of
491+
false -> FallbackQueueType;
492+
true ->
493+
Record = lookup(VirtualHost),
494+
case vhost:get_default_queue_type(Record) of
495+
undefined -> FallbackQueueType;
496+
<<"undefined">> -> FallbackQueueType;
497+
Type -> Type
498+
end
499+
end.
500+
484501
-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
485502
lookup(VHostName) ->
486503
case rabbit_db_vhost:get(VHostName) of

deps/rabbitmq_management/test/stats_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(stats_SUITE).
99

1010
-include_lib("proper/include/proper.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
1112
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
1213

1314
-compile(export_all).
@@ -175,4 +176,4 @@ format_range_constant(_Config) ->
175176
SamplesFun),
176177
5 = proplists:get_value(publish, Got),
177178
PD = proplists:get_value(publish_details, Got),
178-
0.0 = proplists:get_value(rate, PD).
179+
?assertEqual(0.0, proplists:get_value(rate, PD)).

0 commit comments

Comments
 (0)