Skip to content

Commit 8c905b9

Browse files
committed
Avoid crash in stream connection
1. Prior to this commit, closing a stream connection via: ``` ./sbin/rabbitmqctl close_all_user_connections guest enough ``` crashed the stream process as follows: ``` 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> crasher: 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> initial call: rabbit_stream_reader:init/1 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> pid: <0.1098.0> 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> registered_name: [] 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> exception error: no function clause matching 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> rabbit_stream_reader:open({call, 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {<0.1233.0>, 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> #Ref<0.519694519.1387790337.15898>}}, 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {shutdown,<<"enough">>}, ``` This commit fixes this crash. 2. Both CLI commands and management plugin use the same way to close MQTT, Web MQTT, and Stream connections: They all send a message via `Pid ! {shutdown, Reason}` to the connection. 3. This commit avoids making `rabbit` core app to know about 'Web MQTT'. 4 This commit simplifies rabbit_mqtt_reader by avoiding another handle_call clause
1 parent 69d407e commit 8c905b9

File tree

2 files changed

+4
-13
lines changed

2 files changed

+4
-13
lines changed

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -427,11 +427,7 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
427427
%% Do an RPC call to the node running the direct client.
428428
Node = node(Pid),
429429
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]);
430-
close_connection(#tracked_connection{pid = Pid,
431-
protocol = {'Web MQTT', _}}, Message) ->
432-
% this will work for connections to web mqtt plugin
433-
Pid ! {shutdown, Message};
434430
close_connection(#tracked_connection{pid = Pid}, Message) ->
435-
% best effort, this will work for connections to the stream plugin
436-
Node = node(Pid),
437-
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).
431+
%% Best effort will work for following plugins:
432+
%% rabbitmq_stream, rabbitmq_mqtt, rabbitmq_web_mqtt
433+
Pid ! {shutdown, Message}.

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,6 @@ init(Ref) ->
109109
handle_call({info, InfoItems}, _From, State) ->
110110
{reply, infos(InfoItems, State), State, ?HIBERNATE_AFTER};
111111

112-
handle_call({shutdown, Explanation} = Reason, _From, State = #state{conn_name = ConnName}) ->
113-
%% rabbit_networking:close_all_user_connections -> rabbit_reader:shutdow
114-
?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]),
115-
{stop, Reason, ok, State};
116-
117112
handle_call(Msg, From, State) ->
118113
{stop, {mqtt_unexpected_call, Msg, From}, State}.
119114

@@ -252,7 +247,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
252247
{noreply, State, ?HIBERNATE_AFTER};
253248

254249
handle_info({shutdown, Explanation} = Reason, State = #state{conn_name = ConnName}) ->
255-
%% rabbitmq_management plugin requests to close connection.
250+
%% rabbitmq_management plugin or CLI command requests to close connection.
256251
?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]),
257252
{stop, Reason, State};
258253

0 commit comments

Comments
 (0)