Skip to content

Commit 6b31d26

Browse files
committed
continue updating subscription service
1 parent 1074b1c commit 6b31d26

22 files changed

+140
-144
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ class IntegrationSpecification extends Specification {
137137

138138
def mqtt5MockedConnection(MqttServerConnectionConfig serverConnConfig) {
139139
MqttClientConnectionConfig clientConnConfig = new MqttClientConnectionConfig(
140+
serverConnConfig,
140141
serverConnConfig.maxQos(),
141142
MqttVersion.MQTT_5,
142143
MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
@@ -145,12 +146,7 @@ class IntegrationSpecification extends Specification {
145146
serverConnConfig.topicAliasMaxValue(),
146147
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
147148
false,
148-
false,
149-
serverConnConfig.sessionsEnabled(),
150-
serverConnConfig.retainAvailable(),
151-
serverConnConfig.wildcardSubscriptionAvailable(),
152-
serverConnConfig.subscriptionIdAvailable(),
153-
serverConnConfig.sharedSubscriptionAvailable())
149+
false)
154150
def connectionRef = new AtomicReference<MqttConnection>()
155151
def connection = Stub(MqttConnection) {
156152
isSupported(MqttVersion.MQTT_5) >> true
@@ -168,6 +164,7 @@ class IntegrationSpecification extends Specification {
168164

169165
def mqtt311MockedConnection(MqttServerConnectionConfig serverConnConfig) {
170166
MqttClientConnectionConfig clientConnConfig = new MqttClientConnectionConfig(
167+
serverConnConfig,
171168
serverConnConfig.maxQos(),
172169
MqttVersion.MQTT_3_1_1,
173170
MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
@@ -176,12 +173,7 @@ class IntegrationSpecification extends Specification {
176173
serverConnConfig.topicAliasMaxValue(),
177174
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
178175
false,
179-
false,
180-
serverConnConfig.sessionsEnabled(),
181-
serverConnConfig.retainAvailable(),
182-
serverConnConfig.wildcardSubscriptionAvailable(),
183-
serverConnConfig.subscriptionIdAvailable(),
184-
serverConnConfig.sharedSubscriptionAvailable())
176+
false)
185177
def connectionRef = new AtomicReference<MqttConnection>()
186178
def connection = Stub(MqttConnection) {
187179
isSupported(MqttVersion.MQTT_5) >> false

application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import javasabr.mqtt.model.reason.code.ConnectAckReasonCode
66
import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode
77
import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode
88
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode
9+
import javasabr.mqtt.model.subscribtion.Subscription
10+
import javasabr.mqtt.model.topic.TopicFilter
911
import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
1012
import javasabr.mqtt.network.message.in.PublishMqttInMessage
1113
import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage
@@ -34,9 +36,8 @@ class PublishRetryTest extends IntegrationSpecification {
3436
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
3537
when:
3638
subscriber.send(new SubscribeMqtt311OutMessage(
37-
Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.AT_LEAST_ONCE)),
38-
1
39-
))
39+
1,
40+
Array.of(Subscription.minimal(TopicFilter.valueOf("test/retry/$subscriberId"), QoS.AT_LEAST_ONCE))))
4041
def subscribeAck = subscriber.readNext() as SubscribeAckMqttInMessage
4142
then:
4243
subscribeAck.reasonCodes.stream()
@@ -82,9 +83,8 @@ class PublishRetryTest extends IntegrationSpecification {
8283
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
8384
when:
8485
subscriber.send(new SubscribeMqtt5OutMessage(
85-
Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.AT_LEAST_ONCE)),
86-
1
87-
))
86+
1,
87+
Array.of(Subscription.minimal(TopicFilter.valueOf("test/retry/$subscriberId"), QoS.AT_LEAST_ONCE))))
8888
def subscribeAck = subscriber.readNext() as SubscribeAckMqttInMessage
8989
then:
9090
subscribeAck.reasonCodes.stream()
@@ -130,9 +130,8 @@ class PublishRetryTest extends IntegrationSpecification {
130130
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
131131
when:
132132
subscriber.send(new SubscribeMqtt311OutMessage(
133-
Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.EXACTLY_ONCE)),
134-
1
135-
))
133+
1,
134+
Array.of(Subscription.minimal(TopicFilter.valueOf("test/retry/$subscriberId"), QoS.EXACTLY_ONCE))))
136135
def subscribeAck = subscriber.readNext() as SubscribeAckMqttInMessage
137136
then:
138137
subscribeAck.reasonCodes.stream()
@@ -193,9 +192,8 @@ class PublishRetryTest extends IntegrationSpecification {
193192
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
194193
when:
195194
subscriber.send(new SubscribeMqtt5OutMessage(
196-
Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.EXACTLY_ONCE)),
197-
1
198-
))
195+
1,
196+
Array.of(Subscription.minimal(TopicFilter.valueOf("test/retry/$subscriberId"), QoS.EXACTLY_ONCE))))
199197
def subscribeAck = subscriber.readNext() as SubscribeAckMqttInMessage
200198
then:
201199
subscribeAck.reasonCodes.stream()

application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscribtionServiceTest.groovy

Lines changed: 75 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ package javasabr.mqtt.broker.application.service
33
import com.hivemq.client.mqtt.datatypes.MqttQos
44
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException
55
import javasabr.mqtt.broker.application.IntegrationSpecification
6-
7-
import javasabr.mqtt.model.subscriber.SingleSubscriber
6+
import javasabr.mqtt.model.topic.TopicName
7+
import javasabr.mqtt.network.MqttClient
88
import javasabr.mqtt.service.ClientIdRegistry
99
import javasabr.mqtt.service.impl.InMemorySubscriptionService
10-
import org.spockframework.util.Pair
1110
import org.springframework.beans.factory.annotation.Autowired
1211
import spock.lang.Unroll
1312

@@ -24,156 +23,145 @@ class SubscribtionServiceTest extends IntegrationSpecification {
2423
def "should clear/restore topic subscribers after disconnect/reconnect"() {
2524
given:
2625
def subscriber = buildExternalMqtt5Client(clientId)
27-
def topicName = buildTopicName(topicFilter)
28-
29-
def matchesCount = 0
30-
SingleSubscriber matchedSubscriber = null
31-
def action = { subs, empty ->
32-
matchesCount++
33-
matchedSubscriber = subs
34-
ActionResult.SUCCESS
35-
}
26+
def topicName = TopicName.valueOf(topicFilter)
3627
when:
3728
subscriber.connectWith()
3829
.cleanStart(true)
3930
.send()
4031
.join()
4132
subscriber.subscribeWith()
4233
.topicFilter(topicFilter)
43-
.qos(AT_MOST_ONCE)
34+
.qos(MqttQos.AT_MOST_ONCE)
4435
.send()
4536
.join()
46-
47-
def actionResult = subscriptionService.forEachTopicSubscriber(topicName, null, action)
37+
def subscribers = subscriptionService
38+
.findSubscribers(topicName)
39+
then: "should find the subscriber"
40+
subscribers.size() == 1
41+
subscribers.get(0).owner() instanceof MqttClient
42+
when:
43+
def matchedSubscriber = subscribers.get(0)
44+
def subscription = matchedSubscriber.subscription()
45+
def owner = matchedSubscriber.owner() as MqttClient
4846
then:
49-
matchesCount == 1
50-
matchedSubscriber.user.clientId() == clientId
51-
matchedSubscriber.subscribe.topicFilter.getRawTopic() == topicFilter
52-
actionResult == ActionResult.SUCCESS
47+
owner.clientId() == clientId
48+
subscription.topicFilter().rawTopic() == topicFilter
5349
when:
5450
subscriber.disconnect().join()
51+
def subscribers2 = subscriptionService
52+
.findSubscribers(topicName)
53+
then: "shot not find anything after disconnection"
54+
subscribers2.size() == 0
55+
when:
5556
subscriber.connectWith()
5657
.cleanStart(false)
5758
.send()
5859
.join()
59-
60-
actionResult = subscriptionService.forEachTopicSubscriber(topicName, clientId, action)
60+
def subscribers3 = subscriptionService
61+
.findSubscribers(topicName)
62+
then: "should find the reconnected subscriber"
63+
subscribers3.size() == 1
64+
subscribers3.get(0).owner() instanceof MqttClient
65+
when:
66+
matchedSubscriber = subscribers3.get(0)
67+
subscription = matchedSubscriber.subscription()
68+
owner = matchedSubscriber.owner() as MqttClient
6169
then:
62-
matchesCount == 2
63-
matchedSubscriber.user.clientId() == clientId
64-
matchedSubscriber.subscribe.topicFilter.getRawTopic() == topicFilter
65-
actionResult == ActionResult.SUCCESS
70+
owner.clientId() == clientId
71+
subscription.topicFilter().rawTopic() == topicFilter
6672
cleanup:
6773
subscriber.disconnect().join()
6874
}
6975

7076
@Unroll
7177
def "should match subscriber with the highest QoS"(
7278
String topicName,
73-
Pair<String, MqttQos> topicFilter1,
74-
Pair<String, MqttQos> topicFilter2,
75-
String targetTopicFilter
76-
) {
79+
String topicFilter1,
80+
MqttQos qos1,
81+
String topicFilter2,
82+
MqttQos qos2,
83+
String expectedTopicFilter) {
7784
given:
7885
def subscriber = buildExternalMqtt5Client()
79-
80-
def matchesCount = 0
81-
SingleSubscriber matchedSubscriber = null
82-
def action = { subs, empty ->
83-
matchesCount++
84-
matchedSubscriber = subs
85-
ActionResult.SUCCESS
86-
}
8786
subscriber.connectWith()
8887
.send()
8988
.join()
9089
subscriber.subscribeWith()
91-
.topicFilter(topicFilter1.first())
92-
.qos(topicFilter1.second())
90+
.topicFilter(topicFilter1)
91+
.qos(qos1)
9392
.send()
9493
.join()
9594
subscriber.subscribeWith()
96-
.topicFilter(topicFilter2.first())
97-
.qos(topicFilter2.second())
95+
.topicFilter(topicFilter2)
96+
.qos(qos2)
9897
.send()
9998
.join()
10099
when:
101-
subscriptionService.forEachTopicSubscriber(buildTopicName(topicName), null, action)
100+
def subscribers = subscriptionService
101+
.findSubscribers(TopicName.valueOf(topicName))
102102
then:
103-
matchesCount == 1
104-
matchedSubscriber.subscribe.topicFilter.getRawTopic() == targetTopicFilter
103+
subscribers.size() == 1
104+
subscribers.get(0).subscription().topicFilter().rawTopic() == expectedTopicFilter
105105
cleanup:
106106
subscriber.disconnect().join()
107107
where:
108-
topicName | topicFilter1 | topicFilter2 | targetTopicFilter
109-
"topic/Filter" | of("topic/Filter", AT_MOST_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/#"
110-
"topic/Filter" | of("topic/Filter", EXACTLY_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/Filter"
111-
"topic/Another" | of("topic/Filter", EXACTLY_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/#"
112-
"topic/Filter/First" | of("topic/+/First", AT_MOST_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/#"
113-
"topic/Filter/First" | of("topic/+/First", EXACTLY_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/+/First"
108+
topicName | topicFilter1 | qos1 | topicFilter2 | qos2 | expectedTopicFilter
109+
"topic/Filter" | "topic/Filter" | MqttQos.AT_MOST_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/#"
110+
"topic/Filter" | "topic/Filter" | MqttQos.EXACTLY_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/Filter"
111+
"topic/Another" | "topic/Filter" | MqttQos.EXACTLY_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/#"
112+
"topic/Filter/First" | "topic/+/First" | MqttQos.AT_MOST_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/#"
113+
"topic/Filter/First" | "topic/+/First" | MqttQos.EXACTLY_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/+/First"
114114
}
115115

116116
@Unroll
117117
def "should match all subscribers with shared and single topic"(
118118
String topicName,
119-
Pair<String, MqttQos> topicFilter1,
120-
Pair<String, MqttQos> topicFilter2,
119+
String topicFilter1,
120+
MqttQos qos1,
121+
String topicFilter2,
122+
MqttQos qos2,
121123
String targetTopicFilter,
122-
int targetCount
123-
) {
124+
int targetCount) {
124125
given:
125126
def clientId1 = clientIdRegistry.generate().block()
126127
def clientId2 = clientIdRegistry.generate().block()
127128
def subscriber1 = buildExternalMqtt5Client(clientId1)
128129
def subscriber2 = buildExternalMqtt5Client(clientId2)
129-
130-
def matchesCount = 0
131-
def matchedSubscribers = new LinkedHashSet<String>()
132-
def action = { SingleSubscriber subscriber, String clientId ->
133-
matchesCount++
134-
matchedSubscribers.add(subscriber.user.clientId)
135-
ActionResult.SUCCESS
136-
}
137-
138130
subscriber1.connectWith()
139131
.send()
140132
.join()
141133
subscriber2.connectWith()
142134
.send()
143135
.join()
144-
145136
subscriber1.subscribeWith()
146-
.topicFilter(topicFilter1.first())
147-
.qos(topicFilter1.second())
137+
.topicFilter(topicFilter1)
138+
.qos(qos1)
148139
.send()
149140
.join()
150141
subscriber2.subscribeWith()
151-
.topicFilter(topicFilter2.first())
152-
.qos(topicFilter2.second())
142+
.topicFilter(topicFilter2)
143+
.qos(qos2)
153144
.send()
154145
.join()
155146
when:
156-
subscriptionService.forEachTopicSubscriber(buildTopicName(topicName), clientId, action)
147+
def subscribers = subscriptionService.findSubscribers(TopicName.valueOf(topicName))
157148
then:
158-
matchesCount == targetCount
159-
matchedSubscribers[0] == clientId1
160-
matchedSubscribers[1] == clientId2
149+
subscribers.size() == targetCount
150+
(subscribers[0].owner() as MqttClient).clientId() == clientId1
151+
(subscribers[1].owner() as MqttClient).clientId() == clientId2
161152
cleanup:
162153
subscriber1.disconnect().join()
163154
subscriber2.disconnect().join()
164155
where:
165-
topicName | topicFilter1 | topicFilter2 | targetTopicFilter | targetCount
166-
"topic/Filter" | of("\$share/group1/topic/Filter", AT_MOST_ONCE) | of("\$share/group2/topic/#", AT_LEAST_ONCE) | "topic/#" | 2
167-
"topic/Filter" | of("\$share/group1/topic/Filter", EXACTLY_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/Filter" | 2
168-
"topic/Filter/First" | of("topic/+/First", AT_MOST_ONCE) | of("\$share/group2/topic/#", AT_LEAST_ONCE) | "topic/#" | 2
169-
"topic/Filter/First" | of("topic/+/First", EXACTLY_ONCE) | of("topic/#", AT_LEAST_ONCE) | "topic/+/First" | 2
156+
topicName | topicFilter1 | qos1 | topicFilter2 | qos2 | targetTopicFilter | targetCount
157+
"topic/Filter" | "\$share/group1/topic/Filter" | MqttQos.AT_MOST_ONCE | "\$share/group2/topic/#" | MqttQos.AT_LEAST_ONCE | "topic/#" | 2
158+
"topic/Filter" | "\$share/group1/topic/Filter" | MqttQos.EXACTLY_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/Filter" | 2
159+
"topic/Filter/First" | "topic/+/First" | MqttQos.AT_MOST_ONCE | "\$share/group2/topic/#" | MqttQos.AT_LEAST_ONCE | "topic/#" | 2
160+
"topic/Filter/First" | "topic/+/First" | MqttQos.EXACTLY_ONCE | "topic/#" | MqttQos.AT_LEAST_ONCE | "topic/+/First" | 2
170161
}
171162

172163
@Unroll
173-
def "should reject subscribe with wrong topic filter"(
174-
String wrongTopicFilter,
175-
Class<Throwable> exception
176-
) {
164+
def "should reject subscribe with wrong topic filter"(String wrongTopicFilter, Class<Throwable> exception) {
177165
given:
178166
def subscriber = buildExternalMqtt5Client()
179167
when:
@@ -193,11 +181,11 @@ class SubscribtionServiceTest extends IntegrationSpecification {
193181
cleanup:
194182
subscriber.disconnect().join()
195183
where:
196-
wrongTopicFilter | exception
197-
"topic/" | CompletionException
198-
"topic//Filter" | CompletionException
199-
"/topic/Another" | CompletionException
200-
"topic/##" | IllegalArgumentException
201-
"++/Filter/First" | IllegalArgumentException
184+
wrongTopicFilter | exception
185+
"\$sys/topic/" | CompletionException
186+
"topic//Filter" | CompletionException
187+
"/topic/\u0000Another" | IllegalArgumentException
188+
"topic/##" | IllegalArgumentException
189+
"++/Filter/First" | IllegalArgumentException
202190
}
203191
}

model/src/main/java/javasabr/mqtt/model/publishing/Publish.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
package javasabr.mqtt.model.publishing;
22

3-
import javasabr.mqtt.model.TrackableMessage;
43
import javasabr.mqtt.model.PayloadFormat;
54
import javasabr.mqtt.model.QoS;
5+
import javasabr.mqtt.model.TrackableMessage;
66
import javasabr.mqtt.model.data.type.StringPair;
77
import javasabr.mqtt.model.topic.TopicName;
88
import javasabr.rlib.collections.array.Array;
99
import javasabr.rlib.collections.array.IntArray;
10+
import org.jspecify.annotations.Nullable;
1011

1112
public record Publish(
1213
int messageId,
1314
QoS qos,
1415
TopicName topicName,
15-
TopicName responseTopicName,
16+
@Nullable TopicName responseTopicName,
1617
byte[] payload,
1718
boolean duplicated,
1819
boolean retained,
19-
String contentType,
20+
@Nullable String contentType,
2021
IntArray subscriptionIds,
21-
byte[] correlationData,
22+
byte @Nullable [] correlationData,
2223
long messageExpiryInterval,
2324
int topicAlias,
2425
PayloadFormat payloadFormat,

model/src/main/java/javasabr/mqtt/model/subscriber/SharedSubscriber.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public int size() {
6363
return subscribers.size();
6464
}
6565

66+
public boolean isEmpty() {
67+
return subscribers.isEmpty();
68+
}
69+
6670
public String group() {
6771
return topicFilter.shareName();
6872
}

0 commit comments

Comments
 (0)