Skip to content

Commit df97b2f

Browse files
authored
Migrate to rlib 10.0.alpha3 (#44)
* migrate to updated rlib
1 parent c964240 commit df97b2f

File tree

107 files changed

+570
-646
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+570
-646
lines changed

application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,22 @@
1313
import java.util.function.BiFunction;
1414
import java.util.function.Consumer;
1515
import javasabr.mqtt.model.MqttConnectionConfig;
16+
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
1617
import javasabr.rlib.network.BufferAllocator;
1718
import javasabr.rlib.network.Network;
1819
import javasabr.rlib.network.NetworkFactory;
1920
import javasabr.rlib.network.ServerNetworkConfig;
2021
import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig;
2122
import javasabr.rlib.network.impl.DefaultBufferAllocator;
2223
import javasabr.rlib.network.server.ServerNetwork;
24+
import lombok.CustomLog;
2325
import lombok.RequiredArgsConstructor;
2426
import lombok.extern.log4j.Log4j2;
2527
import org.springframework.context.annotation.Bean;
2628
import org.springframework.context.annotation.Configuration;
2729
import org.springframework.core.env.Environment;
2830

29-
@Log4j2
31+
@CustomLog
3032
@Configuration
3133
@RequiredArgsConstructor
3234
public class MqttNetworkConfig {
@@ -44,7 +46,7 @@ ServerNetworkConfig internalNetworkConfig() {
4446
.pendingBufferSize(env.getProperty("mqtt.internal.network.pending.buffer.size", int.class, 4096))
4547
.writeBufferSize(env.getProperty("mqtt.internal.network.write.buffer.size", int.class, 2048))
4648
.threadGroupName("InternalNetwork")
47-
.threadGroupSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1))
49+
.threadGroupMaxSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1))
4850
.build();
4951
}
5052

@@ -56,7 +58,7 @@ ServerNetworkConfig externalNetworkConfig() {
5658
.pendingBufferSize(env.getProperty("mqtt.external.network.pending.buffer.size", int.class, 200))
5759
.writeBufferSize(env.getProperty("mqtt.external.network.write.buffer.size", int.class, 100))
5860
.threadGroupName("ExternalNetwork")
59-
.threadGroupSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1))
61+
.threadGroupMaxSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1))
6062
.build();
6163
}
6264

@@ -77,7 +79,7 @@ ServerNetwork<MqttConnection> externalNetwork(
7779
MqttConnectionConfig externalConnectionConfig,
7880
PacketInHandler[] packetHandlers,
7981
MqttClientReleaseHandler mqttClientReleaseHandler) {
80-
return NetworkFactory.newServerNetwork(
82+
return NetworkFactory.serverNetwork(
8183
externalNetworkConfig,
8284
externalConnectionFactory(
8385
externalBufferAllocator,
@@ -93,7 +95,7 @@ ServerNetwork<MqttConnection> internalNetwork(
9395
MqttConnectionConfig internalConnectionConfig,
9496
PacketInHandler[] packetHandlers,
9597
MqttClientReleaseHandler mqttClientReleaseHandler) {
96-
return NetworkFactory.newServerNetwork(
98+
return NetworkFactory.serverNetwork(
9799
internalNetworkConfig,
98100
internalConnectionFactory(
99101
internalBufferAllocator,
@@ -135,18 +137,18 @@ InetSocketAddress internalNetworkAddress(
135137
@Bean
136138
Consumer<MqttConnection> externalConnectionConsumer() {
137139
return mqttConnection -> {
138-
log.info("Accepted external connection: {}", mqttConnection);
139-
var client = (UnsafeMqttClient) mqttConnection.getClient();
140-
mqttConnection.onReceive((conn, packet) -> client.handle(packet));
140+
log.info(mqttConnection.remoteAddress(), "[%s] Accepted external connection"::formatted);
141+
var client = (UnsafeMqttClient) mqttConnection.client();
142+
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
141143
};
142144
}
143145

144146
@Bean
145147
Consumer<MqttConnection> internalConnectionConsumer() {
146148
return mqttConnection -> {
147-
log.info("Accepted internal connection: {}", mqttConnection);
148-
var client = (UnsafeMqttClient) mqttConnection.getClient();
149-
mqttConnection.onReceive((conn, packet) -> client.handle(packet));
149+
log.info(mqttConnection.remoteAddress(), "[%s] Accepted internal connection"::formatted);
150+
var client = (UnsafeMqttClient) mqttConnection.client();
151+
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
150152
};
151153
}
152154

application/src/test/groovy/javasabr/mqtt/application/extension/SpecificationExtensions.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package javasabr.mqtt.application.extension
22

3+
import javasabr.mqtt.network.MqttConnection
34
import javasabr.mqtt.network.packet.out.MqttWritablePacket
45
import javasabr.mqtt.network.utils.MqttDataUtils
56
import javasabr.mqtt.model.PacketProperty
@@ -16,7 +17,7 @@ class SpecificationExtensions extends Specification {
1617
static final writer = new MqttWritablePacket() {
1718

1819
@Override
19-
protected void writeImpl(ByteBuffer buffer) {}
20+
protected void writeImpl(MqttConnection connection, ByteBuffer buffer) {}
2021
}
2122

2223
static ByteBuffer putMbi(ByteBuffer self, int value) {

application/src/test/groovy/javasabr/mqtt/application/integration/ConnectSubscribePublishTest.groovy

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
2424
when:
2525
subscriber.connect().join()
2626
publisher.connect().join()
27-
2827
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
2928
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
30-
3129
Thread.sleep(100)
3230
then:
3331
noExceptionThrown()
34-
3532
subscribeResult != null
3633
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0)
3734
subscribeResult.type == Mqtt3MessageType.SUBACK
38-
3935
publishResult != null
4036
publishResult.qos == MqttQos.AT_MOST_ONCE
4137
publishResult.type == Mqtt3MessageType.PUBLISH
42-
4338
received.get() != null
4439
received.get().qos == MqttQos.AT_MOST_ONCE
4540
received.get().type == Mqtt3MessageType.PUBLISH
@@ -57,22 +52,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
5752
when:
5853
subscriber.connect().join()
5954
publisher.connect().join()
60-
6155
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
6256
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
63-
6457
Thread.sleep(100)
6558
then:
6659
noExceptionThrown()
67-
6860
subscribeResult != null
6961
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0)
7062
subscribeResult.type == Mqtt5MessageType.SUBACK
71-
7263
publishResult != null
7364
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
7465
publishResult.publish.type == Mqtt5MessageType.PUBLISH
75-
7666
received.get() != null
7767
received.get().qos == MqttQos.AT_MOST_ONCE
7868
received.get().type == Mqtt5MessageType.PUBLISH
@@ -90,22 +80,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
9080
when:
9181
subscriber.connect().join()
9282
publisher.connect().join()
93-
9483
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
9584
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
96-
9785
Thread.sleep(100)
9886
then:
9987
noExceptionThrown()
100-
10188
subscribeResult != null
10289
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1)
10390
subscribeResult.type == Mqtt3MessageType.SUBACK
104-
10591
publishResult != null
10692
publishResult.qos == MqttQos.AT_LEAST_ONCE
10793
publishResult.type == Mqtt3MessageType.PUBLISH
108-
10994
received.get() != null
11095
received.get().qos == MqttQos.AT_LEAST_ONCE
11196
received.get().type == Mqtt3MessageType.PUBLISH
@@ -121,25 +106,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
121106
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
122107
def publisher = buildExternalMqtt5Client()
123108
when:
124-
125109
subscriber.connect().join()
126110
publisher.connect().join()
127-
128111
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
129112
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
130-
131113
Thread.sleep(100)
132114
then:
133115
noExceptionThrown()
134-
135116
subscribeResult != null
136117
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1)
137118
subscribeResult.type == Mqtt5MessageType.SUBACK
138-
139119
publishResult != null
140120
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
141121
publishResult.publish.type == Mqtt5MessageType.PUBLISH
142-
143122
received.get() != null
144123
received.get().qos == MqttQos.AT_LEAST_ONCE
145124
received.get().type == Mqtt5MessageType.PUBLISH
@@ -157,28 +136,20 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
157136
when:
158137
subscriber.connect().join()
159138
publisher.connect().join()
160-
161139
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
162140
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
163-
164141
Thread.sleep(100)
165142
then:
166143
noExceptionThrown()
167-
168144
subscribeResult != null
169145
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2)
170146
subscribeResult.type == Mqtt3MessageType.SUBACK
171-
172147
publishResult != null
173148
publishResult.qos == MqttQos.EXACTLY_ONCE
174149
publishResult.type == Mqtt3MessageType.PUBLISH
175-
176150
received.get() != null
177151
received.get().qos == MqttQos.EXACTLY_ONCE
178152
received.get().type == Mqtt3MessageType.PUBLISH
179-
cleanup:
180-
subscriber.disconnect().join()
181-
publisher.disconnect().join()
182153
}
183154

184155
def "publisher should publish message QoS 2 using mqtt 5"() {
@@ -188,25 +159,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
188159
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
189160
def publisher = buildExternalMqtt5Client()
190161
when:
191-
192162
subscriber.connect().join()
193163
publisher.connect().join()
194-
195164
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
196165
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
197-
198166
Thread.sleep(100)
199167
then:
200168
noExceptionThrown()
201-
202169
subscribeResult != null
203170
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2)
204171
subscribeResult.type == Mqtt5MessageType.SUBACK
205-
206172
publishResult != null
207173
publishResult.publish.qos == MqttQos.EXACTLY_ONCE
208174
publishResult.publish.type == Mqtt5MessageType.PUBLISH
209-
210175
received.get() != null
211176
received.get().qos == MqttQos.EXACTLY_ONCE
212177
received.get().type == Mqtt5MessageType.PUBLISH
@@ -229,8 +194,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
229194
Mqtt5AsyncClient subscriber,
230195
String subscriberId,
231196
MqttQos qos,
232-
AtomicReference<Mqtt5Publish> received
233-
) {
197+
AtomicReference<Mqtt5Publish> received) {
234198
return subscriber.subscribeWith()
235199
.topicFilter("test/$subscriberId")
236200
.qos(qos)
@@ -252,8 +216,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
252216
Mqtt3AsyncClient subscriber,
253217
String subscriberId,
254218
MqttQos qos,
255-
AtomicReference<Mqtt3Publish> received
256-
) {
219+
AtomicReference<Mqtt3Publish> received) {
257220
return subscriber.subscribeWith()
258221
.topicFilter("test/$subscriberId")
259222
.qos(qos)

application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ class IntegrationSpecification extends Specification {
7070
.serverHost(address.getHostName())
7171
.serverPort(address.getPort())
7272
.useMqttVersion3()
73+
.addDisconnectedListener {
74+
println "[${clientId}|mqtt311] disconnected:[$it.cause]"
75+
}
7376
.build()
7477
.toAsync()
7578
}
@@ -90,6 +93,9 @@ class IntegrationSpecification extends Specification {
9093
.serverHost(address.getHostName())
9194
.serverPort(address.getPort())
9295
.useMqttVersion5()
96+
.addDisconnectedListener {
97+
println "[${clientId}|mqtt5] disconnected:[$it.cause]"
98+
}
9399
.build()
94100
.toAsync()
95101
}
@@ -143,15 +149,15 @@ class IntegrationSpecification extends Specification {
143149
return Stub(MqttConnection) {
144150
isSupported(MqttVersion.MQTT_5) >> true
145151
isSupported(MqttVersion.MQTT_3_1_1) >> true
146-
getConfig() >> deviceConnectionConfig
147-
getClient() >> Stub(UnsafeMqttClient) {
148-
getConnectionConfig() >> deviceConnectionConfig
149-
getSessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
150-
getReceiveMax() >> deviceConnectionConfig.getReceiveMaximum()
151-
getMaximumPacketSize() >> deviceConnectionConfig.getMaximumPacketSize()
152-
getClientId() >> IntegrationSpecification.clientId
153-
getKeepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
154-
getTopicAliasMaximum() >> deviceConnectionConfig.getTopicAliasMaximum()
152+
config() >> deviceConnectionConfig
153+
client() >> Stub(UnsafeMqttClient) {
154+
connectionConfig() >> deviceConnectionConfig
155+
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
156+
receiveMax() >> deviceConnectionConfig.receiveMaximum()
157+
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
158+
clientId() >> IntegrationSpecification.clientId
159+
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
160+
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
155161
}
156162
}
157163
}
@@ -160,15 +166,15 @@ class IntegrationSpecification extends Specification {
160166
return Stub(MqttConnection) {
161167
isSupported(MqttVersion.MQTT_5) >> false
162168
isSupported(MqttVersion.MQTT_3_1_1) >> true
163-
getConfig() >> deviceConnectionConfig
164-
getClient() >> Stub(UnsafeMqttClient) {
165-
getConnectionConfig() >> deviceConnectionConfig
166-
getSessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
167-
getReceiveMax() >> deviceConnectionConfig.getReceiveMaximum()
168-
getMaximumPacketSize() >> deviceConnectionConfig.getMaximumPacketSize()
169-
getClientId() >> IntegrationSpecification.clientId
170-
getKeepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
171-
getTopicAliasMaximum() >> deviceConnectionConfig.getTopicAliasMaximum()
169+
config() >> deviceConnectionConfig
170+
client() >> Stub(UnsafeMqttClient) {
171+
connectionConfig() >> deviceConnectionConfig
172+
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
173+
receiveMax() >> deviceConnectionConfig.receiveMaximum()
174+
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
175+
clientId() >> IntegrationSpecification.clientId
176+
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
177+
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
172178
}
173179
}
174180
}

0 commit comments

Comments
 (0)