Skip to content

Commit d2056cb

Browse files
committed
[broker-25] finish implementing re-try publish mechanism
1 parent 3840681 commit d2056cb

File tree

8 files changed

+202
-7
lines changed

8 files changed

+202
-7
lines changed

src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ private void updatePendingPacket(
5050
var packetId = response.getPacketId();
5151
var pendingPublish = pendingPublishes.findAnyConvertedToIntInReadLock(
5252
packetId,
53-
PendingPublish::getPublish,
54-
PublishInPacket::getPacketId,
53+
PendingPublish::getPacketId,
5554
NumberUtils::equals
5655
);
5756

src/main/java/com/ss/mqtt/broker/network/packet/out/PublishComplete5OutPacket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.ss.mqtt.broker.model.PacketProperty;
44
import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode;
55
import com.ss.mqtt.broker.model.data.type.StringPair;
6+
import com.ss.rlib.common.util.StringUtils;
67
import com.ss.rlib.common.util.array.Array;
78
import org.jetbrains.annotations.NotNull;
89

@@ -41,6 +42,10 @@ public class PublishComplete5OutPacket extends PublishComplete311OutPacket {
4142
private final @NotNull PublishCompletedReasonCode reasonCode;
4243
private final @NotNull String reason;
4344

45+
public PublishComplete5OutPacket(int packetId, @NotNull PublishCompletedReasonCode reasonCode) {
46+
this(packetId, reasonCode, Array.empty(), StringUtils.EMPTY);
47+
}
48+
4449
public PublishComplete5OutPacket(
4550
int packetId,
4651
@NotNull PublishCompletedReasonCode reasonCode,

src/main/java/com/ss/mqtt/broker/network/packet/out/PublishReceived5OutPacket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.ss.mqtt.broker.model.PacketProperty;
44
import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode;
55
import com.ss.mqtt.broker.model.data.type.StringPair;
6+
import com.ss.rlib.common.util.StringUtils;
67
import com.ss.rlib.common.util.array.Array;
78
import org.jetbrains.annotations.NotNull;
89

@@ -41,6 +42,10 @@ public class PublishReceived5OutPacket extends PublishReceived311OutPacket {
4142
private final @NotNull PublishReceivedReasonCode reasonCode;
4243
private final @NotNull String reason;
4344

45+
public PublishReceived5OutPacket(int packetId, @NotNull PublishReceivedReasonCode reasonCode) {
46+
this(packetId, reasonCode, Array.empty(), StringUtils.EMPTY);
47+
}
48+
4449
public PublishReceived5OutPacket(
4550
int packetId,
4651
@NotNull PublishReceivedReasonCode reasonCode,

src/main/java/com/ss/mqtt/broker/network/packet/out/PublishRelease311OutPacket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ protected byte getPacketType() {
2121
return PACKET_TYPE;
2222
}
2323

24+
@Override
25+
protected byte getPacketFlags() {
26+
return 2;
27+
}
28+
2429
@Override
2530
public int getExpectedLength() {
2631
return 2;

src/main/java/com/ss/mqtt/broker/util/DebugUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ public static void registerIncludedFields(@NotNull String... fieldNames) {
7474
.getInstance(Option.RETAIN_CLASS_REFERENCE)
7575
.getCallerClass();
7676

77-
// FIXME use new method from 9.9.0
78-
var allFields = ReflectionUtils.getAllFields(callerClass, Object.class, true);
77+
var allFields = ReflectionUtils.getAllDeclaredFields(callerClass);
7978

8079
for (var fieldName : fieldNames) {
8180
if (!allFields.anyMatchConverted(fieldName, Field::getName, String::equals)) {

src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
1515
given:
1616
def received = new AtomicReference<Mqtt5Publish>()
1717
def subscriber = buildClient()
18-
def subscriberId = subscriber.getConfig().clientIdentifier.toString()
18+
def subscriberId = subscriber.getConfig().clientIdentifier.get()toString()
1919
def publisher = buildClient()
2020
when:
2121
subscriber.connect().join()
@@ -48,7 +48,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
4848
given:
4949
def received = new AtomicReference<Mqtt5Publish>()
5050
def subscriber = buildClient()
51-
def subscriberId = subscriber.getConfig().clientIdentifier.toString()
51+
def subscriberId = subscriber.getConfig().clientIdentifier.get()toString()
5252
def publisher = buildClient()
5353
when:
5454

@@ -82,7 +82,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
8282
given:
8383
def received = new AtomicReference<Mqtt5Publish>()
8484
def subscriber = buildClient()
85-
def subscriberId = subscriber.getConfig().clientIdentifier.toString()
85+
def subscriberId = subscriber.getConfig().clientIdentifier.get()toString()
8686
def publisher = buildClient()
8787
when:
8888

src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ import com.hivemq.client.mqtt.datatypes.MqttQos
44
import com.ss.mqtt.broker.model.QoS
55
import com.ss.mqtt.broker.model.SubscribeTopicFilter
66
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode
7+
import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode
8+
import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode
79
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode
810
import com.ss.mqtt.broker.network.packet.in.ConnectAckInPacket
911
import com.ss.mqtt.broker.network.packet.in.PublishInPacket
12+
import com.ss.mqtt.broker.network.packet.in.PublishReleaseInPacket
1013
import com.ss.mqtt.broker.network.packet.in.SubscribeAckInPacket
1114
import com.ss.mqtt.broker.network.packet.out.Connect311OutPacket
1215
import com.ss.mqtt.broker.network.packet.out.Connect5OutPacket
16+
import com.ss.mqtt.broker.network.packet.out.PublishComplete311OutPacket
17+
import com.ss.mqtt.broker.network.packet.out.PublishComplete5OutPacket
18+
import com.ss.mqtt.broker.network.packet.out.PublishReceived311OutPacket
19+
import com.ss.mqtt.broker.network.packet.out.PublishReceived5OutPacket
1320
import com.ss.mqtt.broker.network.packet.out.Subscribe311OutPacket
1421
import com.ss.mqtt.broker.network.packet.out.Subscribe5OutPacket
1522
import com.ss.mqtt.broker.service.MqttSessionService
@@ -142,4 +149,175 @@ class PublishRetryTest extends IntegrationSpecification {
142149
subscriber.close()
143150
publisher.disconnect().join()
144151
}
152+
153+
def "mqtt 3.1.1 client should be generate session with one pending QoS 2 packet"() {
154+
given:
155+
def publisher = buildClient()
156+
def subscriber = buildMqtt311MockClient()
157+
def subscriberId = generateClientId()
158+
when:
159+
160+
publisher.connect().join()
161+
162+
subscriber.connect()
163+
subscriber.send(new Connect311OutPacket(subscriberId, keepAlive))
164+
165+
def connectAck = subscriber.readNext() as ConnectAckInPacket
166+
167+
then:
168+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
169+
when:
170+
171+
subscriber.send(new Subscribe311OutPacket(
172+
Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.EXACTLY_ONCE)),
173+
1
174+
))
175+
176+
def subscribeAck = subscriber.readNext() as SubscribeAckInPacket
177+
178+
then:
179+
subscribeAck.reasonCodes.stream()
180+
.allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_2 })
181+
when:
182+
183+
publisher.publishWith()
184+
.topic("test/retry/$subscriberId")
185+
.qos(MqttQos.AT_MOST_ONCE)
186+
.payload(publishPayload)
187+
.send()
188+
.join()
189+
190+
def receivedPublish = subscriber.readNext() as PublishInPacket
191+
192+
then:
193+
receivedPublish.payload == publishPayload
194+
when:
195+
196+
subscriber.close()
197+
198+
subscriber.connect()
199+
subscriber.send(new Connect311OutPacket(subscriberId, keepAlive))
200+
201+
connectAck = subscriber.readNext() as ConnectAckInPacket
202+
def receivedDupPublish = subscriber.readNext() as PublishInPacket
203+
204+
then:
205+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
206+
receivedDupPublish.duplicate
207+
receivedDupPublish.packetId == receivedPublish.packetId
208+
receivedDupPublish.payload == publishPayload
209+
when:
210+
211+
subscriber.close()
212+
213+
subscriber.connect()
214+
subscriber.send(new Connect311OutPacket(subscriberId, keepAlive))
215+
216+
connectAck = subscriber.readNext() as ConnectAckInPacket
217+
receivedDupPublish = subscriber.readNext() as PublishInPacket
218+
219+
subscriber.send(new PublishReceived311OutPacket(receivedDupPublish.getPacketId()))
220+
def releaseAck = subscriber.readNext() as PublishReleaseInPacket
221+
222+
subscriber.send(new PublishComplete311OutPacket(receivedDupPublish.getPacketId()))
223+
224+
then:
225+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
226+
receivedDupPublish.duplicate
227+
receivedDupPublish.packetId == receivedPublish.packetId
228+
receivedDupPublish.payload == publishPayload
229+
releaseAck.packetId == receivedPublish.packetId
230+
cleanup:
231+
subscriber.close()
232+
publisher.disconnect().join()
233+
}
234+
235+
def "mqtt 5 client should be generate session with one pending QoS 2 packet"() {
236+
given:
237+
def publisher = buildClient()
238+
def subscriber = buildMqtt5MockClient()
239+
def subscriberId = generateClientId()
240+
when:
241+
242+
publisher.connect().join()
243+
244+
subscriber.connect()
245+
subscriber.send(new Connect5OutPacket(subscriberId, keepAlive))
246+
247+
def connectAck = subscriber.readNext() as ConnectAckInPacket
248+
249+
then:
250+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
251+
when:
252+
253+
subscriber.send(new Subscribe5OutPacket(
254+
Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.EXACTLY_ONCE)),
255+
1
256+
))
257+
258+
def subscribeAck = subscriber.readNext() as SubscribeAckInPacket
259+
260+
then:
261+
subscribeAck.reasonCodes.stream()
262+
.allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_2 })
263+
when:
264+
265+
publisher.publishWith()
266+
.topic("test/retry/$subscriberId")
267+
.qos(MqttQos.AT_MOST_ONCE)
268+
.payload(publishPayload)
269+
.send()
270+
.join()
271+
272+
def receivedPublish = subscriber.readNext() as PublishInPacket
273+
274+
then:
275+
receivedPublish.payload == publishPayload
276+
when:
277+
278+
subscriber.close()
279+
280+
subscriber.connect()
281+
subscriber.send(new Connect5OutPacket(subscriberId, keepAlive))
282+
283+
connectAck = subscriber.readNext() as ConnectAckInPacket
284+
def receivedDupPublish = subscriber.readNext() as PublishInPacket
285+
286+
then:
287+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
288+
receivedDupPublish.duplicate
289+
receivedDupPublish.packetId == receivedPublish.packetId
290+
receivedDupPublish.payload == publishPayload
291+
when:
292+
293+
subscriber.close()
294+
295+
subscriber.connect()
296+
subscriber.send(new Connect5OutPacket(subscriberId, keepAlive))
297+
298+
connectAck = subscriber.readNext() as ConnectAckInPacket
299+
receivedDupPublish = subscriber.readNext() as PublishInPacket
300+
301+
subscriber.send(new PublishReceived5OutPacket(
302+
receivedDupPublish.getPacketId(),
303+
PublishReceivedReasonCode.SUCCESS
304+
))
305+
306+
def releaseAck = subscriber.readNext() as PublishReleaseInPacket
307+
308+
subscriber.send(new PublishComplete5OutPacket(
309+
receivedDupPublish.getPacketId(),
310+
PublishCompletedReasonCode.SUCCESS
311+
))
312+
313+
then:
314+
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
315+
receivedDupPublish.duplicate
316+
receivedDupPublish.packetId == receivedPublish.packetId
317+
receivedDupPublish.payload == publishPayload
318+
releaseAck.packetId == receivedPublish.packetId
319+
cleanup:
320+
subscriber.close()
321+
publisher.disconnect().join()
322+
}
145323
}

src/test/groovy/com/ss/mqtt/broker/test/mock/MqttMockClient.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import com.ss.mqtt.broker.network.packet.in.ConnectAckInPacket
66
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket
77
import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket
88
import com.ss.mqtt.broker.network.packet.in.PublishInPacket
9+
import com.ss.mqtt.broker.network.packet.in.PublishReleaseInPacket
910
import com.ss.mqtt.broker.network.packet.in.SubscribeAckInPacket
1011
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket
1112
import com.ss.mqtt.broker.util.MqttDataUtils
@@ -94,6 +95,9 @@ class MqttMockClient {
9495
case PacketType.PUBLISH:
9596
packet = new PublishInPacket(info)
9697
break
98+
case PacketType.PUBLISH_RELEASED:
99+
packet = new PublishReleaseInPacket(info)
100+
break
97101
default:
98102
throw new IllegalStateException("Unknown packet of type: $type")
99103
}

0 commit comments

Comments
 (0)