Skip to content

Commit d1a25a1

Browse files
committed
[broker-22] fixed some issues
1 parent 32676de commit d1a25a1

File tree

5 files changed

+85
-72
lines changed

5 files changed

+85
-72
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
package com.ss.mqtt.broker.exception;
22

3+
import lombok.NoArgsConstructor;
4+
5+
@NoArgsConstructor
36
public class MalformedPacketMqttException extends MqttException {}

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,19 @@ protected void handleResult(
3737
@NotNull ActionResult result
3838
) {
3939

40-
var reasonCode = switch (result) {
41-
case EMPTY -> PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
42-
case SUCCESS -> PublishAckReasonCode.SUCCESS;
43-
default -> PublishAckReasonCode.UNSPECIFIED_ERROR;
44-
};
40+
PublishAckReasonCode reasonCode;
41+
42+
switch (result) {
43+
case EMPTY:
44+
reasonCode = PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS;
45+
break;
46+
case SUCCESS:
47+
reasonCode = PublishAckReasonCode.SUCCESS;
48+
break;
49+
default:
50+
reasonCode = PublishAckReasonCode.UNSPECIFIED_ERROR;
51+
break;
52+
}
4553

4654
client.send(client.getPacketOutFactory().newPublishAck(
4755
client,

src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos2PublishInHandler.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,29 @@ protected void handleResult(
4949
@NotNull ActionResult result
5050
) {
5151

52-
var reasonCode = switch (result) {
53-
case EMPTY -> PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS;
54-
case SUCCESS -> PublishReceivedReasonCode.SUCCESS;
55-
default -> PublishReceivedReasonCode.UNSPECIFIED_ERROR;
56-
};
52+
// because it was checked
53+
final MqttSession session = client.getSession();
54+
55+
// it means this client was already closed
56+
if (session == null) {
57+
return;
58+
}
59+
60+
PublishReceivedReasonCode reasonCode;
61+
62+
switch (result) {
63+
case EMPTY:
64+
reasonCode = PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS;
65+
break;
66+
case SUCCESS:
67+
reasonCode = PublishReceivedReasonCode.SUCCESS;
68+
break;
69+
default:
70+
reasonCode = PublishReceivedReasonCode.UNSPECIFIED_ERROR;
71+
break;
72+
}
73+
74+
session.registerInPublish(packet, this, packet.getPacketId());
5775

5876
client.send(client.getPacketOutFactory().newPublishReceived(
5977
client,

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.ss.mqtt.broker.network.packet.HasPacketId;
99
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
1010
import com.ss.rlib.common.function.NotNullTripleConsumer;
11-
import com.ss.rlib.common.util.ClassUtils;
1211
import com.ss.rlib.common.util.NumberUtils;
1312
import com.ss.rlib.common.util.array.Array;
1413
import com.ss.rlib.common.util.array.ConcurrentArray;
@@ -139,18 +138,17 @@ public int nextPacketId() {
139138
}
140139

141140
@Override
142-
public void registerOutPublish(@NotNull PublishInPacket publish,
143-
@NotNull PendingPacketHandler handler,
144-
int packetId
141+
public void registerOutPublish(
142+
@NotNull PublishInPacket publish,
143+
@NotNull PendingPacketHandler handler, int packetId
145144
) {
146145
registerPublish(publish, handler, packetId, pendingOutPublishes);
147146
}
148147

149148
@Override
150149
public void registerInPublish(
151150
@NotNull PublishInPacket publish,
152-
@NotNull PendingPacketHandler handler,
153-
int packetId
151+
@NotNull PendingPacketHandler handler, int packetId
154152
) {
155153
registerPublish(publish, handler, packetId, pendingInPublishes);
156154
}
Lines changed: 42 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.ss.mqtt.broker.test.integration
22

3-
43
import com.hivemq.client.mqtt.datatypes.MqttQos
4+
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
55
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5MessageType
66
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
77
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
@@ -15,27 +15,16 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
1515
given:
1616
def received = new AtomicReference<Mqtt5Publish>()
1717
def subscriber = buildClient()
18+
def subscriberId = subscriber.getConfig().clientIdentifier.toString()
1819
def publisher = buildClient()
1920
when:
2021
subscriber.connect().join()
2122
publisher.connect().join()
22-
23-
def subscribeResult = subscriber.subscribeWith()
24-
.topicFilter(topicFilter)
25-
.qos(MqttQos.AT_MOST_ONCE)
26-
.callback({ publish -> received.set(publish) })
27-
.send()
28-
.join()
29-
30-
def publishResult = publisher.publishWith()
31-
.topic(topicFilter)
32-
.qos(MqttQos.AT_MOST_ONCE)
33-
.payload(publishPayload)
34-
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
35-
.send()
36-
.join()
3723

38-
Thread.sleep(500)
24+
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
25+
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
26+
27+
Thread.sleep(100)
3928
then:
4029
noExceptionThrown()
4130

@@ -46,12 +35,10 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
4635
publishResult != null
4736
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
4837
publishResult.publish.type == Mqtt5MessageType.PUBLISH
49-
publishResult.publish.topic.levels.join("/") == topicFilter
5038

5139
received.get() != null
5240
received.get().qos == MqttQos.AT_MOST_ONCE
5341
received.get().type == Mqtt5MessageType.PUBLISH
54-
received.get().topic.levels.join("/") == topicFilter
5542
cleanup:
5643
subscriber.disconnect()
5744
publisher.disconnect()
@@ -61,28 +48,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
6148
given:
6249
def received = new AtomicReference<Mqtt5Publish>()
6350
def subscriber = buildClient()
51+
def subscriberId = subscriber.getConfig().clientIdentifier.toString()
6452
def publisher = buildClient()
6553
when:
6654

6755
subscriber.connect().join()
6856
publisher.connect().join()
69-
70-
def subscribeResult = subscriber.subscribeWith()
71-
.topicFilter(topicFilter)
72-
.qos(MqttQos.AT_LEAST_ONCE)
73-
.callback({ publish -> received.set(publish) })
74-
.send()
75-
.join()
76-
77-
def publishResult = publisher.publishWith()
78-
.topic(topicFilter)
79-
.qos(MqttQos.AT_LEAST_ONCE)
80-
.payload(publishPayload)
81-
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
82-
.send()
83-
.join()
84-
85-
Thread.sleep(500)
57+
58+
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
59+
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
60+
61+
Thread.sleep(100)
8662
then:
8763
noExceptionThrown()
8864

@@ -93,12 +69,10 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
9369
publishResult != null
9470
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
9571
publishResult.publish.type == Mqtt5MessageType.PUBLISH
96-
publishResult.publish.topic.levels.join("/") == topicFilter
9772

9873
received.get() != null
9974
received.get().qos == MqttQos.AT_LEAST_ONCE
10075
received.get().type == Mqtt5MessageType.PUBLISH
101-
received.get().topic.levels.join("/") == topicFilter
10276
cleanup:
10377
subscriber.disconnect().join()
10478
publisher.disconnect().join()
@@ -108,28 +82,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
10882
given:
10983
def received = new AtomicReference<Mqtt5Publish>()
11084
def subscriber = buildClient()
85+
def subscriberId = subscriber.getConfig().clientIdentifier.toString()
11186
def publisher = buildClient()
11287
when:
11388

11489
subscriber.connect().join()
11590
publisher.connect().join()
91+
92+
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
93+
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
11694

117-
def subscribeResult = subscriber.subscribeWith()
118-
.topicFilter(topicFilter)
119-
.qos(MqttQos.EXACTLY_ONCE)
120-
.callback({ publish -> received.set(publish) })
121-
.send()
122-
.join()
123-
124-
def publishResult = publisher.publishWith()
125-
.topic(topicFilter)
126-
.qos(MqttQos.EXACTLY_ONCE)
127-
.payload(publishPayload)
128-
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
129-
.send()
130-
.join()
131-
132-
Thread.sleep(500)
95+
Thread.sleep(100)
13396
then:
13497
noExceptionThrown()
13598

@@ -140,14 +103,37 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
140103
publishResult != null
141104
publishResult.publish.qos == MqttQos.EXACTLY_ONCE
142105
publishResult.publish.type == Mqtt5MessageType.PUBLISH
143-
publishResult.publish.topic.levels.join("/") == topicFilter
144106

145107
received.get() != null
146108
received.get().qos == MqttQos.EXACTLY_ONCE
147109
received.get().type == Mqtt5MessageType.PUBLISH
148-
received.get().topic.levels.join("/") == topicFilter
149110
cleanup:
150111
subscriber.disconnect()
151112
publisher.disconnect()
152113
}
114+
115+
def publish(Mqtt5AsyncClient publisher, String subscriberId, MqttQos qos) {
116+
return publisher.publishWith()
117+
.topic("test/$subscriberId")
118+
.qos(qos)
119+
.payload(publishPayload)
120+
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
121+
.send()
122+
.join()
123+
}
124+
125+
def subscribe(
126+
Mqtt5AsyncClient subscriber,
127+
String subscriberId,
128+
MqttQos qos,
129+
AtomicReference<Mqtt5Publish> received
130+
) {
131+
return subscriber.subscribeWith()
132+
.topicFilter("test/$subscriberId")
133+
.qos(qos)
134+
.callback({ publish -> received.set(publish) })
135+
.send()
136+
.join()
137+
}
138+
153139
}

0 commit comments

Comments
 (0)