Skip to content

Commit da3918c

Browse files
committed
[broker-26] fix connect options for connect in packet with mqtt 3.1.1 version
1 parent fd8d24f commit da3918c

File tree

11 files changed

+174
-42
lines changed

11 files changed

+174
-42
lines changed

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
3434
client.setClientId(StringUtils.EMPTY);
3535

3636
if (StringUtils.isEmpty(clientId)) {
37-
log.warn("This client {} is already released", client);
37+
log.warn("This client {} is already released or rejected", client);
3838
return Mono.empty();
3939
}
4040

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
import com.ss.mqtt.broker.exception.ConnectionRejectException;
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
99
import com.ss.mqtt.broker.model.MqttSession;
10+
import com.ss.mqtt.broker.model.MqttVersion;
1011
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1112
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1213
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
13-
import com.ss.mqtt.broker.service.*;
14+
import com.ss.mqtt.broker.service.AuthenticationService;
15+
import com.ss.mqtt.broker.service.ClientIdRegistry;
16+
import com.ss.mqtt.broker.service.MqttSessionService;
17+
import com.ss.mqtt.broker.service.SubscriptionService;
1418
import com.ss.rlib.common.util.StringUtils;
1519
import lombok.RequiredArgsConstructor;
1620
import lombok.extern.log4j.Log4j2;
@@ -53,6 +57,16 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPa
5357
return clientIdRegistry.register(requestedClientId)
5458
.map(ifTrue(requestedClientId, client::setClientId));
5559
} else {
60+
61+
var mqttVersion = client
62+
.getConnection()
63+
.getMqttVersion();
64+
65+
// we can't assign generated client if for mqtt version less than 5
66+
if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) {
67+
return Mono.just(false);
68+
}
69+
5670
return clientIdRegistry.generate()
5771
.flatMap(newClientId -> clientIdRegistry.register(newClientId)
5872
.map(ifTrue(newClientId, client::setClientId)));

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

33
import com.ss.mqtt.broker.exception.ConnectionRejectException;
4-
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
54
import com.ss.mqtt.broker.model.MqttPropertyConstants;
65
import com.ss.mqtt.broker.model.MqttVersion;
76
import com.ss.mqtt.broker.model.PacketProperty;
7+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
88
import com.ss.mqtt.broker.network.MqttConnection;
99
import com.ss.mqtt.broker.network.packet.PacketType;
1010
import com.ss.mqtt.broker.util.DebugUtils;
@@ -281,6 +281,15 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B
281281

282282
hasUserName = NumberUtils.isSetBit(flags, 7);
283283
hasPassword = NumberUtils.isSetBit(flags, 6);
284+
285+
if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) {
286+
287+
// for mqtt < 5 we cannot have password without user
288+
if (!hasUserName && hasPassword) {
289+
throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
290+
}
291+
}
292+
284293
willFlag = NumberUtils.isSetBit(flags, 2);
285294
keepAlive = readUnsignedShort(buffer);
286295
}

src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929

3030
public abstract class MqttReadablePacket extends AbstractReadablePacket<MqttConnection> {
3131

32+
static {
33+
DebugUtils.registerIncludedFields("userProperties");
34+
}
35+
3236
@Getter
3337
@RequiredArgsConstructor
3438
private static class Utf8Decoder {

src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
1414
def "publisher should publish message QoS 0"() {
1515
given:
1616
def received = new AtomicReference<Mqtt5Publish>()
17-
def subscriber = buildClient()
17+
def subscriber = buildMqtt5Client()
1818
def subscriberId = subscriber.getConfig().clientIdentifier.get()toString()
19-
def publisher = buildClient()
19+
def publisher = buildMqtt5Client()
2020
when:
2121
subscriber.connect().join()
2222
publisher.connect().join()
@@ -47,9 +47,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
4747
def "publisher should publish message QoS 1"() {
4848
given:
4949
def received = new AtomicReference<Mqtt5Publish>()
50-
def subscriber = buildClient()
50+
def subscriber = buildMqtt5Client()
5151
def subscriberId = subscriber.getConfig().clientIdentifier.get()toString()
52-
def publisher = buildClient()
52+
def publisher = buildMqtt5Client()
5353
when:
5454

5555
subscriber.connect().join()
@@ -81,9 +81,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
8181
def "publisher should publish message QoS 2"() {
8282
given:
8383
def received = new AtomicReference<Mqtt5Publish>()
84-
def subscriber = buildClient()
84+
def subscriber = buildMqtt5Client()
8585
def subscriberId = subscriber.getConfig().clientIdentifier.get()toString()
86-
def publisher = buildClient()
86+
def publisher = buildMqtt5Client()
8787
when:
8888

8989
subscriber.connect().join()

src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy

Lines changed: 102 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,36 @@
11
package com.ss.mqtt.broker.test.integration
22

3-
import com.hivemq.client.mqtt.MqttClient
3+
import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException
4+
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode
45
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException
56
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
67
import com.ss.mqtt.broker.model.MqttPropertyConstants
7-
import org.springframework.beans.factory.annotation.Autowired
8+
import com.ss.mqtt.broker.model.QoS
9+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode
10+
import com.ss.mqtt.broker.network.packet.in.ConnectAckInPacket
11+
import com.ss.mqtt.broker.network.packet.out.Connect311OutPacket
12+
import com.ss.rlib.common.util.ArrayUtils
813

14+
import java.nio.charset.StandardCharsets
915
import java.util.concurrent.CompletionException
1016

1117
class ConnectionTest extends IntegrationSpecification {
1218

13-
@Autowired
14-
InetSocketAddress deviceNetworkAddress
19+
def "client should connect to broker without user and pass using mqtt 3.1.1"() {
20+
given:
21+
def client = buildMqtt311Client()
22+
when:
23+
def result = client.connect().join()
24+
then:
25+
result.returnCode == Mqtt3ConnAckReturnCode.SUCCESS
26+
!result.sessionPresent
27+
cleanup:
28+
client.disconnect().join()
29+
}
1530

16-
def "subscriber should connect to broker without user and pass"() {
31+
def "client should connect to broker without user and pass using mqtt 5"() {
1732
given:
18-
def client = buildClient()
33+
def client = buildMqtt5Client()
1934
when:
2035
def result = client.connect().join()
2136
then:
@@ -32,9 +47,21 @@ class ConnectionTest extends IntegrationSpecification {
3247
client.disconnect().join()
3348
}
3449

35-
def "subscriber should connect to broker with user and pass"() {
50+
def "client should connect to broker with user and pass using mqtt 3.1.1"() {
3651
given:
37-
def client = buildClient('1')
52+
def client = buildMqtt311Client()
53+
when:
54+
def result = connectWith(client, 'user1', 'password')
55+
then:
56+
result.returnCode == Mqtt3ConnAckReturnCode.SUCCESS
57+
!result.sessionPresent
58+
cleanup:
59+
client.disconnect().join()
60+
}
61+
62+
def "client should connect to broker with user and pass using mqtt 5"() {
63+
given:
64+
def client = buildMqtt5Client()
3865
when:
3966
def result = connectWith(client, 'user1', 'password')
4067
then:
@@ -51,9 +78,20 @@ class ConnectionTest extends IntegrationSpecification {
5178
client.disconnect().join()
5279
}
5380

54-
def "subscriber should connect to broker without providing a client id"() {
81+
def "client should not connect to broker without providing a client id using mqtt 3.1.1"() {
82+
given:
83+
def client = buildMqtt311Client("")
84+
when:
85+
client.connect().join()
86+
then:
87+
def ex = thrown CompletionException
88+
def cause = ex.cause as Mqtt3ConnAckException
89+
cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED
90+
}
91+
92+
def "client should connect to broker without providing a client id using mqtt 5"() {
5593
given:
56-
def client = buildClient("")
94+
def client = buildMqtt5Client("")
5795
when:
5896
def result = client.connect().join()
5997
then:
@@ -64,15 +102,22 @@ class ConnectionTest extends IntegrationSpecification {
64102
client.disconnect().join()
65103
}
66104

67-
def "subscriber should not connect to broker with invalid client id"(String clientId) {
105+
def "client should not connect to broker with invalid client id using mqtt 3.1.1"(String clientId) {
106+
given:
107+
def client = buildMqtt311Client(clientId)
108+
when:
109+
client.connect().join()
110+
then:
111+
def ex = thrown CompletionException
112+
def cause = ex.cause as Mqtt3ConnAckException
113+
cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED
114+
where:
115+
clientId << ["!@#!@*()^&"]
116+
}
117+
118+
def "client should not connect to broker with invalid client id using mqtt 5"(String clientId) {
68119
given:
69-
def client = MqttClient.builder()
70-
.identifier(clientId)
71-
.serverHost(deviceNetworkAddress.getHostName())
72-
.serverPort(deviceNetworkAddress.getPort())
73-
.useMqttVersion5()
74-
.build()
75-
.toAsync()
120+
def client = buildMqtt5Client(clientId)
76121
when:
77122
client.connect().join()
78123
then:
@@ -83,11 +128,47 @@ class ConnectionTest extends IntegrationSpecification {
83128
clientId << ["!@#!@*()^&"]
84129
}
85130

86-
def "subscriber should not connect to broker with wrong pass"() {
131+
def "client should not connect to broker with wrong pass using mqtt 3.1.1"() {
132+
given:
133+
def client = buildMqtt311Client()
134+
when:
135+
connectWith(client, "user", "wrongPassword")
136+
then:
137+
def ex = thrown CompletionException
138+
def cause = ex.cause as Mqtt3ConnAckException
139+
cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.BAD_USER_NAME_OR_PASSWORD
140+
}
141+
142+
def "client should not connect to broker without username and with pass using mqtt 3.1.1"() {
143+
given:
144+
def client = buildMqtt311MockClient()
145+
def clientId = generateClientId()
146+
when:
147+
148+
client.connect()
149+
client.send(new Connect311OutPacket(
150+
"",
151+
"",
152+
clientId,
153+
"wrongPassword".getBytes(StandardCharsets.UTF_8),
154+
ArrayUtils.EMPTY_BYTE_ARRAY,
155+
QoS.AT_MOST_ONCE,
156+
keepAlive,
157+
false,
158+
false
159+
))
160+
161+
def connectAck = client.readNext() as ConnectAckInPacket
162+
163+
then:
164+
connectAck.reasonCode == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD
165+
}
166+
167+
def "client should not connect to broker with wrong pass using mqtt 5"() {
87168
given:
88-
def client = buildClient()
169+
def client = buildMqtt5Client()
89170
when:
90-
connectWith(client, 'user', 'wrongPassword')
171+
connectWith(client, "user", "wrongPassword")
91172
then:
92173
def ex = thrown CompletionException
93174
def cause = ex.cause as Mqtt5ConnAckException

src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker.test.integration
22

33
import com.hivemq.client.mqtt.MqttClient
4+
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
45
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
56
import com.ss.mqtt.broker.config.MqttConnectionConfig
67
import com.ss.mqtt.broker.model.MqttPropertyConstants
@@ -9,7 +10,6 @@ import com.ss.mqtt.broker.network.MqttConnection
910
import com.ss.mqtt.broker.test.integration.config.MqttBrokerTestConfig
1011
import com.ss.mqtt.broker.test.mock.MqttMockClient
1112
import org.springframework.beans.factory.annotation.Autowired
12-
import org.springframework.context.annotation.Bean
1313
import org.springframework.test.context.ContextConfiguration
1414
import spock.lang.Specification
1515

@@ -32,12 +32,26 @@ class IntegrationSpecification extends Specification {
3232

3333
@Autowired
3434
MqttConnectionConfig deviceConnectionConfig
35-
36-
def buildClient() {
37-
return buildClient(generateClientId())
35+
36+
def buildMqtt311Client() {
37+
return buildMqtt311Client(generateClientId())
38+
}
39+
40+
def buildMqtt5Client() {
41+
return buildMqtt5Client(generateClientId())
3842
}
3943

40-
def buildClient(String clientId) {
44+
def buildMqtt311Client(String clientId) {
45+
return MqttClient.builder()
46+
.identifier(clientId)
47+
.serverHost(deviceNetworkAddress.getHostName())
48+
.serverPort(deviceNetworkAddress.getPort())
49+
.useMqttVersion3()
50+
.build()
51+
.toAsync()
52+
}
53+
54+
def buildMqtt5Client(String clientId) {
4155
return MqttClient.builder()
4256
.identifier(clientId)
4357
.serverHost(deviceNetworkAddress.getHostName())
@@ -55,6 +69,16 @@ class IntegrationSpecification extends Specification {
5569
return prefix + "_" + idGenerator.incrementAndGet()
5670
}
5771

72+
def connectWith(Mqtt3AsyncClient client, String user, String pass) {
73+
return client.connectWith()
74+
.simpleAuth()
75+
.username(user)
76+
.password(pass.getBytes(encoding))
77+
.applySimpleAuth()
78+
.send()
79+
.join()
80+
}
81+
5882
def connectWith(Mqtt5AsyncClient client, String user, String pass) {
5983
return client.connectWith()
6084
.simpleAuth()

src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class PublishRetryTest extends IntegrationSpecification {
3030

3131
def "mqtt 3.1.1 client should be generate session with one pending QoS 1 packet"() {
3232
given:
33-
def publisher = buildClient()
33+
def publisher = buildMqtt5Client()
3434
def subscriber = buildMqtt311MockClient()
3535
def subscriberId = generateClientId()
3636
when:
@@ -91,7 +91,7 @@ class PublishRetryTest extends IntegrationSpecification {
9191

9292
def "mqtt 5 client should be generate session with one pending QoS 1 packet"() {
9393
given:
94-
def publisher = buildClient()
94+
def publisher = buildMqtt5Client()
9595
def subscriber = buildMqtt5MockClient()
9696
def subscriberId = generateClientId()
9797
when:
@@ -152,7 +152,7 @@ class PublishRetryTest extends IntegrationSpecification {
152152

153153
def "mqtt 3.1.1 client should be generate session with one pending QoS 2 packet"() {
154154
given:
155-
def publisher = buildClient()
155+
def publisher = buildMqtt5Client()
156156
def subscriber = buildMqtt311MockClient()
157157
def subscriberId = generateClientId()
158158
when:
@@ -234,7 +234,7 @@ class PublishRetryTest extends IntegrationSpecification {
234234

235235
def "mqtt 5 client should be generate session with one pending QoS 2 packet"() {
236236
given:
237-
def publisher = buildClient()
237+
def publisher = buildMqtt5Client()
238238
def subscriber = buildMqtt5MockClient()
239239
def subscriberId = generateClientId()
240240
when:

src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class ClientIdRegistryTest extends IntegrationSpecification {
109109
def "subscriber should register its client id on connect and unregister on disconnect"() {
110110
given:
111111
def clientId = clientIdRegistry.generate().block()
112-
def client = buildClient(clientId)
112+
def client = buildMqtt5Client(clientId)
113113
when:
114114
def result = client.connect().join()
115115
then:

0 commit comments

Comments
 (0)