Skip to content

Commit e871b2e

Browse files
authored
Improve SubscriptionServie, part 1 (#50)
1 parent e09a611 commit e871b2e

File tree

164 files changed

+4453
-2789
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+4453
-2789
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,19 @@
1717
import javasabr.mqtt.service.PublishDeliveringService;
1818
import javasabr.mqtt.service.PublishReceivingService;
1919
import javasabr.mqtt.service.SubscriptionService;
20+
import javasabr.mqtt.service.TopicService;
2021
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
2122
import javasabr.mqtt.service.impl.DefaultConnectionService;
2223
import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService;
2324
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
2425
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
2526
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
27+
import javasabr.mqtt.service.impl.DefaultTopicService;
2628
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
2729
import javasabr.mqtt.service.impl.FileCredentialsSource;
2830
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
31+
import javasabr.mqtt.service.impl.InMemorySubscriptionService;
2932
import javasabr.mqtt.service.impl.SimpleAuthenticationService;
30-
import javasabr.mqtt.service.impl.SimpleSubscriptionService;
3133
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
3234
import javasabr.mqtt.service.message.handler.impl.ConnectInMqttInMessageHandler;
3335
import javasabr.mqtt.service.message.handler.impl.DisconnectMqttInMessageHandler;
@@ -96,7 +98,7 @@ AuthenticationService authenticationService(
9698

9799
@Bean
98100
SubscriptionService subscriptionService() {
99-
return new SimpleSubscriptionService();
101+
return new InMemorySubscriptionService();
100102
}
101103

102104
@Bean
@@ -115,6 +117,11 @@ MessageOutFactoryService mqttMessageOutFactoryService(
115117
return new DefaultMessageOutFactoryService(knownFactories);
116118
}
117119

120+
@Bean
121+
TopicService topicService() {
122+
return new DefaultTopicService();
123+
}
124+
118125
@Bean
119126
MqttInMessageHandler connectInMqttInMessageHandler(
120127
ClientIdRegistry clientIdRegistry,
@@ -131,47 +138,58 @@ MqttInMessageHandler connectInMqttInMessageHandler(
131138
}
132139

133140
@Bean
134-
MqttInMessageHandler publishAckMqttInMessageHandler() {
135-
return new PublishAckMqttInMessageHandler();
141+
MqttInMessageHandler publishAckMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
142+
return new PublishAckMqttInMessageHandler(messageOutFactoryService);
136143
}
137144

138145
@Bean
139-
MqttInMessageHandler publishCompleteMqttInMessageHandler() {
140-
return new PublishCompleteMqttInMessageHandler();
146+
MqttInMessageHandler publishCompleteMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
147+
return new PublishCompleteMqttInMessageHandler(messageOutFactoryService);
141148
}
142149

143150
@Bean
144-
MqttInMessageHandler publishMqttInMessageHandler(PublishReceivingService publishReceivingService) {
145-
return new PublishMqttInMessageHandler(publishReceivingService);
151+
MqttInMessageHandler publishMqttInMessageHandler(
152+
PublishReceivingService publishReceivingService,
153+
MessageOutFactoryService messageOutFactoryService,
154+
TopicService topicService) {
155+
return new PublishMqttInMessageHandler(
156+
publishReceivingService,
157+
messageOutFactoryService,
158+
topicService);
146159
}
147160

148161
@Bean
149-
MqttInMessageHandler publishReceiveMqttInMessageHandler() {
150-
return new PublishReceiveMqttInMessageHandler();
162+
MqttInMessageHandler publishReceiveMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
163+
return new PublishReceiveMqttInMessageHandler(messageOutFactoryService);
151164
}
152165

153166
@Bean
154-
MqttInMessageHandler publishReleaseMqttInMessageHandler() {
155-
return new PublishReleaseMqttInMessageHandler();
167+
MqttInMessageHandler publishReleaseMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
168+
return new PublishReleaseMqttInMessageHandler(messageOutFactoryService);
156169
}
157170

158171
@Bean
159-
MqttInMessageHandler disconnectMqttInMessageHandler() {
160-
return new DisconnectMqttInMessageHandler();
172+
MqttInMessageHandler disconnectMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
173+
return new DisconnectMqttInMessageHandler(messageOutFactoryService);
161174
}
162175

163176
@Bean
164177
MqttInMessageHandler subscribeMqttInMessageHandler(
165178
SubscriptionService subscriptionService,
166-
MessageOutFactoryService messageOutFactoryService) {
167-
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService);
179+
MessageOutFactoryService messageOutFactoryService,
180+
TopicService topicService) {
181+
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
168182
}
169183

170184
@Bean
171185
MqttInMessageHandler unsubscribeMqttInMessageHandler(
172186
SubscriptionService subscriptionService,
173-
MessageOutFactoryService messageOutFactoryService) {
174-
return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService);
187+
MessageOutFactoryService messageOutFactoryService,
188+
TopicService topicService) {
189+
return new UnsubscribeMqttInMessageHandler(
190+
subscriptionService,
191+
messageOutFactoryService,
192+
topicService);
175193
}
176194

177195
@Bean
@@ -252,11 +270,11 @@ MqttClientReleaseHandler externalMqttClientReleaseHandler(
252270
@Bean
253271
MqttServerConnectionConfig externalConnectionConfig(Environment env) {
254272
return new MqttServerConnectionConfig(
255-
QoS.of(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
273+
QoS.ofCode(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
256274
env.getProperty(
257-
"mqtt.external.connection.max.packet.size",
275+
"mqtt.external.connection.max.message.size",
258276
int.class,
259-
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
277+
MqttProperties.MAXIMUM_MESSAGE_SIZE_DEFAULT),
260278
env.getProperty(
261279
"mqtt.external.connection.max.string.length",
262280
int.class,
@@ -265,18 +283,22 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
265283
"mqtt.external.connection.max.binary.size",
266284
int.class,
267285
MqttProperties.MAXIMUM_BINARY_SIZE),
286+
env.getProperty(
287+
"mqtt.external.connection.max.topic.levels",
288+
int.class,
289+
MqttProperties.MAXIMUM_TOPIC_LEVELS),
268290
env.getProperty(
269291
"mqtt.external.connection.min.keep.alive",
270292
int.class,
271293
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
272294
env.getProperty(
273295
"mqtt.external.connection.receive.maximum",
274296
int.class,
275-
MqttProperties.RECEIVE_MAXIMUM_DEFAULT),
297+
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_DEFAULT),
276298
env.getProperty(
277299
"mqtt.external.connection.topic.alias.maximum",
278300
int.class,
279-
MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED),
301+
MqttProperties.TOPIC_ALIAS_DEFAULT),
280302
env.getProperty(
281303
"mqtt.external.connection.default.session.expiration.time",
282304
long.class,

application/src/test/groovy/javasabr/mqtt/broker/application/ConnectSubscribePublishTest.groovy

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
1111
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
1212
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode
1313

14-
import java.util.concurrent.atomic.AtomicReference
14+
import java.util.concurrent.CompletableFuture
1515

1616
class ConnectSubscribePublishTest extends IntegrationSpecification {
1717

18-
def "publisher should publish message QoS 0 using mqtt 3.1.1"() {
18+
def "should deliver publish message QoS 0 using mqtt 3.1.1"() {
1919
given:
20-
def received = new AtomicReference<Mqtt3Publish>()
20+
def received = new CompletableFuture<Mqtt3Publish>()
2121
def subscriber = buildExternalMqtt311Client()
2222
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
2323
def publisher = buildExternalMqtt311Client()
@@ -26,7 +26,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
2626
publisher.connect().join()
2727
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
2828
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
29-
Thread.sleep(100)
3029
then:
3130
noExceptionThrown()
3231
subscribeResult != null
@@ -35,17 +34,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
3534
publishResult != null
3635
publishResult.qos == MqttQos.AT_MOST_ONCE
3736
publishResult.type == Mqtt3MessageType.PUBLISH
38-
received.get() != null
39-
received.get().qos == MqttQos.AT_MOST_ONCE
40-
received.get().type == Mqtt3MessageType.PUBLISH
37+
received.join() != null
38+
received.join().qos == MqttQos.AT_MOST_ONCE
39+
received.join().type == Mqtt3MessageType.PUBLISH
4140
cleanup:
4241
subscriber.disconnect().join()
4342
publisher.disconnect().join()
4443
}
4544

46-
def "publisher should publish message QoS 0 using mqtt 5"() {
45+
def "should deliver publish message QoS 0 using mqtt 5"() {
4746
given:
48-
def received = new AtomicReference<Mqtt5Publish>()
47+
def received = new CompletableFuture<Mqtt5Publish>()
4948
def subscriber = buildExternalMqtt5Client()
5049
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
5150
def publisher = buildExternalMqtt5Client()
@@ -54,7 +53,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
5453
publisher.connect().join()
5554
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
5655
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
57-
Thread.sleep(100)
5856
then:
5957
noExceptionThrown()
6058
subscribeResult != null
@@ -63,17 +61,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
6361
publishResult != null
6462
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
6563
publishResult.publish.type == Mqtt5MessageType.PUBLISH
66-
received.get() != null
67-
received.get().qos == MqttQos.AT_MOST_ONCE
68-
received.get().type == Mqtt5MessageType.PUBLISH
64+
received.join() != null
65+
received.join().qos == MqttQos.AT_MOST_ONCE
66+
received.join().type == Mqtt5MessageType.PUBLISH
6967
cleanup:
7068
subscriber.disconnect().join()
7169
publisher.disconnect().join()
7270
}
7371

74-
def "publisher should publish message QoS 1 using mqtt 3.1.1"() {
72+
def "should deliver publish message QoS 1 using mqtt 3.1.1"() {
7573
given:
76-
def received = new AtomicReference<Mqtt3Publish>()
74+
def received = new CompletableFuture<Mqtt3Publish>()
7775
def subscriber = buildExternalMqtt311Client()
7876
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
7977
def publisher = buildExternalMqtt311Client()
@@ -82,7 +80,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
8280
publisher.connect().join()
8381
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
8482
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
85-
Thread.sleep(100)
8683
then:
8784
noExceptionThrown()
8885
subscribeResult != null
@@ -91,17 +88,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
9188
publishResult != null
9289
publishResult.qos == MqttQos.AT_LEAST_ONCE
9390
publishResult.type == Mqtt3MessageType.PUBLISH
94-
received.get() != null
95-
received.get().qos == MqttQos.AT_LEAST_ONCE
96-
received.get().type == Mqtt3MessageType.PUBLISH
91+
received.join() != null
92+
received.join().qos == MqttQos.AT_LEAST_ONCE
93+
received.join().type == Mqtt3MessageType.PUBLISH
9794
cleanup:
9895
subscriber.disconnect().join()
9996
publisher.disconnect().join()
10097
}
10198

102-
def "publisher should publish message QoS 1 using mqtt 5"() {
99+
def "should deliver publish message QoS 1 using mqtt 5"() {
103100
given:
104-
def received = new AtomicReference<Mqtt5Publish>()
101+
def received = new CompletableFuture<Mqtt5Publish>()
105102
def subscriber = buildExternalMqtt5Client()
106103
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
107104
def publisher = buildExternalMqtt5Client()
@@ -110,7 +107,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
110107
publisher.connect().join()
111108
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
112109
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
113-
Thread.sleep(100)
114110
then:
115111
noExceptionThrown()
116112
subscribeResult != null
@@ -119,17 +115,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
119115
publishResult != null
120116
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
121117
publishResult.publish.type == Mqtt5MessageType.PUBLISH
122-
received.get() != null
123-
received.get().qos == MqttQos.AT_LEAST_ONCE
124-
received.get().type == Mqtt5MessageType.PUBLISH
118+
received.join() != null
119+
received.join().qos == MqttQos.AT_LEAST_ONCE
120+
received.join().type == Mqtt5MessageType.PUBLISH
125121
cleanup:
126122
subscriber.disconnect().join()
127123
publisher.disconnect().join()
128124
}
129125

130-
def "publisher should publish message QoS 2 using mqtt 3.1.1"() {
126+
def "should deliver publish message QoS 2 using mqtt 3.1.1"() {
131127
given:
132-
def received = new AtomicReference<Mqtt3Publish>()
128+
def received = new CompletableFuture<Mqtt3Publish>()
133129
def subscriber = buildExternalMqtt311Client()
134130
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
135131
def publisher = buildExternalMqtt311Client()
@@ -138,7 +134,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
138134
publisher.connect().join()
139135
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
140136
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
141-
Thread.sleep(100)
142137
then:
143138
noExceptionThrown()
144139
subscribeResult != null
@@ -147,14 +142,14 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
147142
publishResult != null
148143
publishResult.qos == MqttQos.EXACTLY_ONCE
149144
publishResult.type == Mqtt3MessageType.PUBLISH
150-
received.get() != null
151-
received.get().qos == MqttQos.EXACTLY_ONCE
152-
received.get().type == Mqtt3MessageType.PUBLISH
145+
received.join() != null
146+
received.join().qos == MqttQos.EXACTLY_ONCE
147+
received.join().type == Mqtt3MessageType.PUBLISH
153148
}
154149

155-
def "publisher should publish message QoS 2 using mqtt 5"() {
150+
def "should deliver publish message QoS 2 using mqtt 5"() {
156151
given:
157-
def received = new AtomicReference<Mqtt5Publish>()
152+
def received = new CompletableFuture<Mqtt5Publish>()
158153
def subscriber = buildExternalMqtt5Client()
159154
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
160155
def publisher = buildExternalMqtt5Client()
@@ -172,9 +167,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
172167
publishResult != null
173168
publishResult.publish.qos == MqttQos.EXACTLY_ONCE
174169
publishResult.publish.type == Mqtt5MessageType.PUBLISH
175-
received.get() != null
176-
received.get().qos == MqttQos.EXACTLY_ONCE
177-
received.get().type == Mqtt5MessageType.PUBLISH
170+
received.join() != null
171+
received.join().qos == MqttQos.EXACTLY_ONCE
172+
received.join().type == Mqtt5MessageType.PUBLISH
178173
cleanup:
179174
subscriber.disconnect().join()
180175
publisher.disconnect().join()
@@ -194,11 +189,11 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
194189
Mqtt5AsyncClient subscriber,
195190
String subscriberId,
196191
MqttQos qos,
197-
AtomicReference<Mqtt5Publish> received) {
192+
CompletableFuture<Mqtt5Publish> received) {
198193
return subscriber.subscribeWith()
199194
.topicFilter("test/$subscriberId")
200195
.qos(qos)
201-
.callback({ publish -> received.set(publish) })
196+
.callback({ publish -> received.complete(publish) })
202197
.send()
203198
.join()
204199
}
@@ -216,11 +211,11 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
216211
Mqtt3AsyncClient subscriber,
217212
String subscriberId,
218213
MqttQos qos,
219-
AtomicReference<Mqtt3Publish> received) {
214+
CompletableFuture<Mqtt3Publish> received) {
220215
return subscriber.subscribeWith()
221216
.topicFilter("test/$subscriberId")
222217
.qos(qos)
223-
.callback({ publish -> received.set(publish) })
218+
.callback({ publish -> received.complete(publish) })
224219
.send()
225220
.join()
226221
}

0 commit comments

Comments
 (0)