Skip to content

Commit 072b95a

Browse files
authored
Reorganizing packet handling, part 1 (#45)
* next step of reorganizing packet handling
1 parent df97b2f commit 072b95a

File tree

106 files changed

+2050
-2215
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+2050
-2215
lines changed

application/src/main/java/javasabr/mqtt/application/config/MqttBrokerConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package javasabr.mqtt.application.config;
22

33
import javasabr.mqtt.service.handler.client.DefaultMqttClientReleaseHandler;
4-
import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler;
4+
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
55
import javasabr.mqtt.service.handler.in.ConnectInPacketHandler;
66
import javasabr.mqtt.service.handler.in.DisconnetInPacketHandler;
7-
import javasabr.mqtt.network.handler.packet.in.PacketInHandler;
7+
import javasabr.mqtt.network.handler.PacketInHandler;
88
import javasabr.mqtt.service.handler.in.PublishAckInPacketHandler;
99
import javasabr.mqtt.service.handler.in.PublishCompleteInPacketHandler;
1010
import javasabr.mqtt.service.handler.in.PublishInPacketHandler;
1111
import javasabr.mqtt.service.handler.in.PublishReceiveInPacketHandler;
1212
import javasabr.mqtt.service.handler.in.PublishReleaseInPacketHandler;
1313
import javasabr.mqtt.service.handler.in.SubscribeInPacketHandler;
1414
import javasabr.mqtt.service.handler.in.UnsubscribeInPacketHandler;
15-
import javasabr.mqtt.network.handler.publish.PublishInHandler;
15+
import javasabr.mqtt.network.handler.PublishInHandler;
1616
import javasabr.mqtt.service.handler.publish.in.Qos0PublishInHandler;
1717
import javasabr.mqtt.service.handler.publish.in.Qos1PublishInHandler;
1818
import javasabr.mqtt.service.handler.publish.in.Qos2PublishInHandler;

application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package javasabr.mqtt.application.config;
22

3-
import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler;
4-
import javasabr.mqtt.network.handler.packet.in.PacketInHandler;
3+
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
4+
import javasabr.mqtt.network.handler.PacketInHandler;
55
import javasabr.mqtt.model.MqttProperties;
66
import javasabr.mqtt.model.QoS;
77
import javasabr.mqtt.network.MqttConnection;
@@ -12,7 +12,7 @@
1212
import java.nio.channels.AsynchronousSocketChannel;
1313
import java.util.function.BiFunction;
1414
import java.util.function.Consumer;
15-
import javasabr.mqtt.model.MqttConnectionConfig;
15+
import javasabr.mqtt.model.MqttServerConnectionConfig;
1616
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
1717
import javasabr.rlib.network.BufferAllocator;
1818
import javasabr.rlib.network.Network;
@@ -23,7 +23,6 @@
2323
import javasabr.rlib.network.server.ServerNetwork;
2424
import lombok.CustomLog;
2525
import lombok.RequiredArgsConstructor;
26-
import lombok.extern.log4j.Log4j2;
2726
import org.springframework.context.annotation.Bean;
2827
import org.springframework.context.annotation.Configuration;
2928
import org.springframework.core.env.Environment;
@@ -76,7 +75,7 @@ BufferAllocator externalBufferAllocator(ServerNetworkConfig externalNetworkConfi
7675
ServerNetwork<MqttConnection> externalNetwork(
7776
ServerNetworkConfig externalNetworkConfig,
7877
BufferAllocator externalBufferAllocator,
79-
MqttConnectionConfig externalConnectionConfig,
78+
MqttServerConnectionConfig externalConnectionConfig,
8079
PacketInHandler[] packetHandlers,
8180
MqttClientReleaseHandler mqttClientReleaseHandler) {
8281
return NetworkFactory.serverNetwork(
@@ -92,7 +91,7 @@ ServerNetwork<MqttConnection> externalNetwork(
9291
ServerNetwork<MqttConnection> internalNetwork(
9392
ServerNetworkConfig internalNetworkConfig,
9493
BufferAllocator internalBufferAllocator,
95-
MqttConnectionConfig internalConnectionConfig,
94+
MqttServerConnectionConfig internalConnectionConfig,
9695
PacketInHandler[] packetHandlers,
9796
MqttClientReleaseHandler mqttClientReleaseHandler) {
9897
return NetworkFactory.serverNetwork(
@@ -153,13 +152,21 @@ Consumer<MqttConnection> internalConnectionConsumer() {
153152
}
154153

155154
@Bean
156-
MqttConnectionConfig externalConnectionConfig() {
157-
return new MqttConnectionConfig(
155+
MqttServerConnectionConfig externalConnectionConfig() {
156+
return new MqttServerConnectionConfig(
158157
QoS.of(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
159158
env.getProperty(
160159
"mqtt.external.connection.max.packet.size",
161160
int.class,
162161
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
162+
env.getProperty(
163+
"mqtt.external.connection.max.string.length",
164+
int.class,
165+
MqttProperties.MAXIMUM_STRING_LENGTH),
166+
env.getProperty(
167+
"mqtt.external.connection.max.binary.size",
168+
int.class,
169+
MqttProperties.MAXIMUM_BINARY_SIZE),
163170
env.getProperty(
164171
"mqtt.external.connection.min.keep.alive",
165172
int.class,
@@ -203,13 +210,21 @@ MqttConnectionConfig externalConnectionConfig() {
203210
}
204211

205212
@Bean
206-
MqttConnectionConfig internalConnectionConfig() {
207-
return new MqttConnectionConfig(
213+
MqttServerConnectionConfig internalConnectionConfig() {
214+
return new MqttServerConnectionConfig(
208215
QoS.of(env.getProperty("mqtt.internal.connection.max.qos", int.class, 2)),
209216
env.getProperty(
210217
"mqtt.internal.connection.max.packet.size",
211218
int.class,
212219
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
220+
env.getProperty(
221+
"mqtt.internal.connection.max.string.length",
222+
int.class,
223+
MqttProperties.MAXIMUM_STRING_LENGTH),
224+
env.getProperty(
225+
"mqtt.internal.connection.max.binary.size",
226+
int.class,
227+
MqttProperties.MAXIMUM_BINARY_SIZE),
213228
env.getProperty(
214229
"mqtt.internal.connection.min.keep.alive",
215230
int.class,
@@ -254,7 +269,7 @@ MqttConnectionConfig internalConnectionConfig() {
254269

255270
private ChannelFactory externalConnectionFactory(
256271
BufferAllocator bufferAllocator,
257-
MqttConnectionConfig connectionConfig,
272+
MqttServerConnectionConfig connectionConfig,
258273
PacketInHandler[] packetHandlers,
259274
MqttClientReleaseHandler releaseHandler) {
260275
return connectionFactory(
@@ -267,7 +282,7 @@ private ChannelFactory externalConnectionFactory(
267282

268283
private ChannelFactory internalConnectionFactory(
269284
BufferAllocator bufferAllocator,
270-
MqttConnectionConfig connectionConfig,
285+
MqttServerConnectionConfig connectionConfig,
271286
PacketInHandler[] packetHandlers,
272287
MqttClientReleaseHandler releaseHandler) {
273288
return connectionFactory(
@@ -280,7 +295,7 @@ private ChannelFactory internalConnectionFactory(
280295

281296
private ChannelFactory connectionFactory(
282297
BufferAllocator bufferAllocator,
283-
MqttConnectionConfig connectionConfig,
298+
MqttServerConnectionConfig connectionConfig,
284299
PacketInHandler[] packetHandlers,
285300
MqttClientReleaseHandler releaseHandler,
286301
BiFunction<MqttConnection, MqttClientReleaseHandler, UnsafeMqttClient> clientFactory) {

application/src/test/groovy/javasabr/mqtt/application/extension/SpecificationExtensions.groovy

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,14 @@ class SpecificationExtensions extends Specification {
5656

5757
static ByteBuffer putProperty(ByteBuffer self, PacketProperty property, Array<?> values) {
5858

59-
switch (property.getDataType()) {
60-
case PacketDataType.UTF_8_STRING_PAIR:
59+
switch (property.dataType()) {
60+
case PacketDataType.UTF_8_STRING_PAIR: {
6161
writer.writeStringPairProperties(self, property, values as Array<StringPair>)
6262
break
63-
default:
63+
}
64+
default: {
6465
throw new IllegalStateException()
66+
}
6567
}
6668

6769
return self

application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
66
import javasabr.mqtt.network.MqttConnection
77
import javasabr.mqtt.application.integration.config.MqttBrokerTestConfig
88
import javasabr.mqtt.application.mock.MqttMockClient
9-
import javasabr.mqtt.model.MqttConnectionConfig
9+
import javasabr.mqtt.model.MqttServerConnectionConfig
1010
import javasabr.mqtt.model.MqttProperties
1111
import javasabr.mqtt.model.MqttVersion
1212
import org.springframework.beans.factory.annotation.Autowired
@@ -36,7 +36,7 @@ class IntegrationSpecification extends Specification {
3636
InetSocketAddress internalNetworkAddress
3737

3838
@Autowired
39-
MqttConnectionConfig externalConnectionConfig
39+
MqttServerConnectionConfig externalConnectionConfig
4040

4141
def buildExternalMqtt311Client() {
4242
return buildMqtt311Client(generateClientId(), externalNetworkAddress)
@@ -144,37 +144,37 @@ class IntegrationSpecification extends Specification {
144144
)
145145
}
146146

147-
def mqtt5MockedConnection(MqttConnectionConfig deviceConnectionConfig) {
147+
def mqtt5MockedConnection(MqttServerConnectionConfig deviceConnectionConfig) {
148148

149149
return Stub(MqttConnection) {
150150
isSupported(MqttVersion.MQTT_5) >> true
151151
isSupported(MqttVersion.MQTT_3_1_1) >> true
152-
config() >> deviceConnectionConfig
152+
serverConnectionConfig() >> deviceConnectionConfig
153153
client() >> Stub(UnsafeMqttClient) {
154154
connectionConfig() >> deviceConnectionConfig
155155
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
156-
receiveMax() >> deviceConnectionConfig.receiveMaximum()
157-
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
156+
receiveMaxPublishes() >> deviceConnectionConfig.receiveMaxPublishes()
157+
maxPacketSize() >> deviceConnectionConfig.maxPacketSize()
158158
clientId() >> IntegrationSpecification.clientId
159159
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
160-
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
160+
topicAliasMaxValue() >> deviceConnectionConfig.topicAliasMaxValue()
161161
}
162162
}
163163
}
164164

165-
def mqtt311MockedConnection(MqttConnectionConfig deviceConnectionConfig) {
165+
def mqtt311MockedConnection(MqttServerConnectionConfig deviceConnectionConfig) {
166166
return Stub(MqttConnection) {
167167
isSupported(MqttVersion.MQTT_5) >> false
168168
isSupported(MqttVersion.MQTT_3_1_1) >> true
169-
config() >> deviceConnectionConfig
169+
serverConnectionConfig() >> deviceConnectionConfig
170170
client() >> Stub(UnsafeMqttClient) {
171171
connectionConfig() >> deviceConnectionConfig
172172
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
173-
receiveMax() >> deviceConnectionConfig.receiveMaximum()
174-
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
173+
receiveMaxPublishes() >> deviceConnectionConfig.receiveMaxPublishes()
174+
maxPacketSize() >> deviceConnectionConfig.maxPacketSize()
175175
clientId() >> IntegrationSpecification.clientId
176176
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
177-
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
177+
topicAliasMaxValue() >> deviceConnectionConfig.topicAliasMaxValue()
178178
}
179179
}
180180
}

application/src/test/groovy/javasabr/mqtt/application/model/TopicSubscriberTest.groovy

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,17 @@ class TopicSubscriberTest extends NetworkUnitSpecification {
2323
TopicName topicNames,
2424
QoS[] subscriberQos,
2525
QoS[] matchedQos,
26-
MqttClient[] mqttClients
27-
) {
26+
MqttClient[] mqttClients) {
2827
given:
29-
def subscribeTopicFilter = Mock(SubscribeTopicFilter) {
30-
getQos() >>> subscriberQos
31-
getTopicFilter() >>> topicFilters
28+
SubscribeTopicFilter[] subscribeFilters = new SubscribeTopicFilter[mqttClients.length]
29+
mqttClients.eachWithIndex { MqttClient entry, int i ->
30+
subscribeFilters[i] = new SubscribeTopicFilter(topicFilters[i], subscriberQos[i])
3231
}
3332
def topicSubscriber = new TopicSubscribers()
3433
when:
35-
topicSubscriber.addSubscriber(mqttClients[0], subscribeTopicFilter)
36-
topicSubscriber.addSubscriber(mqttClients[1], subscribeTopicFilter)
37-
topicSubscriber.addSubscriber(mqttClients[2], subscribeTopicFilter)
34+
topicSubscriber.addSubscriber(mqttClients[0], subscribeFilters[0])
35+
topicSubscriber.addSubscriber(mqttClients[1], subscribeFilters[1])
36+
topicSubscriber.addSubscriber(mqttClients[2], subscribeFilters[2])
3837
then:
3938
def subscribers = topicSubscriber.matches(topicNames)
4039
subscribers.size() == matchedQos.size()
@@ -67,10 +66,10 @@ class TopicSubscriberTest extends NetworkUnitSpecification {
6766
[AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE]
6867
]
6968
mqttClients << [
70-
[defaultMqttClient, defaultMqttClient, defaultMqttClient],
71-
[defaultMqttClient, defaultMqttClient, defaultMqttClient],
72-
[defaultMqttClient, defaultMqttClient, defaultMqttClient],
73-
[defaultMqttClient(), defaultMqttClient(), defaultMqttClient()]
69+
[defaultMqtt311Client, defaultMqtt311Client, defaultMqtt311Client],
70+
[defaultMqtt311Client, defaultMqtt311Client, defaultMqtt311Client],
71+
[defaultMqtt311Client, defaultMqtt311Client, defaultMqtt311Client],
72+
[newMqtt311Client(), newMqtt311Client(), newMqtt311Client()]
7473
]
7574
}
7675
}

0 commit comments

Comments
 (0)