11package com.ss.mqtt.broker.test.integration
22
33import com.hivemq.client.mqtt.datatypes.MqttQos
4+ import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
5+ import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType
6+ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish
7+ import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode
48import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
59import com.hivemq.client.mqtt.mqtt5.message.Mqtt5MessageType
610import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
@@ -11,7 +15,40 @@ import java.util.concurrent.atomic.AtomicReference
1115
1216class ConnectSubscribePublishTest extends IntegrationSpecification {
1317
14- def " publisher should publish message QoS 0" () {
18+ def " publisher should publish message QoS 0 using mqtt 3.1.1" () {
19+ given :
20+ def received = new AtomicReference<Mqtt3Publish > ()
21+ def subscriber = buildMqtt311Client()
22+ def subscriberId = subscriber. getConfig(). clientIdentifier. get()toString()
23+ def publisher = buildMqtt311Client()
24+ when :
25+ subscriber. connect(). join()
26+ publisher. connect(). join()
27+
28+ def subscribeResult = subscribe(subscriber, subscriberId, MqttQos . AT_MOST_ONCE , received)
29+ def publishResult = publish(publisher, subscriberId, MqttQos . AT_MOST_ONCE )
30+
31+ Thread . sleep(100 )
32+ then :
33+ noExceptionThrown()
34+
35+ subscribeResult != null
36+ subscribeResult. returnCodes. contains(Mqtt3SubAckReturnCode . SUCCESS_MAXIMUM_QOS_0 )
37+ subscribeResult. type == Mqtt3MessageType . SUBACK
38+
39+ publishResult != null
40+ publishResult. qos == MqttQos . AT_MOST_ONCE
41+ publishResult. type == Mqtt3MessageType . PUBLISH
42+
43+ received. get() != null
44+ received. get(). qos == MqttQos . AT_MOST_ONCE
45+ received. get(). type == Mqtt3MessageType . PUBLISH
46+ cleanup :
47+ subscriber. disconnect()
48+ publisher. disconnect()
49+ }
50+
51+ def " publisher should publish message QoS 0 using mqtt 5" () {
1552 given :
1653 def received = new AtomicReference<Mqtt5Publish > ()
1754 def subscriber = buildMqtt5Client()
@@ -44,7 +81,40 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
4481 publisher. disconnect()
4582 }
4683
47- def " publisher should publish message QoS 1" () {
84+ def " publisher should publish message QoS 1 using mqtt 3.1.1" () {
85+ given :
86+ def received = new AtomicReference<Mqtt3Publish > ()
87+ def subscriber = buildMqtt311Client()
88+ def subscriberId = subscriber. getConfig(). clientIdentifier. get()toString()
89+ def publisher = buildMqtt311Client()
90+ when :
91+ subscriber. connect(). join()
92+ publisher. connect(). join()
93+
94+ def subscribeResult = subscribe(subscriber, subscriberId, MqttQos . AT_LEAST_ONCE , received)
95+ def publishResult = publish(publisher, subscriberId, MqttQos . AT_LEAST_ONCE )
96+
97+ Thread . sleep(100 )
98+ then :
99+ noExceptionThrown()
100+
101+ subscribeResult != null
102+ subscribeResult. returnCodes. contains(Mqtt3SubAckReturnCode . SUCCESS_MAXIMUM_QOS_1 )
103+ subscribeResult. type == Mqtt3MessageType . SUBACK
104+
105+ publishResult != null
106+ publishResult. qos == MqttQos . AT_LEAST_ONCE
107+ publishResult. type == Mqtt3MessageType . PUBLISH
108+
109+ received. get() != null
110+ received. get(). qos == MqttQos . AT_LEAST_ONCE
111+ received. get(). type == Mqtt3MessageType . PUBLISH
112+ cleanup :
113+ subscriber. disconnect()
114+ publisher. disconnect()
115+ }
116+
117+ def " publisher should publish message QoS 1 using mqtt 5" () {
48118 given :
49119 def received = new AtomicReference<Mqtt5Publish > ()
50120 def subscriber = buildMqtt5Client()
@@ -78,7 +148,40 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
78148 publisher. disconnect(). join()
79149 }
80150
81- def " publisher should publish message QoS 2" () {
151+ def " publisher should publish message QoS 2 using mqtt 3.1.1" () {
152+ given :
153+ def received = new AtomicReference<Mqtt3Publish > ()
154+ def subscriber = buildMqtt311Client()
155+ def subscriberId = subscriber. getConfig(). clientIdentifier. get()toString()
156+ def publisher = buildMqtt311Client()
157+ when :
158+ subscriber. connect(). join()
159+ publisher. connect(). join()
160+
161+ def subscribeResult = subscribe(subscriber, subscriberId, MqttQos . EXACTLY_ONCE , received)
162+ def publishResult = publish(publisher, subscriberId, MqttQos . EXACTLY_ONCE )
163+
164+ Thread . sleep(100 )
165+ then :
166+ noExceptionThrown()
167+
168+ subscribeResult != null
169+ subscribeResult. returnCodes. contains(Mqtt3SubAckReturnCode . SUCCESS_MAXIMUM_QOS_2 )
170+ subscribeResult. type == Mqtt3MessageType . SUBACK
171+
172+ publishResult != null
173+ publishResult. qos == MqttQos . EXACTLY_ONCE
174+ publishResult. type == Mqtt3MessageType . PUBLISH
175+
176+ received. get() != null
177+ received. get(). qos == MqttQos . EXACTLY_ONCE
178+ received. get(). type == Mqtt3MessageType . PUBLISH
179+ cleanup :
180+ subscriber. disconnect()
181+ publisher. disconnect()
182+ }
183+
184+ def " publisher should publish message QoS 2 using mqtt 5" () {
82185 given :
83186 def received = new AtomicReference<Mqtt5Publish > ()
84187 def subscriber = buildMqtt5Client()
@@ -135,4 +238,27 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
135238 .send()
136239 .join()
137240 }
241+
242+ def publish (Mqtt3AsyncClient publisher , String subscriberId , MqttQos qos ) {
243+ return publisher. publishWith()
244+ .topic(" test/$subscriberId " )
245+ .qos(qos)
246+ .payload(publishPayload)
247+ .send()
248+ .join()
249+ }
250+
251+ def subscribe (
252+ Mqtt3AsyncClient subscriber ,
253+ String subscriberId ,
254+ MqttQos qos ,
255+ AtomicReference<Mqtt3Publish > received
256+ ) {
257+ return subscriber. subscribeWith()
258+ .topicFilter(" test/$subscriberId " )
259+ .qos(qos)
260+ .callback({ publish -> received. set(publish) })
261+ .send()
262+ .join()
263+ }
138264}
0 commit comments