Skip to content

Commit 985c087

Browse files
the-mikedavismergify[bot]
authored andcommitted
amqp_client: Emit 'connection.blocked' in direct connections
Previously direct 0-9-1 connections did not notice when memory or disk alarms were set. This could allow an 0-9-1 shovel where the destination is a direct connection to completely overload a broker which is already in alarm. With this change, direct connections register the connection process with `rabbit_alarm` and emit `connection.blocked` and `connection.unblocked` to the blocked handler if one is registered. `rabbit_amqp091_shovel` already respects the `connection.blocked`, so the destination will not receive any messages. (cherry picked from commit 9393ec9)
1 parent c077b7a commit 985c087

File tree

3 files changed

+59
-6
lines changed

3 files changed

+59
-6
lines changed

deps/amqp_client/src/amqp_gen_connection.erl

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
server_properties,
3232
%% connection.block, connection.unblock handler
3333
block_handler,
34+
blocked_by = sets:new([{version, 2}]),
3435
closing = false %% #closing{} | false
3536
}).
3637

@@ -199,9 +200,36 @@ handle_cast({server_misbehaved, AmqpError}, State) ->
199200
server_misbehaved_close(AmqpError, State);
200201
handle_cast({server_close, #'connection.close'{} = Close}, State) ->
201202
server_initiated_close(Close, State);
202-
handle_cast({register_blocked_handler, HandlerPid}, State) ->
203+
handle_cast({register_blocked_handler, HandlerPid},
204+
#state{blocked_by = BlockedBy} = State) ->
203205
Ref = erlang:monitor(process, HandlerPid),
204-
{noreply, State#state{block_handler = {HandlerPid, Ref}}}.
206+
State1 = State#state{block_handler = {HandlerPid, Ref}},
207+
%% If an alarm is already active, immediately block the handler.
208+
_ = case sets:is_empty(BlockedBy) of
209+
false ->
210+
HandlerPid ! #'connection.blocked'{};
211+
true ->
212+
ok
213+
end,
214+
{noreply, State1};
215+
handle_cast({conserve_resources, Source, Conserve},
216+
#state{blocked_by = BlockedBy} = State) ->
217+
WasNotBlocked = sets:is_empty(BlockedBy),
218+
BlockedBy1 = case Conserve of
219+
true ->
220+
sets:add_element(Source, BlockedBy);
221+
false ->
222+
sets:del_element(Source, BlockedBy)
223+
end,
224+
State1 = State#state{blocked_by = BlockedBy1},
225+
case sets:is_empty(BlockedBy1) of
226+
true ->
227+
handle_method(#'connection.unblocked'{}, State1);
228+
false when WasNotBlocked ->
229+
handle_method(#'connection.blocked'{}, State1);
230+
false ->
231+
{noreply, State1}
232+
end.
205233

206234
%% @private
207235
handle_info({'DOWN', _, process, BlockHandler, Reason},

deps/rabbit/src/rabbit_direct.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
-deprecated([{force_event_refresh, 1, eventually}]).
1414

1515
%% Internal
16-
-export([list_local/0]).
16+
-export([list_local/0,
17+
conserve_resources/3]).
1718

1819
%% For testing only
1920
-export([extract_extra_auth_props/4]).
@@ -206,6 +207,8 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
206207
ok -> ok = pg_local:join(rabbit_direct, Pid),
207208
rabbit_core_metrics:connection_created(Pid, Infos),
208209
rabbit_event:notify(connection_created, Infos),
210+
_ = rabbit_alarm:register(
211+
Pid, {?MODULE, conserve_resources, []}),
209212
{ok, {User, rabbit_reader:server_properties(Protocol)}}
210213
catch
211214
exit:#amqp_error{name = Reason = not_allowed} ->
@@ -252,3 +255,9 @@ disconnect(Pid, Infos) ->
252255
pg_local:leave(rabbit_direct, Pid),
253256
rabbit_core_metrics:connection_closed(Pid),
254257
rabbit_event:notify(connection_closed, Infos).
258+
259+
-spec conserve_resources(pid(),
260+
rabbit_alarm:resource_alarm_source(),
261+
rabbit_alarm:resource_alert()) -> 'ok'.
262+
conserve_resources(ChannelPid, Source, {_, Conserve, _}) ->
263+
gen_server:cast(ChannelPid, {conserve_resources, Source, Conserve}).

deps/rabbitmq_shovel/test/amqp091_alarm_SUITE.erl

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,20 @@
1616

1717
all() ->
1818
[
19-
{group, all_tests}
19+
{group, network_connection},
20+
{group, direct_connection}
2021
].
2122

2223
groups() ->
2324
[
24-
{all_tests, [], all_tests()}
25+
{network_connection, [], [
26+
dest_resource_alarm_on_confirm,
27+
dest_resource_alarm_on_publish,
28+
dest_resource_alarm_no_ack
29+
]},
30+
{direct_connection, [], [
31+
dest_resource_alarm_on_confirm
32+
]}
2533
].
2634

2735
all_tests() ->
@@ -51,13 +59,21 @@ end_per_suite(Config) ->
5159
rabbit_ct_client_helpers:teardown_steps() ++
5260
rabbit_ct_broker_helpers:teardown_steps()).
5361

54-
init_per_group(_, Config) ->
62+
init_per_group(network_connection, Config) ->
5563
rabbit_ct_helpers:set_config(
5664
Config,
5765
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
5866
{shovel_source_idx, 1},
5967
{shovel_dest_uri, shovel_test_utils:make_uri(Config, 0)},
6068
{shovel_dest_idx, 0}
69+
]);
70+
init_per_group(direct_connection, Config) ->
71+
rabbit_ct_helpers:set_config(
72+
Config,
73+
[{shovel_source_uri, shovel_test_utils:make_uri(Config, 1)},
74+
{shovel_source_idx, 1},
75+
{shovel_dest_uri, <<"amqp://">>},
76+
{shovel_dest_idx, 0}
6177
]).
6278

6379
end_per_group(_, Config) ->

0 commit comments

Comments
 (0)