Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,13 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
{{utf8, <<"durable">>}, {boolean, Durable}},
{{utf8, <<"auto_delete">>}, {boolean, AutoDelete}},
{{utf8, <<"exclusive">>}, {boolean, Exclusive}},
{{utf8, <<"type">>}, {utf8, rabbit_queue_type:to_binary(QType)}},
{{utf8, <<"type">>},
{utf8, case rabbit_queue_type:short_alias_of(QType) of
undefined ->
atom_to_binary(QType);
ShortName ->
ShortName
end}},
{{utf8, <<"arguments">>}, QArgs}
],
KVList1 = if is_list(Replicas) ->
Expand Down
25 changes: 2 additions & 23 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
close/1,
discover/1,
short_alias_of/1,
feature_flag_name/1,
to_binary/1,
default/0,
default_alias/0,
fallback/0,
Expand Down Expand Up @@ -300,7 +298,7 @@ discover(Other) when is_binary(Other) ->
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
Mod.

-spec short_alias_of(queue_type()) -> binary().
-spec short_alias_of(queue_type()) -> undefined | binary().
%% The opposite of discover/1: returns a short alias given a module name
short_alias_of(<<"rabbit_quorum_queue">>) ->
<<"quorum">>;
Expand Down Expand Up @@ -335,15 +333,6 @@ short_alias_of(<<"stream">>) ->
short_alias_of(_Other) ->
undefined.

feature_flag_name(<<"quorum">>) ->
quorum_queue;
feature_flag_name(<<"classic">>) ->
undefined;
feature_flag_name(<<"stream">>) ->
stream_queue;
feature_flag_name(_) ->
undefined.

%% If the client does not specify the type, the virtual host does not have any
%% metadata default, and rabbit.default_queue_type is not set in the application env,
%% use this type as the last resort.
Expand All @@ -362,19 +351,9 @@ default() ->
default_alias() ->
short_alias_of(default()).

-spec to_binary(module()) -> binary().
to_binary(rabbit_classic_queue) ->
<<"classic">>;
to_binary(rabbit_quorum_queue) ->
<<"quorum">>;
to_binary(rabbit_stream_queue) ->
<<"stream">>;
to_binary(Other) ->
atom_to_binary(Other).

%% is a specific queue type implementation enabled
-spec is_enabled(module()) -> boolean().
is_enabled(Type) ->
is_enabled(Type) when is_atom(Type) ->
Type:is_enabled().

-spec is_compatible(module(), boolean(), boolean(), boolean()) ->
Expand Down
19 changes: 9 additions & 10 deletions deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,18 @@ do_add(Name, Metadata, ActingUser) ->
case Metadata of
#{default_queue_type := DQT} ->
%% check that the queue type is known
rabbit_log:debug("Default queue type of virtual host '~ts' is ~tp", [Name, DQT]),
rabbit_log:debug("Default queue type of virtual host '~ts' is ~tp",
[Name, DQT]),
try rabbit_queue_type:discover(DQT) of
_ ->
case rabbit_queue_type:feature_flag_name(DQT) of
undefined -> ok;
Flag when is_atom(Flag) ->
case rabbit_feature_flags:is_enabled(Flag) of
true -> ok;
false -> throw({error, queue_type_feature_flag_is_not_enabled})
end
QueueType when is_atom(QueueType) ->
case rabbit_queue_type:is_enabled(QueueType) of
true ->
ok;
false ->
throw({error, queue_type_feature_flag_is_not_enabled})
end
catch _:_ ->
throw({error, invalid_queue_type, DQT})
throw({error, invalid_queue_type, DQT})
end;
_ ->
ok
Expand Down
Loading