@@ -11,13 +11,14 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
1111import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
1212import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode
1313
14+ import java.util.concurrent.CompletableFuture
1415import java.util.concurrent.atomic.AtomicReference
1516
1617class ConnectSubscribePublishTest extends IntegrationSpecification {
1718
1819 def " should deliver publish message QoS 0 using mqtt 3.1.1" () {
1920 given :
20- def received = new AtomicReference <Mqtt3Publish > ()
21+ def received = new CompletableFuture <Mqtt3Publish > ()
2122 def subscriber = buildExternalMqtt311Client()
2223 def subscriberId = subscriber. getConfig(). clientIdentifier. get(). toString()
2324 def publisher = buildExternalMqtt311Client()
@@ -34,17 +35,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
3435 publishResult != null
3536 publishResult. qos == MqttQos . AT_MOST_ONCE
3637 publishResult. type == Mqtt3MessageType . PUBLISH
37- received. get () != null
38- received. get (). qos == MqttQos . AT_MOST_ONCE
39- received. get (). type == Mqtt3MessageType . PUBLISH
38+ received. join () != null
39+ received. join (). qos == MqttQos . AT_MOST_ONCE
40+ received. join (). type == Mqtt3MessageType . PUBLISH
4041 cleanup :
4142 subscriber. disconnect(). join()
4243 publisher. disconnect(). join()
4344 }
4445
4546 def " should deliver publish message QoS 0 using mqtt 5" () {
4647 given :
47- def received = new AtomicReference <Mqtt5Publish > ()
48+ def received = new CompletableFuture <Mqtt5Publish > ()
4849 def subscriber = buildExternalMqtt5Client()
4950 def subscriberId = subscriber. getConfig(). clientIdentifier. get(). toString()
5051 def publisher = buildExternalMqtt5Client()
@@ -61,17 +62,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
6162 publishResult != null
6263 publishResult. publish. qos == MqttQos . AT_MOST_ONCE
6364 publishResult. publish. type == Mqtt5MessageType . PUBLISH
64- received. get () != null
65- received. get (). qos == MqttQos . AT_MOST_ONCE
66- received. get (). type == Mqtt5MessageType . PUBLISH
65+ received. join () != null
66+ received. join (). qos == MqttQos . AT_MOST_ONCE
67+ received. join (). type == Mqtt5MessageType . PUBLISH
6768 cleanup :
6869 subscriber. disconnect(). join()
6970 publisher. disconnect(). join()
7071 }
7172
7273 def " should deliver publish message QoS 1 using mqtt 3.1.1" () {
7374 given :
74- def received = new AtomicReference <Mqtt3Publish > ()
75+ def received = new CompletableFuture <Mqtt3Publish > ()
7576 def subscriber = buildExternalMqtt311Client()
7677 def subscriberId = subscriber. getConfig(). clientIdentifier. get(). toString()
7778 def publisher = buildExternalMqtt311Client()
@@ -88,17 +89,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
8889 publishResult != null
8990 publishResult. qos == MqttQos . AT_LEAST_ONCE
9091 publishResult. type == Mqtt3MessageType . PUBLISH
91- received. get () != null
92- received. get (). qos == MqttQos . AT_LEAST_ONCE
93- received. get (). type == Mqtt3MessageType . PUBLISH
92+ received. join () != null
93+ received. join (). qos == MqttQos . AT_LEAST_ONCE
94+ received. join (). type == Mqtt3MessageType . PUBLISH
9495 cleanup :
9596 subscriber. disconnect(). join()
9697 publisher. disconnect(). join()
9798 }
9899
99- def " publisher should publish message QoS 1 using mqtt 5" () {
100+ def " should deliver publish message QoS 1 using mqtt 5" () {
100101 given :
101- def received = new AtomicReference <Mqtt5Publish > ()
102+ def received = new CompletableFuture <Mqtt5Publish > ()
102103 def subscriber = buildExternalMqtt5Client()
103104 def subscriberId = subscriber. getConfig(). clientIdentifier. get(). toString()
104105 def publisher = buildExternalMqtt5Client()
@@ -107,7 +108,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
107108 publisher. connect(). join()
108109 def subscribeResult = subscribe(subscriber, subscriberId, MqttQos . AT_LEAST_ONCE , received)
109110 def publishResult = publish(publisher, subscriberId, MqttQos . AT_LEAST_ONCE )
110- Thread . sleep(100 )
111111 then :
112112 noExceptionThrown()
113113 subscribeResult != null
@@ -116,17 +116,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
116116 publishResult != null
117117 publishResult. publish. qos == MqttQos . AT_LEAST_ONCE
118118 publishResult. publish. type == Mqtt5MessageType . PUBLISH
119- received. get () != null
120- received. get (). qos == MqttQos . AT_LEAST_ONCE
121- received. get (). type == Mqtt5MessageType . PUBLISH
119+ received. join () != null
120+ received. join (). qos == MqttQos . AT_LEAST_ONCE
121+ received. join (). type == Mqtt5MessageType . PUBLISH
122122 cleanup :
123123 subscriber. disconnect(). join()
124124 publisher. disconnect(). join()
125125 }
126126
127- def " publisher should publish message QoS 2 using mqtt 3.1.1" () {
127+ def " should deliver publish message QoS 2 using mqtt 3.1.1" () {
128128 given :
129- def received = new AtomicReference <Mqtt3Publish > ()
129+ def received = new CompletableFuture <Mqtt3Publish > ()
130130 def subscriber = buildExternalMqtt311Client()
131131 def subscriberId = subscriber. getConfig(). clientIdentifier. get(). toString()
132132 def publisher = buildExternalMqtt311Client()
@@ -135,7 +135,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
135135 publisher. connect(). join()
136136 def subscribeResult = subscribe(subscriber, subscriberId, MqttQos . EXACTLY_ONCE , received)
137137 def publishResult = publish(publisher, subscriberId, MqttQos . EXACTLY_ONCE )
138- Thread . sleep(100 )
139138 then :
140139 noExceptionThrown()
141140 subscribeResult != null
@@ -144,14 +143,14 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
144143 publishResult != null
145144 publishResult. qos == MqttQos . EXACTLY_ONCE
146145 publishResult. type == Mqtt3MessageType . PUBLISH
147- received. get () != null
148- received. get (). qos == MqttQos . EXACTLY_ONCE
149- received. get (). type == Mqtt3MessageType . PUBLISH
146+ received. join () != null
147+ received. join (). qos == MqttQos . EXACTLY_ONCE
148+ received. join (). type == Mqtt3MessageType . PUBLISH
150149 }
151150
152- def " publisher should publish message QoS 2 using mqtt 5" () {
151+ def " should deliver publish message QoS 2 using mqtt 5" () {
153152 given :
154- def received = new AtomicReference <Mqtt5Publish > ()
153+ def received = new CompletableFuture <Mqtt5Publish > ()
155154 def subscriber = buildExternalMqtt5Client()
156155 def subscriberId = subscriber. getConfig(). clientIdentifier. get(). toString()
157156 def publisher = buildExternalMqtt5Client()
@@ -169,9 +168,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
169168 publishResult != null
170169 publishResult. publish. qos == MqttQos . EXACTLY_ONCE
171170 publishResult. publish. type == Mqtt5MessageType . PUBLISH
172- received. get () != null
173- received. get (). qos == MqttQos . EXACTLY_ONCE
174- received. get (). type == Mqtt5MessageType . PUBLISH
171+ received. join () != null
172+ received. join (). qos == MqttQos . EXACTLY_ONCE
173+ received. join (). type == Mqtt5MessageType . PUBLISH
175174 cleanup :
176175 subscriber. disconnect(). join()
177176 publisher. disconnect(). join()
@@ -191,11 +190,11 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
191190 Mqtt5AsyncClient subscriber ,
192191 String subscriberId ,
193192 MqttQos qos ,
194- AtomicReference <Mqtt5Publish > received ) {
193+ CompletableFuture <Mqtt5Publish > received ) {
195194 return subscriber. subscribeWith()
196195 .topicFilter(" test/$subscriberId " )
197196 .qos(qos)
198- .callback({ publish -> received. set (publish) })
197+ .callback({ publish -> received. complete (publish) })
199198 .send()
200199 .join()
201200 }
@@ -213,11 +212,11 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
213212 Mqtt3AsyncClient subscriber ,
214213 String subscriberId ,
215214 MqttQos qos ,
216- AtomicReference <Mqtt3Publish > received ) {
215+ CompletableFuture <Mqtt3Publish > received ) {
217216 return subscriber. subscribeWith()
218217 .topicFilter(" test/$subscriberId " )
219218 .qos(qos)
220- .callback({ publish -> received. set (publish) })
219+ .callback({ publish -> received. complete (publish) })
221220 .send()
222221 .join()
223222 }
0 commit comments