Skip to content

Commit bc1fa49

Browse files
authored
Merge pull request #36 from JavaSaBr/feature-broker-26
Additional difference in CONN packet between 3.1.1 and 5.0
2 parents a98509b + b79e1a5 commit bc1fa49

File tree

11 files changed

+313
-58
lines changed

11 files changed

+313
-58
lines changed

src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
3434
client.setClientId(StringUtils.EMPTY);
3535

3636
if (StringUtils.isEmpty(clientId)) {
37-
log.warn("This client {} is already released", client);
37+
log.warn("This client {} is already released or rejected", client);
3838
return Mono.empty();
3939
}
4040

src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
import com.ss.mqtt.broker.exception.ConnectionRejectException;
88
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
99
import com.ss.mqtt.broker.model.MqttSession;
10+
import com.ss.mqtt.broker.model.MqttVersion;
1011
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
1112
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1213
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
13-
import com.ss.mqtt.broker.service.*;
14+
import com.ss.mqtt.broker.service.AuthenticationService;
15+
import com.ss.mqtt.broker.service.ClientIdRegistry;
16+
import com.ss.mqtt.broker.service.MqttSessionService;
17+
import com.ss.mqtt.broker.service.SubscriptionService;
1418
import com.ss.rlib.common.util.StringUtils;
1519
import lombok.RequiredArgsConstructor;
1620
import lombok.extern.log4j.Log4j2;
@@ -53,6 +57,16 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPa
5357
return clientIdRegistry.register(requestedClientId)
5458
.map(ifTrue(requestedClientId, client::setClientId));
5559
} else {
60+
61+
var mqttVersion = client
62+
.getConnection()
63+
.getMqttVersion();
64+
65+
// we can't assign generated client if for mqtt version less than 5
66+
if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) {
67+
return Mono.just(false);
68+
}
69+
5670
return clientIdRegistry.generate()
5771
.flatMap(newClientId -> clientIdRegistry.register(newClientId)
5872
.map(ifTrue(newClientId, client::setClientId)));

src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.ss.mqtt.broker.network.packet.in;
22

33
import com.ss.mqtt.broker.exception.ConnectionRejectException;
4-
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
54
import com.ss.mqtt.broker.model.MqttPropertyConstants;
65
import com.ss.mqtt.broker.model.MqttVersion;
76
import com.ss.mqtt.broker.model.PacketProperty;
7+
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
88
import com.ss.mqtt.broker.network.MqttConnection;
99
import com.ss.mqtt.broker.network.packet.PacketType;
1010
import com.ss.mqtt.broker.util.DebugUtils;
@@ -281,6 +281,12 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B
281281

282282
hasUserName = NumberUtils.isSetBit(flags, 7);
283283
hasPassword = NumberUtils.isSetBit(flags, 6);
284+
285+
// for mqtt < 5 we cannot have password without user
286+
if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal() && !hasUserName && hasPassword) {
287+
throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
288+
}
289+
284290
willFlag = NumberUtils.isSetBit(flags, 2);
285291
keepAlive = readUnsignedShort(buffer);
286292
}

src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929

3030
public abstract class MqttReadablePacket extends AbstractReadablePacket<MqttConnection> {
3131

32+
static {
33+
DebugUtils.registerIncludedFields("userProperties");
34+
}
35+
3236
@Getter
3337
@RequiredArgsConstructor
3438
private static class Utf8Decoder {

src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy

Lines changed: 148 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package com.ss.mqtt.broker.test.integration
22

33
import com.hivemq.client.mqtt.datatypes.MqttQos
4+
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
5+
import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType
6+
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish
7+
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode
48
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
59
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5MessageType
610
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
@@ -11,53 +15,119 @@ import java.util.concurrent.atomic.AtomicReference
1115

1216
class ConnectSubscribePublishTest extends IntegrationSpecification {
1317

14-
def "publisher should publish message QoS 0"() {
18+
def "publisher should publish message QoS 0 using mqtt 3.1.1"() {
1519
given:
16-
def received = new AtomicReference<Mqtt5Publish>()
17-
def subscriber = buildClient()
20+
def received = new AtomicReference<Mqtt3Publish>()
21+
def subscriber = buildMqtt311Client()
1822
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
19-
def publisher = buildClient()
23+
def publisher = buildMqtt311Client()
2024
when:
2125
subscriber.connect().join()
2226
publisher.connect().join()
27+
28+
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
29+
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
30+
31+
Thread.sleep(100)
32+
then:
33+
noExceptionThrown()
34+
35+
subscribeResult != null
36+
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0)
37+
subscribeResult.type == Mqtt3MessageType.SUBACK
38+
39+
publishResult != null
40+
publishResult.qos == MqttQos.AT_MOST_ONCE
41+
publishResult.type == Mqtt3MessageType.PUBLISH
42+
43+
received.get() != null
44+
received.get().qos == MqttQos.AT_MOST_ONCE
45+
received.get().type == Mqtt3MessageType.PUBLISH
46+
cleanup:
47+
subscriber.disconnect().join()
48+
publisher.disconnect().join()
49+
}
2350

51+
def "publisher should publish message QoS 0 using mqtt 5"() {
52+
given:
53+
def received = new AtomicReference<Mqtt5Publish>()
54+
def subscriber = buildMqtt5Client()
55+
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
56+
def publisher = buildMqtt5Client()
57+
when:
58+
subscriber.connect().join()
59+
publisher.connect().join()
60+
2461
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
2562
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
26-
63+
2764
Thread.sleep(100)
2865
then:
2966
noExceptionThrown()
3067

3168
subscribeResult != null
3269
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0)
3370
subscribeResult.type == Mqtt5MessageType.SUBACK
34-
71+
3572
publishResult != null
3673
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
3774
publishResult.publish.type == Mqtt5MessageType.PUBLISH
38-
75+
3976
received.get() != null
4077
received.get().qos == MqttQos.AT_MOST_ONCE
4178
received.get().type == Mqtt5MessageType.PUBLISH
4279
cleanup:
43-
subscriber.disconnect()
44-
publisher.disconnect()
80+
subscriber.disconnect().join()
81+
publisher.disconnect().join()
4582
}
4683

47-
def "publisher should publish message QoS 1"() {
84+
def "publisher should publish message QoS 1 using mqtt 3.1.1"() {
4885
given:
49-
def received = new AtomicReference<Mqtt5Publish>()
50-
def subscriber = buildClient()
86+
def received = new AtomicReference<Mqtt3Publish>()
87+
def subscriber = buildMqtt311Client()
5188
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
52-
def publisher = buildClient()
89+
def publisher = buildMqtt311Client()
5390
when:
54-
5591
subscriber.connect().join()
5692
publisher.connect().join()
93+
94+
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
95+
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
96+
97+
Thread.sleep(100)
98+
then:
99+
noExceptionThrown()
100+
101+
subscribeResult != null
102+
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1)
103+
subscribeResult.type == Mqtt3MessageType.SUBACK
104+
105+
publishResult != null
106+
publishResult.qos == MqttQos.AT_LEAST_ONCE
107+
publishResult.type == Mqtt3MessageType.PUBLISH
108+
109+
received.get() != null
110+
received.get().qos == MqttQos.AT_LEAST_ONCE
111+
received.get().type == Mqtt3MessageType.PUBLISH
112+
cleanup:
113+
subscriber.disconnect().join()
114+
publisher.disconnect().join()
115+
}
57116

117+
def "publisher should publish message QoS 1 using mqtt 5"() {
118+
given:
119+
def received = new AtomicReference<Mqtt5Publish>()
120+
def subscriber = buildMqtt5Client()
121+
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
122+
def publisher = buildMqtt5Client()
123+
when:
124+
125+
subscriber.connect().join()
126+
publisher.connect().join()
127+
58128
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
59129
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
60-
130+
61131
Thread.sleep(100)
62132
then:
63133
noExceptionThrown()
@@ -69,7 +139,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
69139
publishResult != null
70140
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
71141
publishResult.publish.type == Mqtt5MessageType.PUBLISH
72-
142+
73143
received.get() != null
74144
received.get().qos == MqttQos.AT_LEAST_ONCE
75145
received.get().type == Mqtt5MessageType.PUBLISH
@@ -78,17 +148,50 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
78148
publisher.disconnect().join()
79149
}
80150

81-
def "publisher should publish message QoS 2"() {
151+
def "publisher should publish message QoS 2 using mqtt 3.1.1"() {
152+
given:
153+
def received = new AtomicReference<Mqtt3Publish>()
154+
def subscriber = buildMqtt311Client()
155+
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
156+
def publisher = buildMqtt311Client()
157+
when:
158+
subscriber.connect().join()
159+
publisher.connect().join()
160+
161+
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
162+
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
163+
164+
Thread.sleep(100)
165+
then:
166+
noExceptionThrown()
167+
168+
subscribeResult != null
169+
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2)
170+
subscribeResult.type == Mqtt3MessageType.SUBACK
171+
172+
publishResult != null
173+
publishResult.qos == MqttQos.EXACTLY_ONCE
174+
publishResult.type == Mqtt3MessageType.PUBLISH
175+
176+
received.get() != null
177+
received.get().qos == MqttQos.EXACTLY_ONCE
178+
received.get().type == Mqtt3MessageType.PUBLISH
179+
cleanup:
180+
subscriber.disconnect().join()
181+
publisher.disconnect().join()
182+
}
183+
184+
def "publisher should publish message QoS 2 using mqtt 5"() {
82185
given:
83186
def received = new AtomicReference<Mqtt5Publish>()
84-
def subscriber = buildClient()
187+
def subscriber = buildMqtt5Client()
85188
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
86-
def publisher = buildClient()
189+
def publisher = buildMqtt5Client()
87190
when:
88191

89192
subscriber.connect().join()
90193
publisher.connect().join()
91-
194+
92195
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
93196
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
94197

@@ -108,8 +211,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
108211
received.get().qos == MqttQos.EXACTLY_ONCE
109212
received.get().type == Mqtt5MessageType.PUBLISH
110213
cleanup:
111-
subscriber.disconnect()
112-
publisher.disconnect()
214+
subscriber.disconnect().join()
215+
publisher.disconnect().join()
113216
}
114217

115218
def publish(Mqtt5AsyncClient publisher, String subscriberId, MqttQos qos) {
@@ -135,4 +238,27 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
135238
.send()
136239
.join()
137240
}
241+
242+
def publish(Mqtt3AsyncClient publisher, String subscriberId, MqttQos qos) {
243+
return publisher.publishWith()
244+
.topic("test/$subscriberId")
245+
.qos(qos)
246+
.payload(publishPayload)
247+
.send()
248+
.join()
249+
}
250+
251+
def subscribe(
252+
Mqtt3AsyncClient subscriber,
253+
String subscriberId,
254+
MqttQos qos,
255+
AtomicReference<Mqtt3Publish> received
256+
) {
257+
return subscriber.subscribeWith()
258+
.topicFilter("test/$subscriberId")
259+
.qos(qos)
260+
.callback({ publish -> received.set(publish) })
261+
.send()
262+
.join()
263+
}
138264
}

0 commit comments

Comments
 (0)