Skip to content

Commit d9ac549

Browse files
Backport a slew of Default Queue Type (DQT)-related fixes
Some context: * a8901bc * 61e82d8 * https://github.com/michaelklishin/rabbitmq-3.13-dqt-bug-reproduction References #11541, #12109, #12821, #13837.
1 parent 0a9bdfc commit d9ac549

File tree

6 files changed

+397
-9
lines changed

6 files changed

+397
-9
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,7 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
753753
case vhost:get_metadata(V) of
754754
#{default_queue_type := DefaultQueueType}
755755
when is_binary(DefaultQueueType) andalso
756+
DefaultQueueType =/= <<"undefined">> andalso
756757
not HasQTypeArg ->
757758
update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args0);
758759
_ ->

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -841,17 +841,20 @@ inject_dqt(VHost) when ?is_vhost(VHost) ->
841841
inject_dqt(vhost:to_map(VHost));
842842
inject_dqt(VHost) when is_list(VHost) ->
843843
inject_dqt(rabbit_data_coercion:to_map(VHost));
844-
inject_dqt(M = #{default_queue_type := undefined}) ->
845-
NQT = short_alias_of(default()),
846-
Meta0 = maps:get(metadata, M, #{}),
847-
Meta = Meta0#{default_queue_type => NQT},
848-
849-
M#{default_queue_type => NQT, metadata => Meta};
850-
inject_dqt(M = #{default_queue_type := DQT}) ->
844+
inject_dqt(M) when is_map(M) ->
845+
RawDQT = maps:get(default_queue_type, M, undefined),
846+
%% See rabbitmq/rabbitmq-server#10469
847+
DQT = case RawDQT of
848+
undefined -> default();
849+
null -> default();
850+
nil -> default();
851+
<<"undefined">> -> default();
852+
"undefined" -> default();
853+
Valid -> Valid
854+
end,
851855
NQT = short_alias_of(DQT),
852856
Meta0 = maps:get(metadata, M, #{}),
853857
Meta = Meta0#{default_queue_type => NQT},
854-
855858
M#{default_queue_type => NQT, metadata => Meta}.
856859

857860
-spec vhosts_with_dqt([any()]) -> [map()].

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,18 @@ update_metadata(Name, Metadata0, _ActingUser) when is_map(Metadata0) andalso map
309309
end;
310310
update_metadata(Name, Metadata0, ActingUser) ->
311311
KnownKeys = [description, tags, default_queue_type, protected_from_deletion],
312-
Metadata = maps:with(KnownKeys, Metadata0),
312+
Metadata1 = maps:with(KnownKeys, Metadata0),
313+
%% See rabbitmq/rabbitmq-server#10469
314+
Metadata = case Metadata1 of
315+
#{default_queue_type := <<"undefined">>} ->
316+
maps:remove(default_queue_type, Metadata1);
317+
#{default_queue_type := null} ->
318+
maps:remove(default_queue_type, Metadata1);
319+
#{default_queue_type := nil} ->
320+
maps:remove(default_queue_type, Metadata1);
321+
_ ->
322+
Metadata1
323+
end,
313324

314325
case rabbit_db_vhost:merge_metadata(Name, Metadata) of
315326
{ok, VHost} ->
@@ -713,6 +724,12 @@ i(metadata, VHost) ->
713724
#{default_queue_type => DQT};
714725
M = #{default_queue_type := undefined} ->
715726
M#{default_queue_type => DQT};
727+
M = #{default_queue_type := <<"undefined">>} ->
728+
M#{default_queue_type => DQT};
729+
M = #{default_queue_type := null} ->
730+
M#{default_queue_type => DQT};
731+
M = #{default_queue_type := nil} ->
732+
M#{default_queue_type => DQT};
716733
M = #{default_queue_type := QT} ->
717734
M#{default_queue_type => rabbit_queue_type:short_alias_of(QT)};
718735
M when is_map(M) ->

deps/rabbit/src/vhost.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,15 @@ new_metadata(Description, Tags, undefined) ->
217217
#{description => Description,
218218
default_queue_type => rabbit_queue_type:default_alias(),
219219
tags => Tags};
220+
new_metadata(Description, Tags, <<"undefined">>) ->
221+
%% See rabbitmq/rabbitmq-server#10469
222+
new_metadata(Description, Tags, undefined);
223+
new_metadata(Description, Tags, null) ->
224+
%% JSON null (thoas), see rabbitmq/rabbitmq-server#10469
225+
new_metadata(Description, Tags, undefined);
226+
new_metadata(Description, Tags, nil) ->
227+
%% JSON null (Elixir JSON), see rabbitmq/rabbitmq-server#10469
228+
new_metadata(Description, Tags, undefined);
220229
new_metadata(Description, Tags, DefaultQueueType) ->
221230
#{description => Description,
222231
tags => Tags,
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(default_queue_type_prop_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("proper/include/proper.hrl").
12+
13+
-import(rabbit_ct_broker_helpers, [rpc/5]).
14+
15+
-compile(nowarn_export_all).
16+
-compile(export_all).
17+
18+
all() ->
19+
[
20+
prop_inject_dqt_output_invariants,
21+
prop_inject_dqt_preserves_valid_types
22+
].
23+
24+
%% -------------------------------------------------------------------
25+
%% Test suite setup/teardown.
26+
%% -------------------------------------------------------------------
27+
28+
init_per_suite(Config) ->
29+
rabbit_ct_helpers:log_environment(),
30+
rabbit_ct_helpers:run_setup_steps(Config).
31+
32+
end_per_suite(Config) ->
33+
rabbit_ct_helpers:run_teardown_steps(Config).
34+
35+
init_per_group(Group, Config) ->
36+
Config1 = rabbit_ct_helpers:set_config(Config, [
37+
{rmq_nodename_suffix, Group},
38+
{rmq_nodes_count, 1}
39+
]),
40+
rabbit_ct_helpers:run_steps(Config1,
41+
rabbit_ct_broker_helpers:setup_steps() ++
42+
rabbit_ct_client_helpers:setup_steps()).
43+
44+
end_per_group(_Group, Config) ->
45+
rabbit_ct_helpers:run_steps(Config,
46+
rabbit_ct_client_helpers:teardown_steps() ++
47+
rabbit_ct_broker_helpers:teardown_steps()).
48+
49+
init_per_testcase(Testcase, Config) ->
50+
Config1 = rabbit_ct_helpers:set_config(Config, [
51+
{rmq_nodename_suffix, Testcase},
52+
{rmq_nodes_count, 1}
53+
]),
54+
Config2 = rabbit_ct_helpers:run_steps(Config1,
55+
rabbit_ct_broker_helpers:setup_steps() ++
56+
rabbit_ct_client_helpers:setup_steps()),
57+
rabbit_ct_helpers:testcase_started(Config2, Testcase).
58+
59+
end_per_testcase(Testcase, Config) ->
60+
Config1 = rabbit_ct_helpers:run_steps(Config,
61+
rabbit_ct_client_helpers:teardown_steps() ++
62+
rabbit_ct_broker_helpers:teardown_steps()),
63+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
64+
65+
%% -------------------------------------------------------------------
66+
%% Property-based Tests
67+
%% -------------------------------------------------------------------
68+
69+
%% Property: inject_dqt always produces valid output regardless of input DQT value
70+
prop_inject_dqt_output_invariants(Config) ->
71+
Property = fun() -> prop_output_invariants(Config) end,
72+
rabbit_ct_proper_helpers:run_proper(Property, [], 100).
73+
74+
prop_output_invariants(Config) ->
75+
?FORALL({Name, DQT, ExtraMetaKeys},
76+
{vhost_name_gen(), dqt_gen(), list(atom())},
77+
begin
78+
ExtraMeta = maps:from_list([{K, K} || K <- ExtraMetaKeys]),
79+
Input = case DQT of
80+
none -> #{name => Name, metadata => ExtraMeta};
81+
Val -> #{name => Name, default_queue_type => Val, metadata => ExtraMeta}
82+
end,
83+
Result = rpc(Config, 0, rabbit_queue_type, inject_dqt, [Input]),
84+
85+
maps:is_key(default_queue_type, Result) andalso
86+
begin
87+
Meta = maps:get(metadata, Result, #{}),
88+
maps:is_key(default_queue_type, Meta) andalso
89+
lists:all(fun(K) -> maps:is_key(K, Meta) end, ExtraMetaKeys)
90+
end
91+
end).
92+
93+
%% Property: valid queue types are preserved, invalid ones fall back to default
94+
prop_inject_dqt_preserves_valid_types(Config) ->
95+
Property = fun() -> prop_preserves_valid_types(Config) end,
96+
rabbit_ct_proper_helpers:run_proper(Property, [], 100).
97+
98+
prop_preserves_valid_types(Config) ->
99+
Default = rpc(Config, 0, rabbit_queue_type, default_alias, []),
100+
?FORALL(DQT, dqt_gen(),
101+
begin
102+
Input = case DQT of
103+
none -> #{name => <<"/">>};
104+
Val -> #{name => <<"/">>, default_queue_type => Val}
105+
end,
106+
#{default_queue_type := ResultDQT} =
107+
rpc(Config, 0, rabbit_queue_type, inject_dqt, [Input]),
108+
109+
case DQT of
110+
none -> ResultDQT =:= Default;
111+
undefined -> ResultDQT =:= Default;
112+
<<"undefined">> -> ResultDQT =:= Default;
113+
"undefined" -> ResultDQT =:= Default;
114+
<<"classic">> -> ResultDQT =:= <<"classic">>;
115+
<<"quorum">> -> ResultDQT =:= <<"quorum">>;
116+
<<"stream">> -> ResultDQT =:= <<"stream">>
117+
end
118+
end).
119+
120+
%% -------------------------------------------------------------------
121+
%% Generators
122+
%% -------------------------------------------------------------------
123+
124+
vhost_name_gen() ->
125+
?LET(Name, non_empty(binary()), <<"/", Name/binary>>).
126+
127+
dqt_gen() ->
128+
oneof([
129+
none,
130+
undefined,
131+
<<"undefined">>,
132+
"undefined",
133+
<<"classic">>,
134+
<<"quorum">>,
135+
<<"stream">>
136+
]).

0 commit comments

Comments
 (0)