Skip to content

Commit 1962076

Browse files
Merge pull request #9271 from rabbitmq/mqtt-send-failed
Close MQTT connection if sending fails
2 parents 9f922f7 + 8b4a26b commit 1962076

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,13 +336,14 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
336336
connect_packet_unprocessed ->
337337
Send = fun(Data) ->
338338
try rabbit_net:port_command(Socket, Data)
339-
catch error:Error ->
339+
catch error:Reason ->
340340
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
341-
[Socket, Error])
341+
[Socket, Reason]),
342+
exit({send_failed, Reason})
342343
end,
343344
ok
344345
end,
345-
case rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
346+
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
346347
{ok, ProcState1} ->
347348
?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts",
348349
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
@@ -358,9 +359,11 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
358359
?LOG_ERROR("Rejected MQTT connection ~ts with Connect Reason Code ~p",
359360
[ConnName, ConnectReasonCode]),
360361
{stop, shutdown, {_SendWill = false, State}}
362+
catch exit:{send_failed, Reason} ->
363+
network_error(Reason, State)
361364
end;
362365
_ ->
363-
case rabbit_mqtt_processor:process_packet(Packet, ProcState) of
366+
try rabbit_mqtt_processor:process_packet(Packet, ProcState) of
364367
{ok, ProcState1} ->
365368
process_received_bytes(
366369
Rest,
@@ -377,6 +380,8 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
377380
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
378381
{stop, {disconnect, {client_initiated, SendWill}}, ProcState1} ->
379382
{stop, normal, {SendWill, pstate(State, ProcState1)}}
383+
catch exit:{send_failed, Reason} ->
384+
network_error(Reason, State)
380385
end
381386
end;
382387
{error, {disconnect_reason_code, ReasonCode} = Reason} ->

deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ public void sessionRedelivery(TestInfo info) throws MqttException, InterruptedEx
520520
// Message has been delivered but connection has failed.
521521
waitAtMost(() -> receivedMessagesSize() == 1);
522522
assertThat(firstMessage().getPayload()).isEqualTo(payload);
523+
Thread.sleep(100);
523524
assertThat(client.isConnected()).isFalse();
524525

525526
receivedMessages.clear();

0 commit comments

Comments
 (0)