Skip to content

Commit d3e7199

Browse files
committed
migrate to updated rlib
1 parent d7d9b71 commit d3e7199

File tree

63 files changed

+147
-216
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+147
-216
lines changed

application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig;
2222
import javasabr.rlib.network.impl.DefaultBufferAllocator;
2323
import javasabr.rlib.network.server.ServerNetwork;
24+
import lombok.CustomLog;
2425
import lombok.RequiredArgsConstructor;
2526
import lombok.extern.log4j.Log4j2;
2627
import org.springframework.context.annotation.Bean;
2728
import org.springframework.context.annotation.Configuration;
2829
import org.springframework.core.env.Environment;
2930

30-
@Log4j2
31+
@CustomLog
3132
@Configuration
3233
@RequiredArgsConstructor
3334
public class MqttNetworkConfig {
@@ -45,7 +46,7 @@ ServerNetworkConfig internalNetworkConfig() {
4546
.pendingBufferSize(env.getProperty("mqtt.internal.network.pending.buffer.size", int.class, 4096))
4647
.writeBufferSize(env.getProperty("mqtt.internal.network.write.buffer.size", int.class, 2048))
4748
.threadGroupName("InternalNetwork")
48-
.threadGroupSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1))
49+
.threadGroupMaxSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1))
4950
.build();
5051
}
5152

@@ -57,7 +58,7 @@ ServerNetworkConfig externalNetworkConfig() {
5758
.pendingBufferSize(env.getProperty("mqtt.external.network.pending.buffer.size", int.class, 200))
5859
.writeBufferSize(env.getProperty("mqtt.external.network.write.buffer.size", int.class, 100))
5960
.threadGroupName("ExternalNetwork")
60-
.threadGroupSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1))
61+
.threadGroupMaxSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1))
6162
.build();
6263
}
6364

@@ -136,7 +137,7 @@ InetSocketAddress internalNetworkAddress(
136137
@Bean
137138
Consumer<MqttConnection> externalConnectionConsumer() {
138139
return mqttConnection -> {
139-
log.info("Accepted external connection: {}", mqttConnection);
140+
log.info(mqttConnection.remoteAddress(), "[%s] Accepted external connection"::formatted);
140141
var client = (UnsafeMqttClient) mqttConnection.client();
141142
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
142143
};
@@ -145,7 +146,7 @@ Consumer<MqttConnection> externalConnectionConsumer() {
145146
@Bean
146147
Consumer<MqttConnection> internalConnectionConsumer() {
147148
return mqttConnection -> {
148-
log.info("Accepted internal connection: {}", mqttConnection);
149+
log.info(mqttConnection.remoteAddress(), "[%s] Accepted internal connection"::formatted);
149150
var client = (UnsafeMqttClient) mqttConnection.client();
150151
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
151152
};

application/src/test/groovy/javasabr/mqtt/application/integration/ConnectSubscribePublishTest.groovy

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
2424
when:
2525
subscriber.connect().join()
2626
publisher.connect().join()
27-
2827
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
2928
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
30-
3129
Thread.sleep(100)
3230
then:
3331
noExceptionThrown()
34-
3532
subscribeResult != null
3633
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0)
3734
subscribeResult.type == Mqtt3MessageType.SUBACK
38-
3935
publishResult != null
4036
publishResult.qos == MqttQos.AT_MOST_ONCE
4137
publishResult.type == Mqtt3MessageType.PUBLISH
42-
4338
received.get() != null
4439
received.get().qos == MqttQos.AT_MOST_ONCE
4540
received.get().type == Mqtt3MessageType.PUBLISH
@@ -57,22 +52,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
5752
when:
5853
subscriber.connect().join()
5954
publisher.connect().join()
60-
6155
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
6256
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
63-
6457
Thread.sleep(100)
6558
then:
6659
noExceptionThrown()
67-
6860
subscribeResult != null
6961
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0)
7062
subscribeResult.type == Mqtt5MessageType.SUBACK
71-
7263
publishResult != null
7364
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
7465
publishResult.publish.type == Mqtt5MessageType.PUBLISH
75-
7666
received.get() != null
7767
received.get().qos == MqttQos.AT_MOST_ONCE
7868
received.get().type == Mqtt5MessageType.PUBLISH
@@ -90,22 +80,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
9080
when:
9181
subscriber.connect().join()
9282
publisher.connect().join()
93-
9483
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
9584
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
96-
9785
Thread.sleep(100)
9886
then:
9987
noExceptionThrown()
100-
10188
subscribeResult != null
10289
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1)
10390
subscribeResult.type == Mqtt3MessageType.SUBACK
104-
10591
publishResult != null
10692
publishResult.qos == MqttQos.AT_LEAST_ONCE
10793
publishResult.type == Mqtt3MessageType.PUBLISH
108-
10994
received.get() != null
11095
received.get().qos == MqttQos.AT_LEAST_ONCE
11196
received.get().type == Mqtt3MessageType.PUBLISH
@@ -121,25 +106,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
121106
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
122107
def publisher = buildExternalMqtt5Client()
123108
when:
124-
125109
subscriber.connect().join()
126110
publisher.connect().join()
127-
128111
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
129112
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
130-
131113
Thread.sleep(100)
132114
then:
133115
noExceptionThrown()
134-
135116
subscribeResult != null
136117
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1)
137118
subscribeResult.type == Mqtt5MessageType.SUBACK
138-
139119
publishResult != null
140120
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
141121
publishResult.publish.type == Mqtt5MessageType.PUBLISH
142-
143122
received.get() != null
144123
received.get().qos == MqttQos.AT_LEAST_ONCE
145124
received.get().type == Mqtt5MessageType.PUBLISH
@@ -157,28 +136,20 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
157136
when:
158137
subscriber.connect().join()
159138
publisher.connect().join()
160-
161139
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
162140
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
163-
164141
Thread.sleep(100)
165142
then:
166143
noExceptionThrown()
167-
168144
subscribeResult != null
169145
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2)
170146
subscribeResult.type == Mqtt3MessageType.SUBACK
171-
172147
publishResult != null
173148
publishResult.qos == MqttQos.EXACTLY_ONCE
174149
publishResult.type == Mqtt3MessageType.PUBLISH
175-
176150
received.get() != null
177151
received.get().qos == MqttQos.EXACTLY_ONCE
178152
received.get().type == Mqtt3MessageType.PUBLISH
179-
cleanup:
180-
subscriber.disconnect().join()
181-
publisher.disconnect().join()
182153
}
183154

184155
def "publisher should publish message QoS 2 using mqtt 5"() {
@@ -188,25 +159,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
188159
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
189160
def publisher = buildExternalMqtt5Client()
190161
when:
191-
192162
subscriber.connect().join()
193163
publisher.connect().join()
194-
195164
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
196165
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
197-
198166
Thread.sleep(100)
199167
then:
200168
noExceptionThrown()
201-
202169
subscribeResult != null
203170
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2)
204171
subscribeResult.type == Mqtt5MessageType.SUBACK
205-
206172
publishResult != null
207173
publishResult.publish.qos == MqttQos.EXACTLY_ONCE
208174
publishResult.publish.type == Mqtt5MessageType.PUBLISH
209-
210175
received.get() != null
211176
received.get().qos == MqttQos.EXACTLY_ONCE
212177
received.get().type == Mqtt5MessageType.PUBLISH
@@ -229,8 +194,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
229194
Mqtt5AsyncClient subscriber,
230195
String subscriberId,
231196
MqttQos qos,
232-
AtomicReference<Mqtt5Publish> received
233-
) {
197+
AtomicReference<Mqtt5Publish> received) {
234198
return subscriber.subscribeWith()
235199
.topicFilter("test/$subscriberId")
236200
.qos(qos)
@@ -252,8 +216,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
252216
Mqtt3AsyncClient subscriber,
253217
String subscriberId,
254218
MqttQos qos,
255-
AtomicReference<Mqtt3Publish> received
256-
) {
219+
AtomicReference<Mqtt3Publish> received) {
257220
return subscriber.subscribeWith()
258221
.topicFilter("test/$subscriberId")
259222
.qos(qos)

application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ class IntegrationSpecification extends Specification {
7070
.serverHost(address.getHostName())
7171
.serverPort(address.getPort())
7272
.useMqttVersion3()
73+
.addDisconnectedListener {
74+
println "[${clientId}|mqtt311] disconnected:[$it.cause]"
75+
}
7376
.build()
7477
.toAsync()
7578
}
@@ -90,6 +93,9 @@ class IntegrationSpecification extends Specification {
9093
.serverHost(address.getHostName())
9194
.serverPort(address.getPort())
9295
.useMqttVersion5()
96+
.addDisconnectedListener {
97+
println "[${clientId}|mqtt5] disconnected:[$it.cause]"
98+
}
9399
.build()
94100
.toAsync()
95101
}

0 commit comments

Comments
 (0)