6363-export ([is_server_named_allowed /1 ]).
6464
6565-export ([check_max_age /1 ]).
66- -export ([get_queue_type /1 , get_resource_vhost_name /1 , get_resource_name /1 ]).
66+ -export ([get_queue_type /1 , get_queue_type / 2 , get_resource_vhost_name /1 , get_resource_name /1 ]).
6767
6868-export ([deactivate_limit_all /2 ]).
6969
@@ -220,8 +220,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
220220 {protocol_error , Type :: atom (), Reason :: string (), Args :: term ()}.
221221declare (QueueName = # resource {virtual_host = VHost }, Durable , AutoDelete , Args ,
222222 Owner , ActingUser , Node ) ->
223- ok = check_declare_arguments (QueueName , Args ),
224- Type = get_queue_type (Args ),
223+ % % note: this is a module name, not a shortcut such as <<"quorum">>
224+ DQT = rabbit_vhost :default_queue_type (VHost , rabbit_queue_type :fallback ()),
225+ ok = check_declare_arguments (QueueName , Args , DQT ),
226+ Type = get_queue_type (Args , DQT ),
225227 case rabbit_queue_type :is_enabled (Type ) of
226228 true ->
227229 Q = amqqueue :new (QueueName ,
@@ -248,10 +250,25 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
248250 [rabbit_misc :rs (QueueName ), Type , Node ]}
249251 end .
250252
253+ - spec get_queue_type (Args :: rabbit_framing :amqp_table ()) -> rabbit_queue_type :queue_type ().
254+ % % This version is not virtual host metadata-aware but will use
255+ % % the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
256+ get_queue_type ([]) ->
257+ rabbit_queue_type :default ();
251258get_queue_type (Args ) ->
259+ get_queue_type (Args , rabbit_queue_type :default ()).
260+
261+ % % This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
262+ get_queue_type ([], DefaultQueueType ) ->
263+ rabbit_queue_type :discover (DefaultQueueType );
264+ get_queue_type (Args , DefaultQueueType ) ->
252265 case rabbit_misc :table_lookup (Args , <<" x-queue-type" >>) of
253266 undefined ->
254- rabbit_queue_type :default ();
267+ rabbit_queue_type :discover (DefaultQueueType );
268+ {longstr , undefined } ->
269+ rabbit_queue_type :discover (DefaultQueueType );
270+ {longstr , <<" undefined" >>} ->
271+ rabbit_queue_type :discover (DefaultQueueType );
255272 {_ , V } ->
256273 rabbit_queue_type :discover (V )
257274 end .
@@ -783,7 +800,33 @@ assert_args_equivalence(Q, NewArgs) ->
783800 QueueTypeArgs = rabbit_queue_type :arguments (queue_arguments , Type ),
784801 rabbit_misc :assert_args_equivalence (ExistingArgs , NewArgs , QueueName , QueueTypeArgs ).
785802
786- check_declare_arguments (QueueName , Args ) ->
803+ - spec maybe_inject_default_queue_type_shortcut_into_args (
804+ rabbit_framing :amqp_table (), rabbit_queue_type :queue_type ()) -> rabbit_framing :amqp_table ().
805+ maybe_inject_default_queue_type_shortcut_into_args (Args0 , DefaultQueueType ) ->
806+ case rabbit_misc :table_lookup (Args0 , <<" x-queue-type" >>) of
807+ undefined ->
808+ inject_default_queue_type_shortcut_into_args ([], DefaultQueueType );
809+ {longstr , undefined } ->
810+ % % Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
811+ inject_default_queue_type_shortcut_into_args (Args0 , DefaultQueueType );
812+ {longstr , <<" undefined" >>} ->
813+ % % Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
814+ inject_default_queue_type_shortcut_into_args (Args0 , DefaultQueueType );
815+ _ValueIsAlreadySet ->
816+ Args0
817+ end .
818+
819+ - spec inject_default_queue_type_shortcut_into_args (
820+ rabbit_framing :amqp_table (), rabbit_queue_type :queue_type ()) -> rabbit_framing :amqp_table ().
821+ inject_default_queue_type_shortcut_into_args (Args0 , QueueType ) ->
822+ Shortcut = rabbit_queue_type :short_alias_of (QueueType ),
823+ NewVal = rabbit_data_coercion :to_binary (Shortcut ),
824+ rabbit_misc :set_table_value (Args0 , <<" x-queue-type" >>, longstr , NewVal ).
825+
826+ check_declare_arguments (QueueName , Args0 , DefaultQueueType ) ->
827+ % % If the x-queue-type was not provided by the client, inject the
828+ % % (virtual host, global or fallback) default before performing validation. MK.
829+ Args = maybe_inject_default_queue_type_shortcut_into_args (Args0 , DefaultQueueType ),
787830 check_arguments_type_and_value (QueueName , Args , [{<<" x-queue-type" >>, fun check_queue_type /2 }]),
788831 Type = get_queue_type (Args ),
789832 QueueTypeArgs = rabbit_queue_type :arguments (queue_arguments , Type ),
0 commit comments