Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javasabr.mqtt.model.MqttConnectionConfig;
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
import javasabr.rlib.network.BufferAllocator;
import javasabr.rlib.network.Network;
import javasabr.rlib.network.NetworkFactory;
import javasabr.rlib.network.ServerNetworkConfig;
import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig;
import javasabr.rlib.network.impl.DefaultBufferAllocator;
import javasabr.rlib.network.server.ServerNetwork;
import lombok.CustomLog;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Log4j2
@CustomLog
@Configuration
@RequiredArgsConstructor
public class MqttNetworkConfig {
Expand All @@ -44,7 +46,7 @@ ServerNetworkConfig internalNetworkConfig() {
.pendingBufferSize(env.getProperty("mqtt.internal.network.pending.buffer.size", int.class, 4096))
.writeBufferSize(env.getProperty("mqtt.internal.network.write.buffer.size", int.class, 2048))
.threadGroupName("InternalNetwork")
.threadGroupSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1))
.threadGroupMaxSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1))
.build();
}

Expand All @@ -56,7 +58,7 @@ ServerNetworkConfig externalNetworkConfig() {
.pendingBufferSize(env.getProperty("mqtt.external.network.pending.buffer.size", int.class, 200))
.writeBufferSize(env.getProperty("mqtt.external.network.write.buffer.size", int.class, 100))
.threadGroupName("ExternalNetwork")
.threadGroupSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1))
.threadGroupMaxSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1))
.build();
}

Expand All @@ -77,7 +79,7 @@ ServerNetwork<MqttConnection> externalNetwork(
MqttConnectionConfig externalConnectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler mqttClientReleaseHandler) {
return NetworkFactory.newServerNetwork(
return NetworkFactory.serverNetwork(
externalNetworkConfig,
externalConnectionFactory(
externalBufferAllocator,
Expand All @@ -93,7 +95,7 @@ ServerNetwork<MqttConnection> internalNetwork(
MqttConnectionConfig internalConnectionConfig,
PacketInHandler[] packetHandlers,
MqttClientReleaseHandler mqttClientReleaseHandler) {
return NetworkFactory.newServerNetwork(
return NetworkFactory.serverNetwork(
internalNetworkConfig,
internalConnectionFactory(
internalBufferAllocator,
Expand Down Expand Up @@ -135,18 +137,18 @@ InetSocketAddress internalNetworkAddress(
@Bean
Consumer<MqttConnection> externalConnectionConsumer() {
return mqttConnection -> {
log.info("Accepted external connection: {}", mqttConnection);
var client = (UnsafeMqttClient) mqttConnection.getClient();
mqttConnection.onReceive((conn, packet) -> client.handle(packet));
log.info(mqttConnection.remoteAddress(), "[%s] Accepted external connection"::formatted);
var client = (UnsafeMqttClient) mqttConnection.client();
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
};
}

@Bean
Consumer<MqttConnection> internalConnectionConsumer() {
return mqttConnection -> {
log.info("Accepted internal connection: {}", mqttConnection);
var client = (UnsafeMqttClient) mqttConnection.getClient();
mqttConnection.onReceive((conn, packet) -> client.handle(packet));
log.info(mqttConnection.remoteAddress(), "[%s] Accepted internal connection"::formatted);
var client = (UnsafeMqttClient) mqttConnection.client();
mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet));
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package javasabr.mqtt.application.extension

import javasabr.mqtt.network.MqttConnection
import javasabr.mqtt.network.packet.out.MqttWritablePacket
import javasabr.mqtt.network.utils.MqttDataUtils
import javasabr.mqtt.model.PacketProperty
Expand All @@ -16,7 +17,7 @@ class SpecificationExtensions extends Specification {
static final writer = new MqttWritablePacket() {

@Override
protected void writeImpl(ByteBuffer buffer) {}
protected void writeImpl(MqttConnection connection, ByteBuffer buffer) {}
}

static ByteBuffer putMbi(ByteBuffer self, int value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0)
subscribeResult.type == Mqtt3MessageType.SUBACK

publishResult != null
publishResult.qos == MqttQos.AT_MOST_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_MOST_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
Expand All @@ -57,22 +52,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0)
subscribeResult.type == Mqtt5MessageType.SUBACK

publishResult != null
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_MOST_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
Expand All @@ -90,22 +80,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1)
subscribeResult.type == Mqtt3MessageType.SUBACK

publishResult != null
publishResult.qos == MqttQos.AT_LEAST_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_LEAST_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
Expand All @@ -121,25 +106,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt5Client()
when:

subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1)
subscribeResult.type == Mqtt5MessageType.SUBACK

publishResult != null
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_LEAST_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
Expand All @@ -157,28 +136,20 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2)
subscribeResult.type == Mqtt3MessageType.SUBACK

publishResult != null
publishResult.qos == MqttQos.EXACTLY_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.EXACTLY_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 2 using mqtt 5"() {
Expand All @@ -188,25 +159,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt5Client()
when:

subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2)
subscribeResult.type == Mqtt5MessageType.SUBACK

publishResult != null
publishResult.publish.qos == MqttQos.EXACTLY_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.EXACTLY_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
Expand All @@ -229,8 +194,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
Mqtt5AsyncClient subscriber,
String subscriberId,
MqttQos qos,
AtomicReference<Mqtt5Publish> received
) {
AtomicReference<Mqtt5Publish> received) {
return subscriber.subscribeWith()
.topicFilter("test/$subscriberId")
.qos(qos)
Expand All @@ -252,8 +216,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
Mqtt3AsyncClient subscriber,
String subscriberId,
MqttQos qos,
AtomicReference<Mqtt3Publish> received
) {
AtomicReference<Mqtt3Publish> received) {
return subscriber.subscribeWith()
.topicFilter("test/$subscriberId")
.qos(qos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class IntegrationSpecification extends Specification {
.serverHost(address.getHostName())
.serverPort(address.getPort())
.useMqttVersion3()
.addDisconnectedListener {
println "[${clientId}|mqtt311] disconnected:[$it.cause]"
}
.build()
.toAsync()
}
Expand All @@ -90,6 +93,9 @@ class IntegrationSpecification extends Specification {
.serverHost(address.getHostName())
.serverPort(address.getPort())
.useMqttVersion5()
.addDisconnectedListener {
println "[${clientId}|mqtt5] disconnected:[$it.cause]"
}
.build()
.toAsync()
}
Expand Down Expand Up @@ -143,15 +149,15 @@ class IntegrationSpecification extends Specification {
return Stub(MqttConnection) {
isSupported(MqttVersion.MQTT_5) >> true
isSupported(MqttVersion.MQTT_3_1_1) >> true
getConfig() >> deviceConnectionConfig
getClient() >> Stub(UnsafeMqttClient) {
getConnectionConfig() >> deviceConnectionConfig
getSessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
getReceiveMax() >> deviceConnectionConfig.getReceiveMaximum()
getMaximumPacketSize() >> deviceConnectionConfig.getMaximumPacketSize()
getClientId() >> IntegrationSpecification.clientId
getKeepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
getTopicAliasMaximum() >> deviceConnectionConfig.getTopicAliasMaximum()
config() >> deviceConnectionConfig
client() >> Stub(UnsafeMqttClient) {
connectionConfig() >> deviceConnectionConfig
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
receiveMax() >> deviceConnectionConfig.receiveMaximum()
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
clientId() >> IntegrationSpecification.clientId
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
}
}
}
Expand All @@ -160,15 +166,15 @@ class IntegrationSpecification extends Specification {
return Stub(MqttConnection) {
isSupported(MqttVersion.MQTT_5) >> false
isSupported(MqttVersion.MQTT_3_1_1) >> true
getConfig() >> deviceConnectionConfig
getClient() >> Stub(UnsafeMqttClient) {
getConnectionConfig() >> deviceConnectionConfig
getSessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
getReceiveMax() >> deviceConnectionConfig.getReceiveMaximum()
getMaximumPacketSize() >> deviceConnectionConfig.getMaximumPacketSize()
getClientId() >> IntegrationSpecification.clientId
getKeepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
getTopicAliasMaximum() >> deviceConnectionConfig.getTopicAliasMaximum()
config() >> deviceConnectionConfig
client() >> Stub(UnsafeMqttClient) {
connectionConfig() >> deviceConnectionConfig
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
receiveMax() >> deviceConnectionConfig.receiveMaximum()
maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize()
clientId() >> IntegrationSpecification.clientId
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum()
}
}
}
Expand Down
Loading
Loading