1- package javasabr.mqtt.application.integration
1+ package javasabr.mqtt.broker.application
22
33import com.hivemq.client.mqtt.MqttClient
44import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
55import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
6- import javasabr.mqtt.application.integration .config.MqttBrokerTestConfig
7- import javasabr.mqtt.application.mock.MqttMockClient
6+ import javasabr.mqtt.broker.application .config.MqttBrokerTestConfig
7+ import javasabr.mqtt.model.MqttClientConnectionConfig
88import javasabr.mqtt.model.MqttProperties
99import javasabr.mqtt.model.MqttServerConnectionConfig
1010import javasabr.mqtt.model.MqttVersion
1111import javasabr.mqtt.network.MqttConnection
12+ import javasabr.mqtt.network.MqttMockClient
1213import org.springframework.beans.factory.annotation.Autowired
1314import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
1415import spock.lang.Specification
1516
1617import java.nio.charset.StandardCharsets
1718import java.util.concurrent.atomic.AtomicInteger
19+ import java.util.concurrent.atomic.AtomicReference
1820
1921import static javasabr.mqtt.network.MqttClient.UnsafeMqttClient
2022
@@ -32,28 +34,17 @@ class IntegrationSpecification extends Specification {
3234 @Autowired
3335 InetSocketAddress externalNetworkAddress
3436
35- @Autowired
36- InetSocketAddress internalNetworkAddress
37-
3837 @Autowired
3938 MqttServerConnectionConfig externalConnectionConfig
4039
4140 def buildExternalMqtt311Client () {
4241 return buildMqtt311Client(generateClientId(), externalNetworkAddress)
4342 }
4443
45- def buildInternalMqtt311Client () {
46- return buildMqtt311Client(generateClientId(), internalNetworkAddress)
47- }
48-
4944 def buildExternalMqtt5Client () {
5045 return buildMqtt5Client(generateClientId(), externalNetworkAddress)
5146 }
5247
53- def buildInternalMqtt5Client () {
54- return buildMqtt5Client(generateClientId(), internalNetworkAddress)
55- }
56-
5748 def buildExternalMqtt311Client (String clientId ) {
5849 return MqttClient . builder()
5950 .identifier(clientId)
@@ -71,7 +62,7 @@ class IntegrationSpecification extends Specification {
7162 .serverPort(address. getPort())
7263 .useMqttVersion3()
7364 .addDisconnectedListener {
74- println " [${ clientId} |mqtt311] disconnected:[$it . cause ]"
65+ println " [${ clientId} |mqtt311] disconnected:[${ it.cause.message } ]"
7566 }
7667 .build()
7768 .toAsync()
@@ -94,7 +85,7 @@ class IntegrationSpecification extends Specification {
9485 .serverPort(address. getPort())
9586 .useMqttVersion5()
9687 .addDisconnectedListener {
97- println " [${ clientId} |mqtt5] disconnected:[$it . cause ]"
88+ println " [${ clientId} |mqtt5] disconnected:[${ it.cause.message } ]"
9889 }
9990 .build()
10091 .toAsync()
@@ -144,38 +135,65 @@ class IntegrationSpecification extends Specification {
144135 )
145136 }
146137
147- def mqtt5MockedConnection (MqttServerConnectionConfig deviceConnectionConfig ) {
148-
149- return Stub (MqttConnection ) {
138+ def mqtt5MockedConnection (MqttServerConnectionConfig serverConnConfig ) {
139+ MqttClientConnectionConfig clientConnConfig = new MqttClientConnectionConfig (
140+ serverConnConfig. maxQos(),
141+ MqttVersion . MQTT_5 ,
142+ MqttProperties . SESSION_EXPIRY_INTERVAL_DISABLED ,
143+ serverConnConfig. receiveMaxPublishes(),
144+ serverConnConfig. maxPacketSize(),
145+ serverConnConfig. topicAliasMaxValue(),
146+ MqttProperties . SERVER_KEEP_ALIVE_DEFAULT ,
147+ false ,
148+ false ,
149+ serverConnConfig. sessionsEnabled(),
150+ serverConnConfig. retainAvailable(),
151+ serverConnConfig. wildcardSubscriptionAvailable(),
152+ serverConnConfig. subscriptionIdAvailable(),
153+ serverConnConfig. sharedSubscriptionAvailable())
154+ def connectionRef = new AtomicReference<MqttConnection > ()
155+ def connection = Stub (MqttConnection ) {
150156 isSupported(MqttVersion . MQTT_5 ) >> true
151157 isSupported(MqttVersion . MQTT_3_1_1 ) >> true
152- serverConnectionConfig() >> deviceConnectionConfig
158+ serverConnectionConfig() >> serverConnConfig
153159 client() >> Stub (UnsafeMqttClient ) {
154- connectionConfig() >> deviceConnectionConfig
155- sessionExpiryInterval() >> MqttProperties . SESSION_EXPIRY_INTERVAL_DISABLED
156- receiveMaxPublishes() >> deviceConnectionConfig. receiveMaxPublishes()
157- maxPacketSize() >> deviceConnectionConfig. maxPacketSize()
158- clientId() >> IntegrationSpecification . clientId
159- keepAlive() >> MqttProperties . SERVER_KEEP_ALIVE_DEFAULT
160- topicAliasMaxValue() >> deviceConnectionConfig. topicAliasMaxValue()
160+ connectionConfig() >> clientConnConfig
161+ connection() >> connectionRef. get()
162+ clientId() >> clientId
161163 }
162164 }
163- }
164-
165- def mqtt311MockedConnection (MqttServerConnectionConfig deviceConnectionConfig ) {
166- return Stub (MqttConnection ) {
165+ connectionRef. set(connection)
166+ return connection
167+ }
168+
169+ def mqtt311MockedConnection (MqttServerConnectionConfig serverConnConfig ) {
170+ MqttClientConnectionConfig clientConnConfig = new MqttClientConnectionConfig (
171+ serverConnConfig. maxQos(),
172+ MqttVersion . MQTT_3_1_1 ,
173+ MqttProperties . SESSION_EXPIRY_INTERVAL_DISABLED ,
174+ serverConnConfig. receiveMaxPublishes(),
175+ serverConnConfig. maxPacketSize(),
176+ serverConnConfig. topicAliasMaxValue(),
177+ MqttProperties . SERVER_KEEP_ALIVE_DEFAULT ,
178+ false ,
179+ false ,
180+ serverConnConfig. sessionsEnabled(),
181+ serverConnConfig. retainAvailable(),
182+ serverConnConfig. wildcardSubscriptionAvailable(),
183+ serverConnConfig. subscriptionIdAvailable(),
184+ serverConnConfig. sharedSubscriptionAvailable())
185+ def connectionRef = new AtomicReference<MqttConnection > ()
186+ def connection = Stub (MqttConnection ) {
167187 isSupported(MqttVersion . MQTT_5 ) >> false
168188 isSupported(MqttVersion . MQTT_3_1_1 ) >> true
169- serverConnectionConfig() >> deviceConnectionConfig
189+ serverConnectionConfig() >> serverConnConfig
170190 client() >> Stub (UnsafeMqttClient ) {
171- connectionConfig() >> deviceConnectionConfig
172- sessionExpiryInterval() >> MqttProperties . SESSION_EXPIRY_INTERVAL_DISABLED
173- receiveMaxPublishes() >> deviceConnectionConfig. receiveMaxPublishes()
174- maxPacketSize() >> deviceConnectionConfig. maxPacketSize()
175- clientId() >> IntegrationSpecification . clientId
176- keepAlive() >> MqttProperties . SERVER_KEEP_ALIVE_DEFAULT
177- topicAliasMaxValue() >> deviceConnectionConfig. topicAliasMaxValue()
191+ connectionConfig() >> clientConnConfig
192+ connection() >> connectionRef. get()
193+ clientId() >> clientId
178194 }
179195 }
196+ connectionRef. set(connection)
197+ return connection
180198 }
181199}
0 commit comments