Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package javasabr.mqtt.application.config;

import javasabr.mqtt.service.handler.client.DefaultMqttClientReleaseHandler;
import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler;
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
import javasabr.mqtt.service.handler.in.ConnectInPacketHandler;
import javasabr.mqtt.service.handler.in.DisconnetInPacketHandler;
import javasabr.mqtt.network.handler.packet.in.PacketInHandler;
import javasabr.mqtt.network.handler.PacketInHandler;
import javasabr.mqtt.service.handler.in.PublishAckInPacketHandler;
import javasabr.mqtt.service.handler.in.PublishCompleteInPacketHandler;
import javasabr.mqtt.service.handler.in.PublishInPacketHandler;
import javasabr.mqtt.service.handler.in.PublishReceiveInPacketHandler;
import javasabr.mqtt.service.handler.in.PublishReleaseInPacketHandler;
import javasabr.mqtt.service.handler.in.SubscribeInPacketHandler;
import javasabr.mqtt.service.handler.in.UnsubscribeInPacketHandler;
import javasabr.mqtt.network.handler.publish.PublishInHandler;
import javasabr.mqtt.network.handler.PublishInHandler;
import javasabr.mqtt.service.handler.publish.in.Qos0PublishInHandler;
import javasabr.mqtt.service.handler.publish.in.Qos1PublishInHandler;
import javasabr.mqtt.service.handler.publish.in.Qos2PublishInHandler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package javasabr.mqtt.application.config;

import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler;
import javasabr.mqtt.network.handler.packet.in.PacketInHandler;
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
import javasabr.mqtt.network.handler.PacketInHandler;
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.network.MqttConnection;
Expand All @@ -12,7 +12,7 @@
import java.nio.channels.AsynchronousSocketChannel;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javasabr.mqtt.model.MqttConnectionConfig;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
import javasabr.rlib.network.BufferAllocator;
import javasabr.rlib.network.Network;
Expand All @@ -23,7 +23,6 @@
import javasabr.rlib.network.server.ServerNetwork;
import lombok.CustomLog;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
Expand Down Expand Up @@ -76,7 +75,7 @@ BufferAllocator externalBufferAllocator(ServerNetworkConfig externalNetworkConfi
ServerNetwork<MqttConnection> externalNetwork(
ServerNetworkConfig externalNetworkConfig,
BufferAllocator externalBufferAllocator,
MqttConnectionConfig externalConnectionConfig,
MqttServerConnectionConfig externalConnectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler mqttClientReleaseHandler) {
return NetworkFactory.serverNetwork(
Expand All @@ -92,7 +91,7 @@ ServerNetwork<MqttConnection> externalNetwork(
ServerNetwork<MqttConnection> internalNetwork(
ServerNetworkConfig internalNetworkConfig,
BufferAllocator internalBufferAllocator,
MqttConnectionConfig internalConnectionConfig,
MqttServerConnectionConfig internalConnectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler mqttClientReleaseHandler) {
return NetworkFactory.serverNetwork(
Expand Down Expand Up @@ -153,13 +152,21 @@ Consumer<MqttConnection> internalConnectionConsumer() {
}

@Bean
MqttConnectionConfig externalConnectionConfig() {
return new MqttConnectionConfig(
MqttServerConnectionConfig externalConnectionConfig() {
return new MqttServerConnectionConfig(
QoS.of(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
env.getProperty(
"mqtt.external.connection.max.packet.size",
int.class,
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
env.getProperty(
"mqtt.external.connection.max.string.length",
int.class,
MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty(
"mqtt.external.connection.max.binary.size",
int.class,
MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty(
"mqtt.external.connection.min.keep.alive",
int.class,
Expand Down Expand Up @@ -203,13 +210,21 @@ MqttConnectionConfig externalConnectionConfig() {
}

@Bean
MqttConnectionConfig internalConnectionConfig() {
return new MqttConnectionConfig(
MqttServerConnectionConfig internalConnectionConfig() {
return new MqttServerConnectionConfig(
QoS.of(env.getProperty("mqtt.internal.connection.max.qos", int.class, 2)),
env.getProperty(
"mqtt.internal.connection.max.packet.size",
int.class,
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
env.getProperty(
"mqtt.internal.connection.max.string.length",
int.class,
MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty(
"mqtt.internal.connection.max.binary.size",
int.class,
MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty(
"mqtt.internal.connection.min.keep.alive",
int.class,
Expand Down Expand Up @@ -254,7 +269,7 @@ MqttConnectionConfig internalConnectionConfig() {

private ChannelFactory externalConnectionFactory(
BufferAllocator bufferAllocator,
MqttConnectionConfig connectionConfig,
MqttServerConnectionConfig connectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler releaseHandler) {
return connectionFactory(
Expand All @@ -267,7 +282,7 @@ private ChannelFactory externalConnectionFactory(

private ChannelFactory internalConnectionFactory(
BufferAllocator bufferAllocator,
MqttConnectionConfig connectionConfig,
MqttServerConnectionConfig connectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler releaseHandler) {
return connectionFactory(
Expand All @@ -280,7 +295,7 @@ private ChannelFactory internalConnectionFactory(

private ChannelFactory connectionFactory(
BufferAllocator bufferAllocator,
MqttConnectionConfig connectionConfig,
MqttServerConnectionConfig connectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler releaseHandler,
BiFunction<MqttConnection, MqttClientReleaseHandler, UnsafeMqttClient> clientFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ class SpecificationExtensions extends Specification {

static ByteBuffer putProperty(ByteBuffer self, PacketProperty property, Array<?> values) {

switch (property.getDataType()) {
case PacketDataType.UTF_8_STRING_PAIR:
switch (property.dataType()) {
case PacketDataType.UTF_8_STRING_PAIR: {
writer.writeStringPairProperties(self, property, values as Array<StringPair>)
break
default:
}
default: {
throw new IllegalStateException()
}
}

return self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
import javasabr.mqtt.network.MqttConnection
import javasabr.mqtt.application.integration.config.MqttBrokerTestConfig
import javasabr.mqtt.application.mock.MqttMockClient
import javasabr.mqtt.model.MqttConnectionConfig
import javasabr.mqtt.model.MqttServerConnectionConfig
import javasabr.mqtt.model.MqttProperties
import javasabr.mqtt.model.MqttVersion
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -36,7 +36,7 @@ class IntegrationSpecification extends Specification {
InetSocketAddress internalNetworkAddress

@Autowired
MqttConnectionConfig externalConnectionConfig
MqttServerConnectionConfig externalConnectionConfig

def buildExternalMqtt311Client() {
return buildMqtt311Client(generateClientId(), externalNetworkAddress)
Expand Down Expand Up @@ -144,37 +144,37 @@ class IntegrationSpecification extends Specification {
)
}

def mqtt5MockedConnection(MqttConnectionConfig deviceConnectionConfig) {
def mqtt5MockedConnection(MqttServerConnectionConfig deviceConnectionConfig) {

return Stub(MqttConnection) {
isSupported(MqttVersion.MQTT_5) >> true
isSupported(MqttVersion.MQTT_3_1_1) >> true
config() >> deviceConnectionConfig
serverConnectionConfig() >> deviceConnectionConfig
client() >> Stub(UnsafeMqttClient) {
connectionConfig() >> deviceConnectionConfig
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
receiveMax() >> deviceConnectionConfig.receiveMaximum()
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
receiveMaxPublishes() >> deviceConnectionConfig.receiveMaxPublishes()
maxPacketSize() >> deviceConnectionConfig.maxPacketSize()
clientId() >> IntegrationSpecification.clientId
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
topicAliasMaxValue() >> deviceConnectionConfig.topicAliasMaxValue()
}
}
}

def mqtt311MockedConnection(MqttConnectionConfig deviceConnectionConfig) {
def mqtt311MockedConnection(MqttServerConnectionConfig deviceConnectionConfig) {
return Stub(MqttConnection) {
isSupported(MqttVersion.MQTT_5) >> false
isSupported(MqttVersion.MQTT_3_1_1) >> true
config() >> deviceConnectionConfig
serverConnectionConfig() >> deviceConnectionConfig
client() >> Stub(UnsafeMqttClient) {
connectionConfig() >> deviceConnectionConfig
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
receiveMax() >> deviceConnectionConfig.receiveMaximum()
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
receiveMaxPublishes() >> deviceConnectionConfig.receiveMaxPublishes()
maxPacketSize() >> deviceConnectionConfig.maxPacketSize()
clientId() >> IntegrationSpecification.clientId
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
topicAliasMaxValue() >> deviceConnectionConfig.topicAliasMaxValue()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ class TopicSubscriberTest extends NetworkUnitSpecification {
TopicName topicNames,
QoS[] subscriberQos,
QoS[] matchedQos,
MqttClient[] mqttClients
) {
MqttClient[] mqttClients) {
given:
def subscribeTopicFilter = Mock(SubscribeTopicFilter) {
getQos() >>> subscriberQos
getTopicFilter() >>> topicFilters
SubscribeTopicFilter[] subscribeFilters = new SubscribeTopicFilter[mqttClients.length]
mqttClients.eachWithIndex { MqttClient entry, int i ->
subscribeFilters[i] = new SubscribeTopicFilter(topicFilters[i], subscriberQos[i])
}
def topicSubscriber = new TopicSubscribers()
when:
topicSubscriber.addSubscriber(mqttClients[0], subscribeTopicFilter)
topicSubscriber.addSubscriber(mqttClients[1], subscribeTopicFilter)
topicSubscriber.addSubscriber(mqttClients[2], subscribeTopicFilter)
topicSubscriber.addSubscriber(mqttClients[0], subscribeFilters[0])
topicSubscriber.addSubscriber(mqttClients[1], subscribeFilters[1])
topicSubscriber.addSubscriber(mqttClients[2], subscribeFilters[2])
then:
def subscribers = topicSubscriber.matches(topicNames)
subscribers.size() == matchedQos.size()
Expand Down Expand Up @@ -67,10 +66,10 @@ class TopicSubscriberTest extends NetworkUnitSpecification {
[AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE]
]
mqttClients << [
[defaultMqttClient, defaultMqttClient, defaultMqttClient],
[defaultMqttClient, defaultMqttClient, defaultMqttClient],
[defaultMqttClient, defaultMqttClient, defaultMqttClient],
[defaultMqttClient(), defaultMqttClient(), defaultMqttClient()]
[defaultMqtt311Client, defaultMqtt311Client, defaultMqtt311Client],
[defaultMqtt311Client, defaultMqtt311Client, defaultMqtt311Client],
[defaultMqtt311Client, defaultMqtt311Client, defaultMqtt311Client],
[newMqtt311Client(), newMqtt311Client(), newMqtt311Client()]
]
}
}
Loading
Loading