diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl index da906fa41144..207bcd9fc570 100644 --- a/deps/rabbit/src/rabbit_connection_tracking.erl +++ b/deps/rabbit/src/rabbit_connection_tracking.erl @@ -428,6 +428,6 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> Node = node(Pid), rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]); close_connection(#tracked_connection{pid = Pid}, Message) -> - % best effort, this will work for connections to the stream plugin - Node = node(Pid), - rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]). + %% Best effort will work for following plugins: + %% rabbitmq_stream, rabbitmq_mqtt, rabbitmq_web_mqtt + Pid ! {shutdown, Message}. diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 502fd273d850..a81798e0031d 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -542,9 +542,8 @@ close_connections(Pids, Explanation) -> -spec close_all_user_connections(rabbit_types:username(), string()) -> 'ok'. close_all_user_connections(Username, Explanation) -> - Pids = [Pid || #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list_of_user(Username)], - [close_connection(Pid, Explanation) || Pid <- Pids], - ok. + Tracked = rabbit_connection_tracking:list_of_user(Username), + rabbit_connection_tracking:close_connections(Tracked, Explanation, 0). %% Meant to be used by tests only -spec close_all_connections(string()) -> 'ok'. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 82862b829bc4..3156301b665c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -264,7 +264,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> {noreply, State, ?HIBERNATE_AFTER}; handle_info({shutdown, Explanation} = Reason, State = #state{conn_name = ConnName}) -> - %% rabbitmq_management plugin requests to close connection. + %% rabbitmq_management plugin or CLI command requests to close connection. ?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]), {stop, Reason, State}; diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index e7338a2c7c2b..d2ef3defe678 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -98,6 +98,8 @@ cluster_size_1_tests() -> ,block_only_publisher ,many_qos1_messages ,session_expiry + ,cli_close_all_connections + ,cli_close_all_user_connections ,management_plugin_connection ,management_plugin_enable ,disconnect @@ -232,8 +234,19 @@ end_per_group(_, Config) -> init_per_testcase(T, Config) when T =:= management_plugin_connection; +<<<<<<< HEAD +<<<<<<< HEAD T =:= management_plugin_enable -> ok = inets:start(), +======= + T =:= management_plugin_enable; + T =:= cli_close_all_user_connections; + T =:= cli_close_all_connections -> +======= + T =:= management_plugin_enable -> +>>>>>>> c2998ca17a (Simplify test cases) + inets:start(), +>>>>>>> bf54a61ed8 (Mqtt: test close connection) init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -1280,6 +1293,24 @@ rabbit_mqtt_qos0_queue_kill_node(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, 1), ?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])). +cli_close_all_connections(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + process_flag(trap_exit, true), + {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl( + Config, 0, ["close_all_connections", "bye"]), + ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), + ok = await_exit(C). + +cli_close_all_user_connections(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + process_flag(trap_exit, true), + {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl( + Config, 0, ["close_all_user_connections","guest", "bye"]), + ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), + ok = await_exit(C). + %% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin. management_plugin_connection(Config) -> KeepaliveSecs = 99,