Skip to content

Commit ea6ef17

Browse files
committed
Mqtt: test close connection
1 parent 494c1b8 commit ea6ef17

File tree

3 files changed

+46
-5
lines changed

3 files changed

+46
-5
lines changed

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,10 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
427427
%% Do an RPC call to the node running the direct client.
428428
Node = node(Pid),
429429
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]);
430+
close_connection(#tracked_connection{pid = Pid,
431+
protocol = {'Web MQTT', _}}, Message) ->
432+
% this will work for connections to web mqtt plugin
433+
Pid ! {shutdown, Message};
430434
close_connection(#tracked_connection{pid = Pid}, Message) ->
431435
% best effort, this will work for connections to the stream plugin
432436
Node = node(Pid),

deps/rabbit/src/rabbit_networking.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,8 @@ close_connections(Pids, Explanation) ->
531531

532532
-spec close_all_user_connections(rabbit_types:username(), string()) -> 'ok'.
533533
close_all_user_connections(Username, Explanation) ->
534-
Pids = [Pid || #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list_of_user(Username)],
535-
[close_connection(Pid, Explanation) || Pid <- Pids],
536-
ok.
534+
Tracked = rabbit_connection_tracking:list_of_user(Username),
535+
rabbit_connection_tracking:close_connections(Tracked, Explanation, 0).
537536

538537
%% Meant to be used by tests only
539538
-spec close_all_connections(string()) -> 'ok'.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ cluster_size_1_tests() ->
129129
,retained_message_conversion
130130
,bind_exchange_to_exchange
131131
,bind_exchange_to_exchange_single_message
132+
,cli_close_all_connections
133+
,cli_close_all_user_connections
132134
].
133135

134136
cluster_size_3_tests() ->
@@ -141,6 +143,8 @@ cluster_size_3_tests() ->
141143
rabbit_mqtt_qos0_queue,
142144
rabbit_mqtt_qos0_queue_kill_node,
143145
cli_list_queues,
146+
cli_close_all_connections,
147+
cli_close_all_user_connections,
144148
delete_create_queue,
145149
session_reconnect,
146150
session_takeover,
@@ -207,7 +211,9 @@ end_per_group(_, Config) ->
207211

208212
init_per_testcase(T, Config)
209213
when T =:= management_plugin_connection;
210-
T =:= management_plugin_enable ->
214+
T =:= management_plugin_enable;
215+
T =:= cli_close_all_user_connections;
216+
T =:= cli_close_all_connections ->
211217
inets:start(),
212218
init_per_testcase0(T, Config);
213219
init_per_testcase(Testcase, Config) ->
@@ -220,7 +226,9 @@ init_per_testcase0(Testcase, Config) ->
220226

221227
end_per_testcase(T, Config)
222228
when T =:= management_plugin_connection;
223-
T =:= management_plugin_enable ->
229+
T =:= management_plugin_enable;
230+
T =:= cli_close_all_user_connections;
231+
T =:= cli_close_all_connections ->
224232
ok = inets:stop(),
225233
end_per_testcase0(T, Config);
226234
end_per_testcase(Testcase, Config) ->
@@ -1208,6 +1216,36 @@ management_plugin_enable(Config) ->
12081216

12091217
ok = emqtt:disconnect(C).
12101218

1219+
cli_close_all_connections(Config) ->
1220+
KeepaliveSecs = 99,
1221+
ClientId = atom_to_binary(?FUNCTION_NAME),
1222+
1223+
_ = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
1224+
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
1225+
1226+
process_flag(trap_exit, true),
1227+
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["close_all_connections", "bye"]),
1228+
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
1229+
1230+
process_flag(trap_exit, false),
1231+
eventually(?_assertEqual([], http_get(Config, "/connections")),
1232+
1000, 10).
1233+
1234+
cli_close_all_user_connections(Config) ->
1235+
KeepaliveSecs = 99,
1236+
ClientId = atom_to_binary(?FUNCTION_NAME),
1237+
1238+
_ = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
1239+
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
1240+
1241+
process_flag(trap_exit, true),
1242+
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["close_all_user_connections","guest", "bye"]),
1243+
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
1244+
1245+
process_flag(trap_exit, false),
1246+
eventually(?_assertEqual([], http_get(Config, "/connections")),
1247+
1000, 10).
1248+
12111249
%% Test that queues of type rabbit_mqtt_qos0_queue can be listed via rabbitmqctl.
12121250
cli_list_queues(Config) ->
12131251
C = connect(?FUNCTION_NAME, Config),

0 commit comments

Comments
 (0)