Skip to content

Commit dcbd1e2

Browse files
Merge pull request #14339 from rabbitmq/mergify/bp/v4.1.x/pr-14304
Add a configurable limit for number of exchanges (backport #14304)
2 parents 8318530 + f9c27d3 commit dcbd1e2

File tree

4 files changed

+115
-18
lines changed

4 files changed

+115
-18
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,20 @@ end}.
10121012
{mapping, "max_message_size", "rabbit.max_message_size",
10131013
[{datatype, integer}, {validators, ["max_message_size"]}]}.
10141014

1015+
{mapping, "cluster_exchange_limit", "rabbit.cluster_exchange_limit",
1016+
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.
1017+
1018+
{translation, "rabbit.cluster_exchange_limit",
1019+
fun(Conf) ->
1020+
case cuttlefish:conf_get("cluster_exchange_limit", Conf, undefined) of
1021+
undefined -> cuttlefish:unset();
1022+
infinity -> infinity;
1023+
Val when is_integer(Val) -> Val;
1024+
_ -> cuttlefish:invalid("should be a non-negative integer")
1025+
end
1026+
end
1027+
}.
1028+
10151029
%% Customising Socket Options.
10161030
%%
10171031
%% See (https://www.erlang.org/doc/man/inet.html#setopts-2) for

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,11 @@ count_in_mnesia() ->
266266
mnesia:table_info(?MNESIA_TABLE, size).
267267

268268
count_in_khepri() ->
269-
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
270-
case rabbit_khepri:count(Path) of
271-
{ok, Count} -> Count;
272-
_ -> 0
269+
try
270+
ets:info(?KHEPRI_PROJECTION, size)
271+
catch
272+
error:badarg ->
273+
0
273274
end.
274275

275276
%% -------------------------------------------------------------------
@@ -863,7 +864,12 @@ exists_in_mnesia(Name) ->
863864
ets:member(?MNESIA_TABLE, Name).
864865

865866
exists_in_khepri(Name) ->
866-
rabbit_khepri:exists(khepri_exchange_path(Name)).
867+
try
868+
ets:member(?KHEPRI_PROJECTION, Name)
869+
catch
870+
error:badarg ->
871+
false
872+
end.
867873

868874
%% -------------------------------------------------------------------
869875
%% clear().

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,13 @@ serial(X) ->
101101
Internal :: boolean(),
102102
Args :: rabbit_framing:amqp_table(),
103103
Username :: rabbit_types:username(),
104-
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
104+
Ret :: {ok, rabbit_types:exchange()} |
105+
{error, timeout} |
106+
%% May exit with `#amqp_error{}` if validations fail:
107+
rabbit_types:channel_exit().
105108

106109
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
110+
ok = check_exchange_limits(XName),
107111
X = rabbit_exchange_decorator:set(
108112
rabbit_policy:set(#exchange{name = XName,
109113
type = Type,
@@ -140,6 +144,25 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
140144
{ok, X}
141145
end.
142146

147+
check_exchange_limits(XName) ->
148+
Limit = rabbit_misc:get_env(rabbit, cluster_exchange_limit, infinity),
149+
case rabbit_db_exchange:count() >= Limit of
150+
false ->
151+
ok;
152+
true ->
153+
case rabbit_db_exchange:exists(XName) of
154+
true ->
155+
%% Allow re-declares of existing exchanges when at the
156+
%% exchange limit.
157+
ok;
158+
false ->
159+
rabbit_misc:protocol_error(
160+
precondition_failed,
161+
"cannot declare ~ts: exchange limit of ~tp is reached",
162+
[rabbit_misc:rs(XName), Limit])
163+
end
164+
end.
165+
143166
%% Used with binaries sent over the wire; the type may not exist.
144167

145168
-spec check_type

deps/rabbit/test/cluster_limit_SUITE.erl

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
-include_lib("amqp_client/include/amqp_client.hrl").
1313
-compile([nowarn_export_all, export_all]).
1414

15+
-define(EXCHANGE_LIMIT, 10).
1516

1617
all() ->
1718
[
@@ -22,7 +23,8 @@ groups() ->
2223
[
2324
{clustered, [],
2425
[
25-
{size_2, [], [queue_limit]}
26+
{size_2, [], [queue_limit,
27+
exchange_limit]}
2628
]}
2729
].
2830

@@ -34,7 +36,8 @@ init_per_suite(Config0) ->
3436
rabbit_ct_helpers:log_environment(),
3537
Config1 = rabbit_ct_helpers:merge_app_env(
3638
Config0, {rabbit, [{quorum_tick_interval, 1000},
37-
{cluster_queue_limit, 3}]}),
39+
{cluster_queue_limit, 3},
40+
{cluster_exchange_limit, ?EXCHANGE_LIMIT}]}),
3841
rabbit_ct_helpers:run_setup_steps(Config1, []).
3942

4043
end_per_suite(Config) ->
@@ -101,48 +104,99 @@ queue_limit(Config) ->
101104
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
102105
Q1 = ?config(queue_name, Config),
103106
?assertEqual({'queue.declare_ok', Q1, 0, 0},
104-
declare(Ch, Q1)),
107+
declare_queue(Ch, Q1)),
105108

106109
Q2 = ?config(alt_queue_name, Config),
107110
?assertEqual({'queue.declare_ok', Q2, 0, 0},
108-
declare(Ch, Q2)),
111+
declare_queue(Ch, Q2)),
109112

110113
Q3 = ?config(alt_2_queue_name, Config),
111114
?assertEqual({'queue.declare_ok', Q3, 0, 0},
112-
declare(Ch, Q3)),
115+
declare_queue(Ch, Q3)),
113116
Q4 = ?config(over_limit_queue_name, Config),
114117
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
115118
?assertExit(
116119
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
117-
declare(Ch, Q4)),
120+
declare_queue(Ch, Q4)),
118121

119122
%% Trying the second server, in the cluster, but no queues on it,
120123
%% but should still fail as the limit is cluster wide.
121124
?assertExit(
122125
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
123-
declare(Ch2, Q4)),
126+
declare_queue(Ch2, Q4)),
124127

125128
%Trying other types of queues
126129
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
127130
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
128131
?assertExit(
129132
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
130-
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
133+
declare_queue(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
131134
?assertExit(
132135
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
133-
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
136+
declare_queue(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
134137
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
135138
ok.
136139

137-
declare(Ch, Q) ->
138-
declare(Ch, Q, []).
140+
exchange_limit(Config) ->
141+
DefaultXs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, count, []),
142+
?assert(?EXCHANGE_LIMIT > DefaultXs),
139143

140-
declare(Ch, Q, Args) ->
144+
[Server0, Server1] =
145+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
146+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
147+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
148+
149+
%% Reach the limit.
150+
[begin
151+
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
152+
#'exchange.declare_ok'{} = declare_exchange(Ch1, XName, <<"fanout">>)
153+
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],
154+
155+
%% Trying to declare the next exchange fails.
156+
OverLimitXName = <<"over-limit-x">>,
157+
?assertExit(
158+
{{shutdown, {server_initiated_close, 406,
159+
<<"PRECONDITION_FAILED", _/binary>>}}, _},
160+
declare_exchange(Ch1, OverLimitXName, <<"fanout">>)),
161+
162+
%% Existing exchanges can be re-declared.
163+
ExistingX = list_to_binary(rabbit_misc:format("x-~b", [DefaultXs])),
164+
#'exchange.declare_ok'{} = declare_exchange(Ch2, ExistingX, <<"fanout">>),
165+
166+
%% The limit is cluster wide: the other node cannot declare the exchange
167+
%% either.
168+
?assertExit(
169+
{{shutdown, {server_initiated_close, 406,
170+
<<"PRECONDITION_FAILED", _/binary>>}}, _},
171+
declare_exchange(Ch2, OverLimitXName, <<"fanout">>)),
172+
173+
%% Clean up extra exchanges
174+
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server0),
175+
[begin
176+
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
177+
#'exchange.delete_ok'{} = amqp_channel:call(
178+
Ch3,
179+
#'exchange.delete'{exchange = XName})
180+
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],
181+
182+
ok.
183+
184+
%% -------------------------------------------------------------------
185+
186+
declare_queue(Ch, Q) ->
187+
declare_queue(Ch, Q, []).
188+
189+
declare_queue(Ch, Q, Args) ->
141190
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
142191
durable = true,
143192
auto_delete = false,
144193
arguments = Args}).
145194

195+
declare_exchange(Ch, Name, Type) ->
196+
amqp_channel:call(Ch, #'exchange.declare'{exchange = Name,
197+
type = Type,
198+
durable = true}).
199+
146200
delete_queues() ->
147201
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
148202
|| Q <- rabbit_amqqueue:list()].

0 commit comments

Comments
 (0)