Skip to content

Commit fc8b6c7

Browse files
committed
finish the first part of improving subscription service
1 parent 447b17b commit fc8b6c7

File tree

34 files changed

+620
-133
lines changed

34 files changed

+620
-133
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,9 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
272272
return new MqttServerConnectionConfig(
273273
QoS.ofCode(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
274274
env.getProperty(
275-
"mqtt.external.connection.max.packet.size",
275+
"mqtt.external.connection.max.message.size",
276276
int.class,
277-
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
277+
MqttProperties.MAXIMUM_MESSAGE_SIZE_DEFAULT),
278278
env.getProperty(
279279
"mqtt.external.connection.max.string.length",
280280
int.class,
@@ -294,11 +294,11 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
294294
env.getProperty(
295295
"mqtt.external.connection.receive.maximum",
296296
int.class,
297-
MqttProperties.RECEIVE_MAXIMUM_DEFAULT),
297+
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_DEFAULT),
298298
env.getProperty(
299299
"mqtt.external.connection.topic.alias.maximum",
300300
int.class,
301-
MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED),
301+
MqttProperties.TOPIC_ALIAS_DEFAULT),
302302
env.getProperty(
303303
"mqtt.external.connection.default.session.expiration.time",
304304
long.class,

application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class IntegrationSpecification extends Specification {
142142
MqttVersion.MQTT_5,
143143
MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
144144
serverConnConfig.receiveMaxPublishes(),
145-
serverConnConfig.maxPacketSize(),
145+
serverConnConfig.maxMessageSize(),
146146
serverConnConfig.topicAliasMaxValue(),
147147
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
148148
false,
@@ -169,7 +169,7 @@ class IntegrationSpecification extends Specification {
169169
MqttVersion.MQTT_3_1_1,
170170
MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
171171
serverConnConfig.receiveMaxPublishes(),
172-
serverConnConfig.maxPacketSize(),
172+
serverConnConfig.maxMessageSize(),
173173
serverConnConfig.topicAliasMaxValue(),
174174
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
175175
false,

model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public record MqttClientConnectionConfig(
66
MqttVersion mqttVersion,
77
long sessionExpiryInterval,
88
int receiveMaxPublishes,
9-
int maxPacketSize,
9+
int maxMessageSize,
1010
int topicAliasMaxValue,
1111
int keepAlive,
1212
boolean requestResponseInformation,

model/src/main/java/javasabr/mqtt/model/MqttMessageProperty.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ public enum MqttMessageProperty {
3030
RESPONSE_INFORMATION(0x1A, MqttDataType.UTF_8_STRING),
3131
SERVER_REFERENCE(0x1C, MqttDataType.UTF_8_STRING),
3232
REASON_STRING(0x1F, MqttDataType.UTF_8_STRING),
33-
RECEIVE_MAXIMUM_PUBLISH(0x21, MqttDataType.SHORT),
33+
RECEIVE_MAXIMUM_PUBLISHES(0x21, MqttDataType.SHORT),
3434
TOPIC_ALIAS_MAXIMUM(0x22, MqttDataType.SHORT),
3535
TOPIC_ALIAS(0x23, MqttDataType.SHORT),
3636
MAXIMUM_QOS(0x24, MqttDataType.BYTE),
3737
RETAIN_AVAILABLE(0x25, MqttDataType.BYTE),
3838
USER_PROPERTY(0x26, MqttDataType.UTF_8_STRING_PAIR),
39-
MAXIMUM_PACKET_SIZE(0x27, MqttDataType.INTEGER),
39+
MAXIMUM_MESSAGE_SIZE(0x27, MqttDataType.INTEGER),
4040
WILDCARD_SUBSCRIPTION_AVAILABLE(0x28, MqttDataType.BYTE),
4141
SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29, MqttDataType.BYTE),
4242
SHARED_SUBSCRIPTION_AVAILABLE(0x2A, MqttDataType.BYTE);

model/src/main/java/javasabr/mqtt/model/MqttProperties.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ public interface MqttProperties {
44

55
QoS MAXIMUM_QOS_DEFAULT = QoS.EXACTLY_ONCE;
66

7-
int MAXIMUM_PROTOCOL_PACKET_SIZE = 256 * 1024 * 1024;
7+
int MAXIMUM_PROTOCOL_MESSAGE_SIZE = 256 * 1024 * 1024;
88
int MAXIMUM_PACKET_ID = 0xFFFF;
99

1010
long SESSION_EXPIRY_INTERVAL_DISABLED = 0;
@@ -13,24 +13,20 @@ public interface MqttProperties {
1313
long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFFL;
1414
long SESSION_EXPIRY_INTERVAL_UNDEFINED = -1;
1515

16-
int RECEIVE_MAXIMUM_UNDEFINED = -1;
17-
int RECEIVE_MAXIMUM_MIN = 1;
18-
int RECEIVE_MAXIMUM_DEFAULT = 10;
19-
int RECEIVE_MAXIMUM_MAX = 0xFFFF;
16+
int RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED = -1;
17+
int RECEIVE_MAXIMUM_PUBLISHES_MIN = 1;
18+
int RECEIVE_MAXIMUM_PUBLISHES_DEFAULT = 10;
19+
int RECEIVE_MAXIMUM_PUBLISHES_MAX = 0xFFFF;
2020

21-
int MAXIMUM_PACKET_SIZE_UNDEFINED = -1;
22-
int MAXIMUM_PACKET_SIZE_DEFAULT = 1024;
23-
int MAXIMUM_PACKET_SIZE_MIN = 1;
24-
int MAXIMUM_PACKET_SIZE_MAX = MAXIMUM_PROTOCOL_PACKET_SIZE;
21+
int MAXIMUM_MESSAGE_SIZE_UNDEFINED = -1;
22+
int MAXIMUM_MESSAGE_SIZE_DEFAULT = 3074;
23+
int MAXIMUM_MESSAGE_SIZE_MIN = 128;
24+
int MAXIMUM_MESSAGE_SIZE_MAX = MAXIMUM_PROTOCOL_MESSAGE_SIZE;
2525

26-
int MAXIMUM_STRING_LENGTH = 2048;
26+
int MAXIMUM_STRING_LENGTH = 1024;
2727
int MAXIMUM_BINARY_SIZE = 2048;
2828
int MAXIMUM_TOPIC_LEVELS = 10;
2929

30-
int PAYLOAD_FORMAT_UNDEFINED = -1;
31-
int PAYLOAD_FORMAT_BINARY = 0;
32-
int PAYLOAD_FORMAT_UTF8_STRING = 1;
33-
3430
long MESSAGE_EXPIRY_INTERVAL_UNDEFINED = -1;
3531
long MESSAGE_EXPIRY_INTERVAL_INFINITY = 0;
3632

@@ -45,6 +41,7 @@ public interface MqttProperties {
4541

4642
int TOPIC_ALIAS_UNDEFINED = 0;
4743
int TOPIC_ALIAS_MIN = 1;
44+
int TOPIC_ALIAS_DEFAULT = 10;
4845
int TOPIC_ALIAS_MAX = 0xFFFF;
4946
int TOPIC_ALIAS_NOT_SET = 0;
5047

@@ -58,7 +55,7 @@ public interface MqttProperties {
5855
boolean RETAIN_AVAILABLE_DEFAULT = false;
5956
boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
6057
boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = false;
61-
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = false;
58+
boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = true;
6259

6360
int PACKET_ID_FOR_QOS_0 = 0;
6461
}

model/src/main/java/javasabr/mqtt/model/MqttServerConnectionConfig.java

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package javasabr.mqtt.model;
22

3+
import javasabr.rlib.common.util.NumberUtils;
4+
35
public record MqttServerConnectionConfig(
46
QoS maxQos,
5-
int maxPacketSize,
7+
int maxMessageSize,
68
int maxStringLength,
79
int maxBinarySize,
810
int maxTopicLevels,
@@ -16,4 +18,112 @@ public record MqttServerConnectionConfig(
1618
boolean wildcardSubscriptionAvailable,
1719
boolean subscriptionIdAvailable,
1820
boolean sharedSubscriptionAvailable) {
21+
22+
public MqttServerConnectionConfig(
23+
QoS maxQos,
24+
int maxMessageSize,
25+
int maxStringLength,
26+
int maxBinarySize,
27+
int maxTopicLevels,
28+
int minKeepAliveTime,
29+
int receiveMaxPublishes,
30+
int topicAliasMaxValue,
31+
long defaultSessionExpiryInterval,
32+
boolean keepAliveEnabled,
33+
boolean sessionsEnabled,
34+
boolean retainAvailable,
35+
boolean wildcardSubscriptionAvailable,
36+
boolean subscriptionIdAvailable,
37+
boolean sharedSubscriptionAvailable) {
38+
this.maxQos = maxQos;
39+
this.maxMessageSize = NumberUtils.validate(
40+
maxMessageSize,
41+
MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN,
42+
MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX);
43+
this.maxStringLength = NumberUtils.validate(
44+
maxStringLength,
45+
1,
46+
maxMessageSize / 2);
47+
this.maxBinarySize = NumberUtils.validate(
48+
maxBinarySize,
49+
1,
50+
maxMessageSize);
51+
this.maxTopicLevels = NumberUtils.validate(
52+
maxTopicLevels,
53+
1,
54+
Byte.MAX_VALUE);
55+
this.minKeepAliveTime = NumberUtils.validate(
56+
minKeepAliveTime,
57+
MqttProperties.SERVER_KEEP_ALIVE_MIN,
58+
MqttProperties.SERVER_KEEP_ALIVE_MAX);
59+
this.receiveMaxPublishes = receiveMaxPublishes;
60+
this.topicAliasMaxValue = NumberUtils.validate(
61+
topicAliasMaxValue,
62+
MqttProperties.TOPIC_ALIAS_MIN,
63+
MqttProperties.TOPIC_ALIAS_MAX);
64+
this.defaultSessionExpiryInterval = defaultSessionExpiryInterval;
65+
this.keepAliveEnabled = keepAliveEnabled;
66+
this.sessionsEnabled = sessionsEnabled;
67+
this.retainAvailable = retainAvailable;
68+
this.wildcardSubscriptionAvailable = wildcardSubscriptionAvailable;
69+
this.subscriptionIdAvailable = subscriptionIdAvailable;
70+
this.sharedSubscriptionAvailable = sharedSubscriptionAvailable;
71+
}
72+
73+
public MqttServerConnectionConfig withMaxQos(QoS maxQos) {
74+
return new MqttServerConnectionConfig(
75+
maxQos,
76+
maxMessageSize,
77+
maxStringLength,
78+
maxBinarySize,
79+
maxTopicLevels,
80+
minKeepAliveTime,
81+
receiveMaxPublishes,
82+
topicAliasMaxValue,
83+
defaultSessionExpiryInterval,
84+
keepAliveEnabled,
85+
sessionsEnabled,
86+
retainAvailable,
87+
wildcardSubscriptionAvailable,
88+
subscriptionIdAvailable,
89+
sharedSubscriptionAvailable);
90+
}
91+
92+
public MqttServerConnectionConfig withWildcardSubscriptionAvailable(boolean wildcardSubscriptionAvailable) {
93+
return new MqttServerConnectionConfig(
94+
maxQos,
95+
maxMessageSize,
96+
maxStringLength,
97+
maxBinarySize,
98+
maxTopicLevels,
99+
minKeepAliveTime,
100+
receiveMaxPublishes,
101+
topicAliasMaxValue,
102+
defaultSessionExpiryInterval,
103+
keepAliveEnabled,
104+
sessionsEnabled,
105+
retainAvailable,
106+
wildcardSubscriptionAvailable,
107+
subscriptionIdAvailable,
108+
sharedSubscriptionAvailable);
109+
}
110+
111+
public MqttServerConnectionConfig withSharedSubscriptionAvailable(boolean sharedSubscriptionAvailable) {
112+
return new MqttServerConnectionConfig(
113+
maxQos,
114+
maxMessageSize,
115+
maxStringLength,
116+
maxBinarySize,
117+
maxTopicLevels,
118+
minKeepAliveTime,
119+
receiveMaxPublishes,
120+
topicAliasMaxValue,
121+
defaultSessionExpiryInterval,
122+
keepAliveEnabled,
123+
sessionsEnabled,
124+
retainAvailable,
125+
wildcardSubscriptionAvailable,
126+
subscriptionIdAvailable,
127+
sharedSubscriptionAvailable);
128+
}
19129
}

model/src/main/java/javasabr/mqtt/model/subscribtion/Subscription.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package javasabr.mqtt.model.subscribtion;
22

33
import com.fasterxml.jackson.annotation.JsonValue;
4+
import javasabr.mqtt.model.MqttProperties;
45
import javasabr.mqtt.model.QoS;
56
import javasabr.mqtt.model.SubscribeRetainHandling;
67
import javasabr.mqtt.model.topic.TopicFilter;
@@ -10,6 +11,10 @@ public record Subscription(
1011
The subscriber's topic filter.
1112
*/
1213
TopicFilter topicFilter,
14+
/*
15+
* The associated ID for the subscription
16+
*/
17+
int subscriptionId,
1318
/*
1419
Maximum QoS field. This gives the maximum QoS level at which the Server can send Application Messages to the
1520
Client.
@@ -36,6 +41,7 @@ public record Subscription(
3641
public static Subscription minimal(TopicFilter topicFilter, QoS qos) {
3742
return new Subscription(
3843
topicFilter,
44+
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
3945
qos,
4046
SubscribeRetainHandling.SEND,
4147
true,

model/src/test/groovy/javasabr/mqtt/model/topic/tree/TopicTreeTest.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package javasabr.mqtt.model.topic.tree
22

3+
import javasabr.mqtt.model.MqttProperties
34
import javasabr.mqtt.model.QoS
45
import javasabr.mqtt.model.SubscribeRetainHandling
56
import javasabr.mqtt.model.subscriber.SingleSubscriber
@@ -545,6 +546,7 @@ class TopicTreeTest extends UnitSpecification {
545546
static def makeSubscription(String topicFilter) {
546547
return new Subscription(
547548
TopicFilter.valueOf(topicFilter),
549+
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
548550
QoS.AT_LEAST_ONCE,
549551
SubscribeRetainHandling.SEND,
550552
true,
@@ -554,6 +556,7 @@ class TopicTreeTest extends UnitSpecification {
554556
static def makeSharedSubscription(String topicFilter) {
555557
return new Subscription(
556558
SharedTopicFilter.valueOf(topicFilter),
559+
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
557560
QoS.AT_LEAST_ONCE,
558561
SubscribeRetainHandling.SEND,
559562
true,
@@ -563,6 +566,7 @@ class TopicTreeTest extends UnitSpecification {
563566
static def makeSubscription(String topicFilter, int qos) {
564567
return new Subscription(
565568
TopicFilter.valueOf(topicFilter),
569+
MqttProperties.SUBSCRIPTION_ID_UNDEFINED,
566570
QoS.ofCode(qos),
567571
SubscribeRetainHandling.SEND,
568572
true,

network/src/main/java/javasabr/mqtt/network/MqttSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ default void resend(MqttClient client, Publish publish) {}
6161
void removeSubscription(TopicFilter subscribe);
6262

6363
Array<Subscription> storedSubscriptions();
64+
65+
Array<Subscription> findStoredSubscriptionWithId(int subscriptionId);
6466
}

network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class ConnectAckMqttInMessage extends MqttInMessage {
4949
5050
If the Receive Maximum value is absent, then its value defaults to 65,535.
5151
*/
52-
MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISH,
52+
MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES,
5353
/*
5454
Followed by a Byte with a value of either 0 or 1. It is a Protocol Error to include Maximum QoS more than
5555
once, or to have a value other than 0 or 1. If the Maximum QoS is absent, the Client uses a Maximum
@@ -104,7 +104,7 @@ public class ConnectAckMqttInMessage extends MqttInMessage {
104104
a Server receives a packet whose size exceeds this limit, this is a Protocol Error, the Server uses
105105
DISCONNECT with Reason Code 0x95 (Packet too large), as described in section 4.13.
106106
*/
107-
MqttMessageProperty.MAXIMUM_PACKET_SIZE,
107+
MqttMessageProperty.MAXIMUM_MESSAGE_SIZE,
108108
/*
109109
Followed by the UTF-8 string which is the Assigned Client Identifier. It is a Protocol Error to include the
110110
Assigned Client Identifier more than once.
@@ -261,7 +261,7 @@ public class ConnectAckMqttInMessage extends MqttInMessage {
261261
long sessionExpiryInterval;
262262

263263
int receiveMaxPublishes;
264-
int maxPacketSize;
264+
int maxMessageSize;
265265
int topicAliasMaxValue;
266266
int serverKeepAlive;
267267

@@ -286,10 +286,10 @@ public ConnectAckMqttInMessage(byte info) {
286286
this.authenticationMethod = StringUtils.EMPTY;
287287
this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY;
288288
this.serverKeepAlive = MqttProperties.SERVER_KEEP_ALIVE_UNDEFINED;
289-
this.maxPacketSize = MqttProperties.MAXIMUM_PACKET_SIZE_UNDEFINED;
289+
this.maxMessageSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED;
290290
this.sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED;
291291
this.topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED;
292-
this.receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_UNDEFINED;
292+
this.receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED;
293293
}
294294

295295
@Override
@@ -336,10 +336,10 @@ protected void applyProperty(MqttMessageProperty property, long value) {
336336
case SHARED_SUBSCRIPTION_AVAILABLE -> sharedSubscriptionAvailable = NumberUtils.toBoolean(value);
337337
case SUBSCRIPTION_IDENTIFIER_AVAILABLE -> subscriptionIdAvailable = NumberUtils.toBoolean(value);
338338
case RETAIN_AVAILABLE -> retainAvailable = NumberUtils.toBoolean(value);
339-
case RECEIVE_MAXIMUM_PUBLISH -> receiveMaxPublishes = (int) NumberUtils.validate(
339+
case RECEIVE_MAXIMUM_PUBLISHES -> receiveMaxPublishes = (int) NumberUtils.validate(
340340
value,
341-
MqttProperties.RECEIVE_MAXIMUM_MIN,
342-
MqttProperties.RECEIVE_MAXIMUM_MAX);
341+
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MIN,
342+
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MAX);
343343
case MAXIMUM_QOS -> maximumQos = QoS.ofCode((int) value);
344344
case SERVER_KEEP_ALIVE -> serverKeepAlive = NumberUtils.validate(
345345
(int) value,
@@ -353,10 +353,10 @@ protected void applyProperty(MqttMessageProperty property, long value) {
353353
value,
354354
MqttProperties.SESSION_EXPIRY_INTERVAL_MIN,
355355
MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY);
356-
case MAXIMUM_PACKET_SIZE -> maxPacketSize = NumberUtils.validate(
356+
case MAXIMUM_MESSAGE_SIZE -> maxMessageSize = NumberUtils.validate(
357357
(int) value,
358-
MqttProperties.MAXIMUM_PACKET_SIZE_MIN,
359-
MqttProperties.MAXIMUM_PACKET_SIZE_MAX);
358+
MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN,
359+
MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX);
360360
default -> unexpectedProperty(property);
361361
}
362362
}

0 commit comments

Comments
 (0)