Skip to content

Commit afd7e7e

Browse files
committed
Fix MQTT and Stream connection crash
This commit fixes #11985 and is a manual backport due to conflicts in #12144
1 parent e9032c8 commit afd7e7e

File tree

4 files changed

+26
-7
lines changed

4 files changed

+26
-7
lines changed

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,6 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
428428
Node = node(Pid),
429429
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]);
430430
close_connection(#tracked_connection{pid = Pid}, Message) ->
431-
% best effort, this will work for connections to the stream plugin
432-
Node = node(Pid),
433-
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).
431+
%% Best effort will work for following plugins:
432+
%% rabbitmq_stream, rabbitmq_mqtt, rabbitmq_web_mqtt
433+
Pid ! {shutdown, Message}.

deps/rabbit/src/rabbit_networking.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,8 @@ close_connections(Pids, Explanation) ->
542542

543543
-spec close_all_user_connections(rabbit_types:username(), string()) -> 'ok'.
544544
close_all_user_connections(Username, Explanation) ->
545-
Pids = [Pid || #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list_of_user(Username)],
546-
[close_connection(Pid, Explanation) || Pid <- Pids],
547-
ok.
545+
Tracked = rabbit_connection_tracking:list_of_user(Username),
546+
rabbit_connection_tracking:close_connections(Tracked, Explanation, 0).
548547

549548
%% Meant to be used by tests only
550549
-spec close_all_connections(string()) -> 'ok'.

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
264264
{noreply, State, ?HIBERNATE_AFTER};
265265

266266
handle_info({shutdown, Explanation} = Reason, State = #state{conn_name = ConnName}) ->
267-
%% rabbitmq_management plugin requests to close connection.
267+
%% rabbitmq_management plugin or CLI command requests to close connection.
268268
?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]),
269269
{stop, Reason, State};
270270

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ cluster_size_1_tests() ->
9898
,block_only_publisher
9999
,many_qos1_messages
100100
,session_expiry
101+
,cli_close_all_connections
102+
,cli_close_all_user_connections
101103
,management_plugin_connection
102104
,management_plugin_enable
103105
,disconnect
@@ -1280,6 +1282,24 @@ rabbit_mqtt_qos0_queue_kill_node(Config) ->
12801282
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
12811283
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
12821284

1285+
cli_close_all_connections(Config) ->
1286+
ClientId = atom_to_binary(?FUNCTION_NAME),
1287+
C = connect(ClientId, Config),
1288+
process_flag(trap_exit, true),
1289+
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
1290+
Config, 0, ["close_all_connections", "bye"]),
1291+
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
1292+
ok = await_exit(C).
1293+
1294+
cli_close_all_user_connections(Config) ->
1295+
ClientId = atom_to_binary(?FUNCTION_NAME),
1296+
C = connect(ClientId, Config),
1297+
process_flag(trap_exit, true),
1298+
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
1299+
Config, 0, ["close_all_user_connections","guest", "bye"]),
1300+
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
1301+
ok = await_exit(C).
1302+
12831303
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
12841304
management_plugin_connection(Config) ->
12851305
KeepaliveSecs = 99,

0 commit comments

Comments
 (0)