Skip to content

Commit 5c0ff84

Browse files
committed
Fix MQTT message error handling and improve connection responses (#1784)
1 parent 42f3eb5 commit 5c0ff84

File tree

4 files changed

+69
-9
lines changed

4 files changed

+69
-9
lines changed

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonInboundHandler.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919
import io.netty.channel.ChannelHandler.Sharable;
2020
import io.netty.channel.ChannelHandlerContext;
2121
import io.netty.channel.ChannelInboundHandlerAdapter;
22+
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
2223
import io.netty.handler.codec.mqtt.MqttMessage;
2324
import io.netty.handler.codec.mqtt.MqttMessageType;
25+
import io.netty.handler.codec.mqtt.MqttVersion;
2426
import io.netty.handler.timeout.IdleState;
2527
import io.netty.handler.timeout.IdleStateEvent;
2628
import io.netty.util.ReferenceCountUtil;
2729
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
30+
import io.streamnative.pulsar.handlers.mqtt.common.messages.ack.MqttAck;
31+
import io.streamnative.pulsar.handlers.mqtt.common.messages.ack.MqttConnectAck;
32+
import io.streamnative.pulsar.handlers.mqtt.common.messages.ack.MqttDisconnectAck;
33+
import io.streamnative.pulsar.handlers.mqtt.common.messages.codes.mqtt5.Mqtt5DisConnReasonCode;
2834
import io.streamnative.pulsar.handlers.mqtt.common.utils.NettyUtils;
2935
import java.util.concurrent.ConcurrentHashMap;
3036
import lombok.extern.slf4j.Slf4j;
@@ -106,6 +112,50 @@ public void channelRead(ChannelHandlerContext ctx, Object message) {
106112
default:
107113
throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
108114
}
115+
} catch (IllegalStateException ex) {
116+
ReferenceCountUtil.safeRelease(mqttMessage);
117+
MqttMessageType mqttMessageType = mqttMessage.fixedHeader().messageType();
118+
log.warn("Invalid MQTT message state: {}, mqttMessageType:{}", ex.getMessage(), mqttMessageType);
119+
120+
int protocolVersion = MqttVersion.MQTT_3_1.protocolLevel();
121+
try {
122+
Connection existingConnection = NettyUtils.getConnection(ctx.channel());
123+
if (existingConnection != null) {
124+
protocolVersion = existingConnection.getProtocolVersion();
125+
}
126+
} catch (Exception e) {
127+
}
128+
129+
if (mqttMessageType == MqttMessageType.CONNECT) {
130+
MqttConnectVariableHeader connectVariableHeader =
131+
(MqttConnectVariableHeader) mqttMessage.variableHeader();
132+
protocolVersion = connectVariableHeader.version();
133+
134+
// For CONNECT message errors, send a CONNACK error response.
135+
MqttMessage errorResponse = MqttConnectAck.errorBuilder().protocolError(protocolVersion);
136+
MqttAdapterMessage errorAdapterMsg = new MqttAdapterMessage(
137+
adapterMsg.getClientId(),
138+
errorResponse,
139+
adapterMsg.fromProxy()
140+
);
141+
ctx.writeAndFlush(errorAdapterMsg);
142+
} else {
143+
// For other message errors, send a DISCONNECT error response if supported.
144+
MqttAck errorAck = MqttDisconnectAck.errorBuilder(protocolVersion)
145+
.reasonCode(Mqtt5DisConnReasonCode.MALFORMED_PACKET)
146+
.reasonString("Invalid message format: " + ex.getMessage())
147+
.build();
148+
149+
if (errorAck.isProtocolSupported()) {
150+
MqttAdapterMessage errorAdapterMsg = new MqttAdapterMessage(
151+
adapterMsg.getClientId(),
152+
errorAck.getMqttMessage(),
153+
adapterMsg.fromProxy()
154+
);
155+
ctx.writeAndFlush(errorAdapterMsg);
156+
}
157+
}
158+
ctx.close();
109159
} catch (Throwable ex) {
110160
ReferenceCountUtil.safeRelease(mqttMessage);
111161
log.error("Exception was caught while processing MQTT message, ", ex);

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MessageConverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.pulsar.broker.service.Topic;
2828
import org.apache.pulsar.client.impl.MessageImpl;
2929
import org.apache.pulsar.common.naming.TopicDomain;
30-
import org.junit.Assert;
30+
import org.testng.Assert;
3131
import org.testng.annotations.BeforeClass;
3232
import org.testng.annotations.Test;
3333

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
2727
import io.streamnative.pulsar.handlers.mqtt.mqtt3.fusesource.psk.PSKClient;
2828
import java.io.BufferedReader;
29-
import java.io.EOFException;
3029
import java.io.InputStream;
3130
import java.io.InputStreamReader;
3231
import java.nio.charset.StandardCharsets;
@@ -50,6 +49,7 @@
5049
import org.awaitility.Awaitility;
5150
import org.fusesource.mqtt.client.BlockingConnection;
5251
import org.fusesource.mqtt.client.MQTT;
52+
import org.fusesource.mqtt.client.MQTTException;
5353
import org.fusesource.mqtt.client.Message;
5454
import org.fusesource.mqtt.client.QoS;
5555
import org.fusesource.mqtt.client.Topic;
@@ -374,14 +374,19 @@ public void testSubscribeWithTopicFilter() throws Exception {
374374
connection2.disconnect();
375375
}
376376

377-
@Test(expectedExceptions = {EOFException.class, IllegalStateException.class})
377+
@Test(expectedExceptions = {MQTTException.class},
378+
expectedExceptionsMessageRegExp = ".*CONNECTION_REFUSED_SERVER_UNAVAILABLE.*")
378379
public void testInvalidClientId() throws Exception {
379380
MQTT mqtt = createMQTTClient();
380-
mqtt.setConnectAttemptsMax(1);
381381
// ClientId is invalid, for max length is 23 in mqtt 3.1
382382
mqtt.setClientId(UUID.randomUUID().toString().replace("-", ""));
383383
BlockingConnection connection = Mockito.spy(mqtt.blockingConnection());
384-
connection.connect();
384+
try {
385+
connection.connect();
386+
} catch (Exception ex) {
387+
log.info("Expected exception: {}", ex.getMessage());
388+
throw ex; // rethrow to verify the exception
389+
}
385390
verify(connection, Mockito.times(2)).connect();
386391
}
387392

tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.streamnative.pulsar.handlers.mqtt.common.TopicFilterImpl;
3030
import io.streamnative.pulsar.handlers.mqtt.mqtt3.fusesource.psk.PSKClient;
3131
import java.io.BufferedReader;
32-
import java.io.EOFException;
3332
import java.io.InputStream;
3433
import java.io.InputStreamReader;
3534
import java.net.URI;
@@ -62,6 +61,7 @@
6261
import org.awaitility.Awaitility;
6362
import org.fusesource.mqtt.client.BlockingConnection;
6463
import org.fusesource.mqtt.client.MQTT;
64+
import org.fusesource.mqtt.client.MQTTException;
6565
import org.fusesource.mqtt.client.Message;
6666
import org.fusesource.mqtt.client.QoS;
6767
import org.fusesource.mqtt.client.Topic;
@@ -167,14 +167,19 @@ public void testSendAndConsume(String topicName) throws Exception {
167167
connection.disconnect();
168168
}
169169

170-
@Test(expectedExceptions = {EOFException.class, IllegalStateException.class}, priority = 3)
170+
@Test(expectedExceptions = {MQTTException.class},
171+
expectedExceptionsMessageRegExp = ".*CONNECTION_REFUSED_SERVER_UNAVAILABLE.*")
171172
public void testInvalidClientId() throws Exception {
172173
MQTT mqtt = createMQTTProxyClient();
173-
mqtt.setConnectAttemptsMax(1);
174174
// ClientId is invalid, for max length is 23 in mqtt 3.1
175175
mqtt.setClientId(UUID.randomUUID().toString().replace("-", ""));
176176
BlockingConnection connection = Mockito.spy(mqtt.blockingConnection());
177-
connection.connect();
177+
try {
178+
connection.connect();
179+
} catch (Exception ex) {
180+
log.info("Expected exception: {}", ex.getMessage());
181+
throw ex; // rethrow to verify the exception
182+
}
178183
verify(connection, Mockito.times(2)).connect();
179184
}
180185

0 commit comments

Comments
 (0)