diff --git a/application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java b/application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java index a37b3a58..3198d858 100644 --- a/application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java +++ b/application/src/main/java/javasabr/mqtt/application/config/MqttNetworkConfig.java @@ -13,6 +13,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import javasabr.mqtt.model.MqttConnectionConfig; +import javasabr.mqtt.network.packet.in.MqttReadablePacket; import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.Network; import javasabr.rlib.network.NetworkFactory; @@ -20,13 +21,14 @@ import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig; import javasabr.rlib.network.impl.DefaultBufferAllocator; import javasabr.rlib.network.server.ServerNetwork; +import lombok.CustomLog; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; -@Log4j2 +@CustomLog @Configuration @RequiredArgsConstructor public class MqttNetworkConfig { @@ -44,7 +46,7 @@ ServerNetworkConfig internalNetworkConfig() { .pendingBufferSize(env.getProperty("mqtt.internal.network.pending.buffer.size", int.class, 4096)) .writeBufferSize(env.getProperty("mqtt.internal.network.write.buffer.size", int.class, 2048)) .threadGroupName("InternalNetwork") - .threadGroupSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1)) + .threadGroupMaxSize(env.getProperty("mqtt.internal.network.thread.count", int.class, 1)) .build(); } @@ -56,7 +58,7 @@ ServerNetworkConfig externalNetworkConfig() { .pendingBufferSize(env.getProperty("mqtt.external.network.pending.buffer.size", int.class, 200)) .writeBufferSize(env.getProperty("mqtt.external.network.write.buffer.size", int.class, 100)) .threadGroupName("ExternalNetwork") - .threadGroupSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1)) + .threadGroupMaxSize(env.getProperty("mqtt.external.network.thread.count", int.class, 1)) .build(); } @@ -77,7 +79,7 @@ ServerNetwork externalNetwork( MqttConnectionConfig externalConnectionConfig, PacketInHandler[] packetHandlers, MqttClientReleaseHandler mqttClientReleaseHandler) { - return NetworkFactory.newServerNetwork( + return NetworkFactory.serverNetwork( externalNetworkConfig, externalConnectionFactory( externalBufferAllocator, @@ -93,7 +95,7 @@ ServerNetwork internalNetwork( MqttConnectionConfig internalConnectionConfig, PacketInHandler[] packetHandlers, MqttClientReleaseHandler mqttClientReleaseHandler) { - return NetworkFactory.newServerNetwork( + return NetworkFactory.serverNetwork( internalNetworkConfig, internalConnectionFactory( internalBufferAllocator, @@ -135,18 +137,18 @@ InetSocketAddress internalNetworkAddress( @Bean Consumer externalConnectionConsumer() { return mqttConnection -> { - log.info("Accepted external connection: {}", mqttConnection); - var client = (UnsafeMqttClient) mqttConnection.getClient(); - mqttConnection.onReceive((conn, packet) -> client.handle(packet)); + log.info(mqttConnection.remoteAddress(), "[%s] Accepted external connection"::formatted); + var client = (UnsafeMqttClient) mqttConnection.client(); + mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet)); }; } @Bean Consumer internalConnectionConsumer() { return mqttConnection -> { - log.info("Accepted internal connection: {}", mqttConnection); - var client = (UnsafeMqttClient) mqttConnection.getClient(); - mqttConnection.onReceive((conn, packet) -> client.handle(packet)); + log.info(mqttConnection.remoteAddress(), "[%s] Accepted internal connection"::formatted); + var client = (UnsafeMqttClient) mqttConnection.client(); + mqttConnection.onReceive((conn, packet) -> client.handle((MqttReadablePacket) packet)); }; } diff --git a/application/src/test/groovy/javasabr/mqtt/application/extension/SpecificationExtensions.groovy b/application/src/test/groovy/javasabr/mqtt/application/extension/SpecificationExtensions.groovy index 2eb6da91..8f1e2d06 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/extension/SpecificationExtensions.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/extension/SpecificationExtensions.groovy @@ -1,5 +1,6 @@ package javasabr.mqtt.application.extension +import javasabr.mqtt.network.MqttConnection import javasabr.mqtt.network.packet.out.MqttWritablePacket import javasabr.mqtt.network.utils.MqttDataUtils import javasabr.mqtt.model.PacketProperty @@ -16,7 +17,7 @@ class SpecificationExtensions extends Specification { static final writer = new MqttWritablePacket() { @Override - protected void writeImpl(ByteBuffer buffer) {} + protected void writeImpl(MqttConnection connection, ByteBuffer buffer) {} } static ByteBuffer putMbi(ByteBuffer self, int value) { diff --git a/application/src/test/groovy/javasabr/mqtt/application/integration/ConnectSubscribePublishTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/integration/ConnectSubscribePublishTest.groovy index a55f4898..4cc64905 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/integration/ConnectSubscribePublishTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/integration/ConnectSubscribePublishTest.groovy @@ -24,22 +24,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { when: subscriber.connect().join() publisher.connect().join() - def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE) - Thread.sleep(100) then: noExceptionThrown() - subscribeResult != null subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0) subscribeResult.type == Mqtt3MessageType.SUBACK - publishResult != null publishResult.qos == MqttQos.AT_MOST_ONCE publishResult.type == Mqtt3MessageType.PUBLISH - received.get() != null received.get().qos == MqttQos.AT_MOST_ONCE received.get().type == Mqtt3MessageType.PUBLISH @@ -57,22 +52,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { when: subscriber.connect().join() publisher.connect().join() - def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE) - Thread.sleep(100) then: noExceptionThrown() - subscribeResult != null subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0) subscribeResult.type == Mqtt5MessageType.SUBACK - publishResult != null publishResult.publish.qos == MqttQos.AT_MOST_ONCE publishResult.publish.type == Mqtt5MessageType.PUBLISH - received.get() != null received.get().qos == MqttQos.AT_MOST_ONCE received.get().type == Mqtt5MessageType.PUBLISH @@ -90,22 +80,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { when: subscriber.connect().join() publisher.connect().join() - def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE) - Thread.sleep(100) then: noExceptionThrown() - subscribeResult != null subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1) subscribeResult.type == Mqtt3MessageType.SUBACK - publishResult != null publishResult.qos == MqttQos.AT_LEAST_ONCE publishResult.type == Mqtt3MessageType.PUBLISH - received.get() != null received.get().qos == MqttQos.AT_LEAST_ONCE received.get().type == Mqtt3MessageType.PUBLISH @@ -121,25 +106,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() def publisher = buildExternalMqtt5Client() when: - subscriber.connect().join() publisher.connect().join() - def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE) - Thread.sleep(100) then: noExceptionThrown() - subscribeResult != null subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1) subscribeResult.type == Mqtt5MessageType.SUBACK - publishResult != null publishResult.publish.qos == MqttQos.AT_LEAST_ONCE publishResult.publish.type == Mqtt5MessageType.PUBLISH - received.get() != null received.get().qos == MqttQos.AT_LEAST_ONCE received.get().type == Mqtt5MessageType.PUBLISH @@ -157,28 +136,20 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { when: subscriber.connect().join() publisher.connect().join() - def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE) - Thread.sleep(100) then: noExceptionThrown() - subscribeResult != null subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2) subscribeResult.type == Mqtt3MessageType.SUBACK - publishResult != null publishResult.qos == MqttQos.EXACTLY_ONCE publishResult.type == Mqtt3MessageType.PUBLISH - received.get() != null received.get().qos == MqttQos.EXACTLY_ONCE received.get().type == Mqtt3MessageType.PUBLISH - cleanup: - subscriber.disconnect().join() - publisher.disconnect().join() } def "publisher should publish message QoS 2 using mqtt 5"() { @@ -188,25 +159,19 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() def publisher = buildExternalMqtt5Client() when: - subscriber.connect().join() publisher.connect().join() - def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE) - Thread.sleep(100) then: noExceptionThrown() - subscribeResult != null subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_2) subscribeResult.type == Mqtt5MessageType.SUBACK - publishResult != null publishResult.publish.qos == MqttQos.EXACTLY_ONCE publishResult.publish.type == Mqtt5MessageType.PUBLISH - received.get() != null received.get().qos == MqttQos.EXACTLY_ONCE received.get().type == Mqtt5MessageType.PUBLISH @@ -229,8 +194,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { Mqtt5AsyncClient subscriber, String subscriberId, MqttQos qos, - AtomicReference received - ) { + AtomicReference received) { return subscriber.subscribeWith() .topicFilter("test/$subscriberId") .qos(qos) @@ -252,8 +216,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { Mqtt3AsyncClient subscriber, String subscriberId, MqttQos qos, - AtomicReference received - ) { + AtomicReference received) { return subscriber.subscribeWith() .topicFilter("test/$subscriberId") .qos(qos) diff --git a/application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy b/application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy index 8f0493ec..95cab6a7 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/integration/IntegrationSpecification.groovy @@ -70,6 +70,9 @@ class IntegrationSpecification extends Specification { .serverHost(address.getHostName()) .serverPort(address.getPort()) .useMqttVersion3() + .addDisconnectedListener { + println "[${clientId}|mqtt311] disconnected:[$it.cause]" + } .build() .toAsync() } @@ -90,6 +93,9 @@ class IntegrationSpecification extends Specification { .serverHost(address.getHostName()) .serverPort(address.getPort()) .useMqttVersion5() + .addDisconnectedListener { + println "[${clientId}|mqtt5] disconnected:[$it.cause]" + } .build() .toAsync() } @@ -143,15 +149,15 @@ class IntegrationSpecification extends Specification { return Stub(MqttConnection) { isSupported(MqttVersion.MQTT_5) >> true isSupported(MqttVersion.MQTT_3_1_1) >> true - getConfig() >> deviceConnectionConfig - getClient() >> Stub(UnsafeMqttClient) { - getConnectionConfig() >> deviceConnectionConfig - getSessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED - getReceiveMax() >> deviceConnectionConfig.getReceiveMaximum() - getMaximumPacketSize() >> deviceConnectionConfig.getMaximumPacketSize() - getClientId() >> IntegrationSpecification.clientId - getKeepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT - getTopicAliasMaximum() >> deviceConnectionConfig.getTopicAliasMaximum() + config() >> deviceConnectionConfig + client() >> Stub(UnsafeMqttClient) { + connectionConfig() >> deviceConnectionConfig + sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED + receiveMax() >> deviceConnectionConfig.receiveMaximum() + maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize() + clientId() >> IntegrationSpecification.clientId + keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT + topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum() } } } @@ -160,15 +166,15 @@ class IntegrationSpecification extends Specification { return Stub(MqttConnection) { isSupported(MqttVersion.MQTT_5) >> false isSupported(MqttVersion.MQTT_3_1_1) >> true - getConfig() >> deviceConnectionConfig - getClient() >> Stub(UnsafeMqttClient) { - getConnectionConfig() >> deviceConnectionConfig - getSessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED - getReceiveMax() >> deviceConnectionConfig.getReceiveMaximum() - getMaximumPacketSize() >> deviceConnectionConfig.getMaximumPacketSize() - getClientId() >> IntegrationSpecification.clientId - getKeepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT - getTopicAliasMaximum() >> deviceConnectionConfig.getTopicAliasMaximum() + config() >> deviceConnectionConfig + client() >> Stub(UnsafeMqttClient) { + connectionConfig() >> deviceConnectionConfig + sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED + receiveMax() >> deviceConnectionConfig.receiveMaximum() + maximumPacketSize() >> deviceConnectionConfig.maximumPacketSize() + clientId() >> IntegrationSpecification.clientId + keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT + topicAliasMaximum() >> deviceConnectionConfig.topicAliasMaximum() } } } diff --git a/application/src/test/groovy/javasabr/mqtt/application/integration/PublishRetryTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/integration/PublishRetryTest.groovy index 5024a309..51c33073 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/integration/PublishRetryTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/integration/PublishRetryTest.groovy @@ -21,6 +21,9 @@ import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode import javasabr.mqtt.model.subscriber.SubscribeTopicFilter import javasabr.rlib.collections.array.Array +import javasabr.rlib.logger.api.LoggerLevel +import javasabr.rlib.logger.api.LoggerManager +import javasabr.rlib.network.packet.impl.AbstractNetworkPacketReader import org.springframework.beans.factory.annotation.Autowired class PublishRetryTest extends IntegrationSpecification { @@ -34,51 +37,38 @@ class PublishRetryTest extends IntegrationSpecification { def subscriber = buildMqtt311MockClient() def subscriberId = generateClientId() when: - publisher.connect().join() - subscriber.connect() subscriber.send(new Connect311OutPacket(subscriberId, keepAlive)) - def connectAck = subscriber.readNext() as ConnectAckInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS when: - subscriber.send(new Subscribe311OutPacket( Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.AT_LEAST_ONCE)), 1 )) - def subscribeAck = subscriber.readNext() as SubscribeAckInPacket - then: subscribeAck.reasonCodes.stream() .allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_1 }) when: - publisher.publishWith() .topic("test/retry/$subscriberId") .qos(MqttQos.AT_MOST_ONCE) .payload(publishPayload) .send() .join() - def receivedPublish = subscriber.readNext() as PublishInPacket - then: receivedPublish.payload == publishPayload when: - - subscriber.close() - + subscriber.disconnect() + Thread.sleep(1_000) subscriber.connect() subscriber.send(new Connect311OutPacket(subscriberId, keepAlive)) - connectAck = subscriber.readNext() as ConnectAckInPacket def receivedDupPublish = subscriber.readNext() as PublishInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS receivedDupPublish.duplicate @@ -95,30 +85,22 @@ class PublishRetryTest extends IntegrationSpecification { def subscriber = buildMqtt5MockClient() def subscriberId = generateClientId() when: - publisher.connect().join() - subscriber.connect() subscriber.send(new Connect5OutPacket(subscriberId, keepAlive)) - def connectAck = subscriber.readNext() as ConnectAckInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS when: - subscriber.send(new Subscribe5OutPacket( Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.AT_LEAST_ONCE)), 1 )) - def subscribeAck = subscriber.readNext() as SubscribeAckInPacket - then: subscribeAck.reasonCodes.stream() .allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_1 }) when: - publisher.publishWith() .topic("test/retry/$subscriberId") .qos(MqttQos.AT_MOST_ONCE) @@ -127,19 +109,14 @@ class PublishRetryTest extends IntegrationSpecification { .join() def receivedPublish = subscriber.readNext() as PublishInPacket - then: receivedPublish.payload == publishPayload when: - - subscriber.close() - + subscriber.disconnect() subscriber.connect() subscriber.send(new Connect5OutPacket(subscriberId, keepAlive)) - connectAck = subscriber.readNext() as ConnectAckInPacket def receivedDupPublish = subscriber.readNext() as PublishInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS receivedDupPublish.duplicate @@ -156,30 +133,22 @@ class PublishRetryTest extends IntegrationSpecification { def subscriber = buildMqtt311MockClient() def subscriberId = generateClientId() when: - publisher.connect().join() - subscriber.connect() subscriber.send(new Connect311OutPacket(subscriberId, keepAlive)) - def connectAck = subscriber.readNext() as ConnectAckInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS when: - subscriber.send(new Subscribe311OutPacket( Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.EXACTLY_ONCE)), 1 )) - def subscribeAck = subscriber.readNext() as SubscribeAckInPacket - then: subscribeAck.reasonCodes.stream() .allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_2 }) when: - publisher.publishWith() .topic("test/retry/$subscriberId") .qos(MqttQos.AT_MOST_ONCE) @@ -188,39 +157,28 @@ class PublishRetryTest extends IntegrationSpecification { .join() def receivedPublish = subscriber.readNext() as PublishInPacket - then: receivedPublish.payload == publishPayload when: - - subscriber.close() - + subscriber.disconnect() subscriber.connect() subscriber.send(new Connect311OutPacket(subscriberId, keepAlive)) - connectAck = subscriber.readNext() as ConnectAckInPacket def receivedDupPublish = subscriber.readNext() as PublishInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS receivedDupPublish.duplicate receivedDupPublish.packetId == receivedPublish.packetId receivedDupPublish.payload == publishPayload when: - - subscriber.close() - + subscriber.disconnect() subscriber.connect() subscriber.send(new Connect311OutPacket(subscriberId, keepAlive)) - connectAck = subscriber.readNext() as ConnectAckInPacket receivedDupPublish = subscriber.readNext() as PublishInPacket - subscriber.send(new PublishReceived311OutPacket(receivedDupPublish.getPacketId())) def releaseAck = subscriber.readNext() as PublishReleaseInPacket - subscriber.send(new PublishComplete311OutPacket(receivedDupPublish.getPacketId())) - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS receivedDupPublish.duplicate @@ -238,30 +196,22 @@ class PublishRetryTest extends IntegrationSpecification { def subscriber = buildMqtt5MockClient() def subscriberId = generateClientId() when: - publisher.connect().join() - subscriber.connect() subscriber.send(new Connect5OutPacket(subscriberId, keepAlive)) - def connectAck = subscriber.readNext() as ConnectAckInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS when: - subscriber.send(new Subscribe5OutPacket( Array.of(new SubscribeTopicFilter("test/retry/$subscriberId", QoS.EXACTLY_ONCE)), 1 )) - def subscribeAck = subscriber.readNext() as SubscribeAckInPacket - then: subscribeAck.reasonCodes.stream() .allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_2 }) when: - publisher.publishWith() .topic("test/retry/$subscriberId") .qos(MqttQos.AT_MOST_ONCE) @@ -270,46 +220,35 @@ class PublishRetryTest extends IntegrationSpecification { .join() def receivedPublish = subscriber.readNext() as PublishInPacket - then: receivedPublish.payload == publishPayload when: - - subscriber.close() - + subscriber.disconnect() subscriber.connect() subscriber.send(new Connect5OutPacket(subscriberId, keepAlive)) - connectAck = subscriber.readNext() as ConnectAckInPacket def receivedDupPublish = subscriber.readNext() as PublishInPacket - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS receivedDupPublish.duplicate receivedDupPublish.packetId == receivedPublish.packetId receivedDupPublish.payload == publishPayload when: - - subscriber.close() - + subscriber.disconnect() subscriber.connect() subscriber.send(new Connect5OutPacket(subscriberId, keepAlive)) - connectAck = subscriber.readNext() as ConnectAckInPacket receivedDupPublish = subscriber.readNext() as PublishInPacket - subscriber.send(new PublishReceived5OutPacket( receivedDupPublish.getPacketId(), PublishReceivedReasonCode.SUCCESS )) def releaseAck = subscriber.readNext() as PublishReleaseInPacket - subscriber.send(new PublishComplete5OutPacket( receivedDupPublish.getPacketId(), PublishCompletedReasonCode.SUCCESS )) - then: connectAck.reasonCode == ConnectAckReasonCode.SUCCESS receivedDupPublish.duplicate diff --git a/application/src/test/groovy/javasabr/mqtt/application/integration/service/MqttSessionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/integration/service/MqttSessionServiceTest.groovy index abb674b6..b6376090 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/integration/service/MqttSessionServiceTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/integration/service/MqttSessionServiceTest.groovy @@ -32,7 +32,7 @@ class MqttSessionServiceTest extends IntegrationSpecification { then: restored != null when: - mqttSessionService.store(clientId, restored, externalConnectionConfig.getDefaultSessionExpiryInterval()).block() + mqttSessionService.store(clientId, restored, externalConnectionConfig.defaultSessionExpiryInterval()).block() client.connect().join() shouldNoSession = mqttSessionService.restore(clientId).block() then: diff --git a/application/src/test/groovy/javasabr/mqtt/application/integration/service/SubscribtionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/integration/service/SubscribtionServiceTest.groovy index ece76cc0..737c021f 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/integration/service/SubscribtionServiceTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/integration/service/SubscribtionServiceTest.groovy @@ -51,7 +51,7 @@ class SubscribtionServiceTest extends IntegrationSpecification { def actionResult = subscriptionService.forEachTopicSubscriber(topicName, null, action) then: matchesCount == 1 - matchedSubscriber.user.getClientId() == clientId + matchedSubscriber.user.clientId() == clientId matchedSubscriber.subscribe.topicFilter.getRawTopic() == topicFilter actionResult == ActionResult.SUCCESS when: @@ -64,7 +64,7 @@ class SubscribtionServiceTest extends IntegrationSpecification { actionResult = subscriptionService.forEachTopicSubscriber(topicName, clientId, action) then: matchesCount == 2 - matchedSubscriber.user.getClientId() == clientId + matchedSubscriber.user.clientId() == clientId matchedSubscriber.subscribe.topicFilter.getRawTopic() == topicFilter actionResult == ActionResult.SUCCESS cleanup: diff --git a/application/src/test/groovy/javasabr/mqtt/application/mock/MqttMockClient.groovy b/application/src/test/groovy/javasabr/mqtt/application/mock/MqttMockClient.groovy index 995002c2..71db6538 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/mock/MqttMockClient.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/mock/MqttMockClient.groovy @@ -7,6 +7,7 @@ import javasabr.mqtt.network.packet.in.MqttReadablePacket import javasabr.mqtt.network.packet.in.PublishInPacket import javasabr.mqtt.network.packet.in.PublishReleaseInPacket import javasabr.mqtt.network.packet.in.SubscribeAckInPacket +import javasabr.mqtt.network.packet.out.Disconnect311OutPacket import javasabr.mqtt.network.packet.out.MqttWritablePacket import javasabr.mqtt.network.utils.MqttDataUtils import javasabr.rlib.common.util.NumberUtils @@ -30,24 +31,20 @@ class MqttMockClient { } void connect() { - if (socket != null) { return } - socket = new Socket(brokerHost, brokerPort) } void send(MqttWritablePacket packet) { def dataBuffer = ByteBuffer.allocate(1024) - - packet.write(dataBuffer) - + packet.write(connection, dataBuffer) dataBuffer.flip() def finalBuffer = ByteBuffer.allocate(1024) - finalBuffer.put((byte) packet.getPacketTypeAndFlags()) + finalBuffer.put((byte) packet.packetTypeAndFlags()) MqttDataUtils.writeMbi(dataBuffer.remaining(), finalBuffer) @@ -62,10 +59,8 @@ class MqttMockClient { MqttReadablePacket readNext() { if (received.position() == 0) { - def input = socket.getInputStream() def readBytes = input.read(received.array(), received.position(), received.capacity() - received.position()) - if (readBytes > 0) { received.position(received.position() + readBytes) } @@ -112,8 +107,12 @@ class MqttMockClient { return packet } - def close() { + def disconnect() { + send(new Disconnect311OutPacket()) + close() + } + def close() { if (socket != null) { socket.close() socket = null diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/NetworkUnitSpecification.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/NetworkUnitSpecification.groovy index 23378244..2bdd4b33 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/NetworkUnitSpecification.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/NetworkUnitSpecification.groovy @@ -223,13 +223,13 @@ class NetworkUnitSpecification extends UnitSpecification { int topicAliasMaximum ) { return Stub(MqttClient.UnsafeMqttClient) { - getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> sessionExpiryInterval - getReceiveMax() >> receiveMaximum - getMaximumPacketSize() >> maximumPacketSize - getClientId() >> clientId - getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> topicAliasMaximum + connectionConfig() >> mqttConnectionConfig + sessionExpiryInterval() >> sessionExpiryInterval + receiveMax() >> receiveMaximum + maximumPacketSize() >> maximumPacketSize + clientId() >> clientId + keepAlive() >> serverKeepAlive + topicAliasMaximum() >> topicAliasMaximum } } } diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Authentication5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Authentication5OutPacketTest.groovy index 02b90490..c5bfa5c5 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Authentication5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Authentication5OutPacketTest.groovy @@ -22,7 +22,7 @@ class Authentication5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new AuthenticationInPacket(0b1111_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/BaseOutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/BaseOutPacketTest.groovy index a00e7772..188377f9 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/BaseOutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/BaseOutPacketTest.groovy @@ -8,23 +8,23 @@ class BaseOutPacketTest extends NetworkUnitSpecification { @Shared MqttClient mqtt5Client = Stub(MqttClient.UnsafeMqttClient) { - getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval - getReceiveMax() >> NetworkUnitSpecification.receiveMaximum - getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize - getClientId() >> clientId - getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum + connectionConfig() >> mqttConnectionConfig + sessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + receiveMax() >> NetworkUnitSpecification.receiveMaximum + maximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize + clientId() >> clientId + keepAlive() >> serverKeepAlive + topicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } @Shared MqttClient mqtt311Client = Stub(MqttClient.UnsafeMqttClient) { - getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval - getReceiveMax() >> NetworkUnitSpecification.receiveMaximum - getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize - getClientId() >> clientId - getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum + connectionConfig() >> mqttConnectionConfig + sessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + receiveMax() >> NetworkUnitSpecification.receiveMaximum + maximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize + clientId() >> clientId + keepAlive() >> serverKeepAlive + topicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect311OutPacketTest.groovy index eb1c38d2..9734a9eb 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect311OutPacketTest.groovy @@ -28,7 +28,7 @@ class Connect311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new ConnectInPacket(0b0001_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect5OutPacketTest.groovy index 6c5d92ee..73ecddcd 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Connect5OutPacketTest.groovy @@ -36,7 +36,7 @@ class Connect5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new ConnectInPacket(0b0001_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck311OutPacketTest.groovy index 82194aed..acc4a2c8 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck311OutPacketTest.groovy @@ -22,7 +22,7 @@ class ConnectAck311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new ConnectAckInPacket(0b0010_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck5OutPacketTest.groovy index bd390743..9c3f4d2a 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/ConnectAck5OutPacketTest.groovy @@ -41,7 +41,7 @@ class ConnectAck5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new ConnectAckInPacket(0b0010_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/DisconnectAck5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/DisconnectAck5OutPacketTest.groovy index 1bfeded2..036cdab5 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/DisconnectAck5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/DisconnectAck5OutPacketTest.groovy @@ -22,7 +22,7 @@ class DisconnectAck5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new DisconnectInPacket(0b1110_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish311OutPacketTest.groovy index 3182329d..9d096a75 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish311OutPacketTest.groovy @@ -17,12 +17,11 @@ class Publish311OutPacketTest extends BaseOutPacketTest { true, true, publishTopic.toString(), - publishPayload - ) + publishPayload) when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new PublishInPacket(0b0011_1101 as byte) @@ -49,7 +48,7 @@ class Publish311OutPacketTest extends BaseOutPacketTest { ) dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } reader = new PublishInPacket(0b0011_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish5OutPacketTest.groovy index e26b6230..12890e90 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Publish5OutPacketTest.groovy @@ -26,7 +26,7 @@ class Publish5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new PublishInPacket(0b0011_1101 as byte) @@ -62,7 +62,7 @@ class Publish5OutPacketTest extends BaseOutPacketTest { ) dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } reader = new PublishInPacket(0b0011_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck311OutPacketTest.groovy index 3d698572..a5d7a70a 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck311OutPacketTest.groovy @@ -15,7 +15,7 @@ class PublishAck311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new PublishAckInPacket(0b0100_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck5OutPacketTest.groovy index 99e18be6..f07270b7 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishAck5OutPacketTest.groovy @@ -21,7 +21,7 @@ class PublishAck5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new PublishAckInPacket(0b0100_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete311OutPacketTest.groovy index 85ef588a..2e552699 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete311OutPacketTest.groovy @@ -15,7 +15,7 @@ class PublishComplete311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new PublishCompleteInPacket(0b0111_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete5OutPacketTest.groovy index 43dc74ee..6e5a55b3 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishComplete5OutPacketTest.groovy @@ -21,7 +21,7 @@ class PublishComplete5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new PublishCompleteInPacket(0b0111_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived311OutPacketTest.groovy index 68268b61..b0d0c94f 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived311OutPacketTest.groovy @@ -16,7 +16,7 @@ class PublishReceived311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new PublishReceivedInPacket(0b0101_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived5OutPacketTest.groovy index 30df350a..941b966e 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishReceived5OutPacketTest.groovy @@ -21,7 +21,7 @@ class PublishReceived5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new PublishReceivedInPacket(0b0101_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease311OutPacketTest.groovy index 8a18a0d7..8e5dfc65 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease311OutPacketTest.groovy @@ -16,7 +16,7 @@ class PublishRelease311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new PublishReleaseInPacket(0b0110_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease5OutPacketTest.groovy index e8185583..9b9e3b04 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/PublishRelease5OutPacketTest.groovy @@ -21,7 +21,7 @@ class PublishRelease5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new PublishReleaseInPacket(0b0110_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe311OutPacketTest.groovy index 87531007..30076825 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe311OutPacketTest.groovy @@ -21,7 +21,7 @@ class Subscribe311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new SubscribeInPacket(0b1000_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe5OutPacketTest.groovy index 66522731..1333ca24 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/Subscribe5OutPacketTest.groovy @@ -21,7 +21,7 @@ class Subscribe5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new SubscribeInPacket(0b1000_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck311OutPacketTest.groovy index 95afb7b7..bf408faf 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck311OutPacketTest.groovy @@ -15,7 +15,7 @@ class SubscribeAck311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new SubscribeAckInPacket(0b1001_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck5OutPacketTest.groovy index e42c33f5..6fb204d3 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/SubscribeAck5OutPacketTest.groovy @@ -20,7 +20,7 @@ class SubscribeAck5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new SubscribeAckInPacket(0b1001_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck311OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck311OutPacketTest.groovy index e655e112..a840e145 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck311OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck311OutPacketTest.groovy @@ -16,7 +16,7 @@ class UnsubscribeAck311OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt311Connection, it) } def reader = new UnsubscribeAckInPacket(0b1011_0000 as byte) diff --git a/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck5OutPacketTest.groovy b/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck5OutPacketTest.groovy index 20205587..306ebad8 100644 --- a/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck5OutPacketTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/application/network/out/UnsubscribeAck5OutPacketTest.groovy @@ -20,7 +20,7 @@ class UnsubscribeAck5OutPacketTest extends BaseOutPacketTest { when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(it) + packet.write(mqtt5Connection, it) } def reader = new UnsubscribeAckInPacket(0b1011_0000 as byte) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2dadc352..3d1bd955 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [versions] # https://gitlab.com/JavaSaBr/maven-repo/-/packages -rlib = "10.0.alpha1" +rlib = "10.0.alpha3" # https://mvnrepository.com/artifact/org.jetbrains/annotations jetbrains-annotations = "26.0.2" # https://mvnrepository.com/artifact/org.projectlombok/lombok @@ -30,11 +30,10 @@ byte-buddy = "1.17.6" # https://mvnrepository.com/artifact/org.objenesis/objenesis objenesis = "3.4" # https://mvnrepository.com/artifact/com.hivemq/hivemq-mqtt-client -hivemq-mqtt-client = "1.3.7" +hivemq-mqtt-client = "1.3.10" # https://mvnrepository.com/artifact/io.moquette/moquette-broker moquette-broker = "0.17" - [libraries] rlib-network = { module = "javasabr.rlib:rlib-network", version.ref = "rlib" } rlib-logger-api = { module = "javasabr.rlib:rlib-logger-api", version.ref = "rlib" } diff --git a/model/src/main/java/javasabr/mqtt/model/MqttConnectionConfig.java b/model/src/main/java/javasabr/mqtt/model/MqttConnectionConfig.java index adff53d8..a81e3450 100644 --- a/model/src/main/java/javasabr/mqtt/model/MqttConnectionConfig.java +++ b/model/src/main/java/javasabr/mqtt/model/MqttConnectionConfig.java @@ -2,9 +2,11 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; @Getter @RequiredArgsConstructor +@Accessors(fluent = true, chain = false) public class MqttConnectionConfig { private final QoS maxQos; diff --git a/network/src/main/java/javasabr/mqtt/network/DefaultMqttSession.java b/network/src/main/java/javasabr/mqtt/network/DefaultMqttSession.java index ecebaf81..1c2aaf03 100644 --- a/network/src/main/java/javasabr/mqtt/network/DefaultMqttSession.java +++ b/network/src/main/java/javasabr/mqtt/network/DefaultMqttSession.java @@ -1,13 +1,13 @@ package javasabr.mqtt.network; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; import javasabr.mqtt.model.MqttProperties; -import javasabr.mqtt.network.MqttSession.UnsafeMqttSession; import javasabr.mqtt.model.subscriber.SubscribeTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.network.MqttSession.UnsafeMqttSession; import javasabr.mqtt.network.packet.HasPacketId; import javasabr.mqtt.network.packet.in.PublishInPacket; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicInteger; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.LockableArray; import javasabr.rlib.functions.TriConsumer; @@ -17,11 +17,12 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; -import lombok.extern.log4j.Log4j2; +import lombok.experimental.Accessors; @CustomLog @ToString(of = "clientId") @EqualsAndHashCode(of = "clientId") +@Accessors(fluent = true, chain = false) public class DefaultMqttSession implements UnsafeMqttSession { @Getter @@ -49,7 +50,7 @@ private static void updatePendingPacket( LockableArray pendingPublishes, String clientId) { - int packetId = response.getPacketId(); + int packetId = response.packetId(); PendingPublish pendingPublish; long stamp = pendingPublishes.readLock(); @@ -81,8 +82,9 @@ private static void updatePendingPacket( private final AtomicInteger packetIdGenerator; private final LockableArray topicFilters; - private volatile @Getter - @Setter long expirationTime = -1; + @Getter + @Setter + private volatile long expirationTime = -1; public DefaultMqttSession(String clientId) { this.clientId = clientId; @@ -106,7 +108,7 @@ public int nextPacketId() { } @Override - public String getClientId() { + public String clientId() { return clientId; } diff --git a/network/src/main/java/javasabr/mqtt/network/MqttClient.java b/network/src/main/java/javasabr/mqtt/network/MqttClient.java index a3406cbd..01e5a715 100644 --- a/network/src/main/java/javasabr/mqtt/network/MqttClient.java +++ b/network/src/main/java/javasabr/mqtt/network/MqttClient.java @@ -1,12 +1,12 @@ package javasabr.mqtt.network; -import javasabr.mqtt.network.out.MqttPacketOutFactory; -import javasabr.mqtt.network.packet.out.MqttWritablePacket; +import java.util.concurrent.CompletableFuture; import javasabr.mqtt.model.MqttConnectionConfig; import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; +import javasabr.mqtt.network.out.MqttPacketOutFactory; import javasabr.mqtt.network.packet.in.MqttReadablePacket; -import java.util.concurrent.CompletableFuture; +import javasabr.mqtt.network.packet.out.MqttWritablePacket; import org.jspecify.annotations.Nullable; import reactor.core.publisher.Mono; @@ -14,7 +14,7 @@ public interface MqttClient extends MqttUser { interface UnsafeMqttClient extends MqttClient { - MqttConnection getConnection(); + MqttConnection connection(); void handle(MqttReadablePacket packet); @@ -27,33 +27,33 @@ void configure( boolean requestResponseInformation, boolean requestProblemInformation); - void setClientId(String clientId); + void clientId(String clientId); - void setSession(@Nullable MqttSession session); + void session(@Nullable MqttSession session); void reject(ConnectAckReasonCode reasonCode); Mono release(); } - MqttPacketOutFactory getPacketOutFactory(); + MqttPacketOutFactory packetOutFactory(); - MqttConnectionConfig getConnectionConfig(); + MqttConnectionConfig connectionConfig(); - String getClientId(); + String clientId(); @Nullable - MqttSession getSession(); + MqttSession session(); - int getKeepAlive(); + int keepAlive(); - int getMaximumPacketSize(); + int maximumPacketSize(); - int getReceiveMax(); + int receiveMax(); - int getTopicAliasMaximum(); + int topicAliasMaximum(); - long getSessionExpiryInterval(); + long sessionExpiryInterval(); void send(MqttWritablePacket packet); diff --git a/network/src/main/java/javasabr/mqtt/network/MqttConnection.java b/network/src/main/java/javasabr/mqtt/network/MqttConnection.java index 2b9113bf..a542367b 100644 --- a/network/src/main/java/javasabr/mqtt/network/MqttConnection.java +++ b/network/src/main/java/javasabr/mqtt/network/MqttConnection.java @@ -1,34 +1,33 @@ package javasabr.mqtt.network; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.function.Function; import javasabr.mqtt.model.MqttConnectionConfig; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.network.MqttClient.UnsafeMqttClient; import javasabr.mqtt.network.handler.packet.in.PacketInHandler; import javasabr.mqtt.network.packet.MqttPacketReader; import javasabr.mqtt.network.packet.MqttPacketWriter; -import javasabr.mqtt.network.packet.in.MqttReadablePacket; -import javasabr.mqtt.network.packet.out.MqttWritablePacket; -import java.nio.channels.AsynchronousSocketChannel; -import java.util.function.Function; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; import javasabr.rlib.network.impl.AbstractConnection; -import javasabr.rlib.network.packet.PacketReader; -import javasabr.rlib.network.packet.PacketWriter; +import javasabr.rlib.network.packet.NetworkPacketReader; +import javasabr.rlib.network.packet.NetworkPacketWriter; import lombok.AccessLevel; import lombok.CustomLog; import lombok.Getter; import lombok.Setter; +import lombok.experimental.Accessors; @CustomLog -public class MqttConnection extends AbstractConnection { +@Accessors(fluent = true, chain = false) +public class MqttConnection extends AbstractConnection { @Getter(AccessLevel.PROTECTED) - private final PacketReader packetReader; + private final NetworkPacketReader packetReader; @Getter(AccessLevel.PROTECTED) - private final PacketWriter packetWriter; + private final NetworkPacketWriter packetWriter; @Getter private final PacketInHandler[] packetHandlers; @@ -47,7 +46,7 @@ public class MqttConnection extends AbstractConnection> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, int maxPacketsByRead, @@ -67,7 +66,7 @@ public boolean isSupported(MqttVersion mqttVersion) { return this.mqttVersion.ordinal() >= mqttVersion.ordinal(); } - private PacketReader createPacketReader() { + private NetworkPacketReader createPacketReader() { return new MqttPacketReader( this, channel, @@ -77,20 +76,20 @@ private PacketReader createPacketReader() { maxPacketsByRead); } - private PacketWriter createPacketWriter() { + private NetworkPacketWriter createPacketWriter() { return new MqttPacketWriter( this, channel, bufferAllocator, this::updateLastActivity, this::nextPacketToWrite, - this::onWrittenPacket, - this::onSentPacket); + this::serializedPacket, + this::handleSentPacket); } @Override public String toString() { - return getRemoteAddress(); + return remoteAddress; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/MqttSession.java b/network/src/main/java/javasabr/mqtt/network/MqttSession.java index a19ca0af..93934ac7 100644 --- a/network/src/main/java/javasabr/mqtt/network/MqttSession.java +++ b/network/src/main/java/javasabr/mqtt/network/MqttSession.java @@ -1,16 +1,16 @@ package javasabr.mqtt.network; -import javasabr.mqtt.network.packet.HasPacketId; -import javasabr.mqtt.network.packet.in.PublishInPacket; import javasabr.mqtt.model.subscriber.SubscribeTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.network.packet.HasPacketId; +import javasabr.mqtt.network.packet.in.PublishInPacket; import javasabr.rlib.functions.TriConsumer; public interface MqttSession { interface UnsafeMqttSession extends MqttSession { - void setExpirationTime(long expirationTime); + void expirationTime(long expirationTime); void clear(); @@ -26,18 +26,17 @@ interface PendingPacketHandler { */ boolean handleResponse(MqttClient client, HasPacketId response); - default void resend(MqttClient client, PublishInPacket packet, int packetId) { - } + default void resend(MqttClient client, PublishInPacket packet, int packetId) {} } - String getClientId(); + String clientId(); int nextPacketId(); /** * @return the expiration time in ms or -1 if it should not be expired now. */ - long getExpirationTime(); + long expirationTime(); void resendPendingPackets(MqttClient client); @@ -57,10 +56,10 @@ default void resend(MqttClient client, PublishInPacket packet, int packetId) { void updateInPendingPacket(MqttClient client, HasPacketId response); - void forEachTopicFilter( - F first, - S second, - TriConsumer consumer); + void forEachTopicFilter( + A arg1, + B arg2, + TriConsumer consumer); void addSubscriber(SubscribeTopicFilter subscribe); diff --git a/network/src/main/java/javasabr/mqtt/network/client/AbstractMqttClient.java b/network/src/main/java/javasabr/mqtt/network/client/AbstractMqttClient.java index e7164c4e..96ee8a43 100644 --- a/network/src/main/java/javasabr/mqtt/network/client/AbstractMqttClient.java +++ b/network/src/main/java/javasabr/mqtt/network/client/AbstractMqttClient.java @@ -1,76 +1,80 @@ package javasabr.mqtt.network.client; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttConnectionConfig; -import javasabr.mqtt.network.MqttSession; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; -import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.MqttClient.UnsafeMqttClient; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.MqttSession; import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler; import javasabr.mqtt.network.handler.packet.in.PacketInHandler; import javasabr.mqtt.network.out.MqttPacketOutFactories; import javasabr.mqtt.network.out.MqttPacketOutFactory; import javasabr.mqtt.network.packet.in.MqttReadablePacket; import javasabr.mqtt.network.packet.out.MqttWritablePacket; -import javasabr.mqtt.base.utils.DebugUtils; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import javasabr.rlib.common.util.StringUtils; +import lombok.AccessLevel; import lombok.CustomLog; import lombok.Getter; import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; import org.jspecify.annotations.Nullable; import reactor.core.publisher.Mono; @Getter @CustomLog +@Accessors(fluent = true, chain = false) +@FieldDefaults(level = AccessLevel.PROTECTED) public abstract class AbstractMqttClient implements UnsafeMqttClient { static { DebugUtils.registerIncludedFields("clientId"); } - protected final MqttConnection connection; - protected final MqttClientReleaseHandler releaseHandler; - protected final AtomicBoolean released; + final MqttConnection connection; + final MqttClientReleaseHandler releaseHandler; + final AtomicBoolean released; @Setter - private volatile String clientId; + volatile String clientId; @Setter @Getter @Nullable - private volatile MqttSession session; + volatile MqttSession session; - private volatile long sessionExpiryInterval; - private volatile int receiveMax; - private volatile int maximumPacketSize; - private volatile int topicAliasMaximum; - private volatile int keepAlive; + volatile long sessionExpiryInterval; + volatile int receiveMax; + volatile int maximumPacketSize; + volatile int topicAliasMaximum; + volatile int keepAlive; - private volatile boolean requestResponseInformation = false; - private volatile boolean requestProblemInformation = false; + volatile boolean requestResponseInformation = false; + volatile boolean requestProblemInformation = false; public AbstractMqttClient(MqttConnection connection, MqttClientReleaseHandler releaseHandler) { + MqttConnectionConfig config = connection.config(); this.connection = connection; this.releaseHandler = releaseHandler; this.released = new AtomicBoolean(false); - this.clientId = StringUtils.EMPTY; - var config = connection.getConfig(); - this.sessionExpiryInterval = config.getDefaultSessionExpiryInterval(); - this.receiveMax = config.getReceiveMaximum(); - this.maximumPacketSize = config.getMaximumPacketSize(); - this.topicAliasMaximum = config.getTopicAliasMaximum(); - this.keepAlive = config.getMinKeepAliveTime(); + this.clientId = connection.remoteAddress(); + this.sessionExpiryInterval = config.defaultSessionExpiryInterval(); + this.receiveMax = config.receiveMaximum(); + this.maximumPacketSize = config.maximumPacketSize(); + this.topicAliasMaximum = config.topicAliasMaximum(); + this.keepAlive = config.minKeepAliveTime(); } @Override public void handle(MqttReadablePacket packet) { - log.debug(clientId, packet.getName(), packet, "Client:[%s] received packet:[%s]:[%s]"::formatted); - PacketInHandler packetHandler = connection.getPacketHandlers()[packet.getPacketType()]; + log.debug(clientId, packet.name(), packet, "[%s] Received packet:[%s] %s"::formatted); + PacketInHandler packetHandler = connection.packetHandlers()[packet.packetType()]; if (packetHandler != null) { packetHandler.handle(this, packet); } else { - log.warning(this, packet, "No packet handler in client:[%s] for packet:[%s]"::formatted); + log.warning(clientId, packet.name(), packet, "[%s] No packet handler for packet:[%s] %s"::formatted); } } @@ -94,30 +98,30 @@ public void configure( @Override public void send(MqttWritablePacket packet) { - log.debug(clientId, packet.getName(), packet, "Send to client:[%s] packet:[%s]:[%s]"::formatted); + log.debug(clientId, packet.name(), packet, "[%s] Send to client packet:[%s] %s"::formatted); connection.send(packet); } @Override public CompletableFuture sendWithFeedback(MqttWritablePacket packet) { - log.debug(clientId, packet.getName(), packet, "Send to client:[%s] packet:[%s]:[%s]"::formatted); + log.debug(clientId, packet.name(), packet, "[%s] Send to client packet:[%s] %s"::formatted); return connection.sendWithFeedback(packet); } public void reject(ConnectAckReasonCode reasonCode) { connection - .sendWithFeedback(getPacketOutFactory().newConnectAck(this, reasonCode)) - .thenAccept(sent -> connection.close()); + .sendWithFeedback(packetOutFactory().newConnectAck(this, reasonCode)) + .thenAccept(_ -> connection.close()); } @Override - public MqttPacketOutFactory getPacketOutFactory() { - return MqttPacketOutFactories.of(connection.getMqttVersion()); + public MqttPacketOutFactory packetOutFactory() { + return MqttPacketOutFactories.of(connection.mqttVersion()); } @Override - public MqttConnectionConfig getConnectionConfig() { - return connection.getConfig(); + public MqttConnectionConfig connectionConfig() { + return connection.config(); } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/client/ExternalMqttClient.java b/network/src/main/java/javasabr/mqtt/network/client/ExternalMqttClient.java index b9dcfe92..ebe321c4 100644 --- a/network/src/main/java/javasabr/mqtt/network/client/ExternalMqttClient.java +++ b/network/src/main/java/javasabr/mqtt/network/client/ExternalMqttClient.java @@ -1,7 +1,7 @@ package javasabr.mqtt.network.client; -import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.base.utils.DebugUtils; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler; public class ExternalMqttClient extends AbstractMqttClient { diff --git a/network/src/main/java/javasabr/mqtt/network/client/InternalMqttClient.java b/network/src/main/java/javasabr/mqtt/network/client/InternalMqttClient.java index b4c70bc0..94159851 100644 --- a/network/src/main/java/javasabr/mqtt/network/client/InternalMqttClient.java +++ b/network/src/main/java/javasabr/mqtt/network/client/InternalMqttClient.java @@ -1,7 +1,7 @@ package javasabr.mqtt.network.client; -import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.base.utils.DebugUtils; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.handler.client.MqttClientReleaseHandler; public class InternalMqttClient extends AbstractMqttClient { diff --git a/network/src/main/java/javasabr/mqtt/network/handler/packet/in/PacketInHandler.java b/network/src/main/java/javasabr/mqtt/network/handler/packet/in/PacketInHandler.java index 3a0b11c7..01a05d77 100644 --- a/network/src/main/java/javasabr/mqtt/network/handler/packet/in/PacketInHandler.java +++ b/network/src/main/java/javasabr/mqtt/network/handler/packet/in/PacketInHandler.java @@ -1,11 +1,11 @@ package javasabr.mqtt.network.handler.packet.in; -import javasabr.mqtt.network.MqttClient; +import javasabr.mqtt.network.MqttClient.UnsafeMqttClient; import javasabr.mqtt.network.packet.in.MqttReadablePacket; public interface PacketInHandler { PacketInHandler EMPTY = (client, packet) -> {}; - void handle(MqttClient.UnsafeMqttClient client, MqttReadablePacket packet); + void handle(UnsafeMqttClient client, MqttReadablePacket packet); } diff --git a/network/src/main/java/javasabr/mqtt/network/handler/packet/in/package-info.java b/network/src/main/java/javasabr/mqtt/network/handler/packet/in/package-info.java new file mode 100644 index 00000000..b5042af1 --- /dev/null +++ b/network/src/main/java/javasabr/mqtt/network/handler/packet/in/package-info.java @@ -0,0 +1,4 @@ +@NullMarked +package javasabr.mqtt.network.handler.packet.in; + +import org.jspecify.annotations.NullMarked; \ No newline at end of file diff --git a/network/src/main/java/javasabr/mqtt/network/out/Mqtt311PacketOutFactory.java b/network/src/main/java/javasabr/mqtt/network/out/Mqtt311PacketOutFactory.java index f3435ab8..ec006d02 100644 --- a/network/src/main/java/javasabr/mqtt/network/out/Mqtt311PacketOutFactory.java +++ b/network/src/main/java/javasabr/mqtt/network/out/Mqtt311PacketOutFactory.java @@ -78,7 +78,7 @@ public MqttWritablePacket newSubscribeAck( int packetId, Array reasonCodes, String reason, - MutableArray userProperties) { + Array userProperties) { return new SubscribeAck311OutPacket(reasonCodes, packetId); } @@ -86,7 +86,7 @@ public MqttWritablePacket newSubscribeAck( public MqttWritablePacket newUnsubscribeAck( int packetId, Array reasonCodes, - MutableArray userProperties, + Array userProperties, String reason) { return new UnsubscribeAck311OutPacket(packetId); } diff --git a/network/src/main/java/javasabr/mqtt/network/out/Mqtt5PacketOutFactory.java b/network/src/main/java/javasabr/mqtt/network/out/Mqtt5PacketOutFactory.java index 6f047c6d..2dba1d66 100644 --- a/network/src/main/java/javasabr/mqtt/network/out/Mqtt5PacketOutFactory.java +++ b/network/src/main/java/javasabr/mqtt/network/out/Mqtt5PacketOutFactory.java @@ -44,7 +44,7 @@ public MqttWritablePacket newConnectAck( String authenticationMethod, byte[] authenticationData, MutableArray userProperties) { - var config = client.getConnectionConfig(); + var config = client.connectionConfig(); return new ConnectAck5OutPacket( reasonCode, sessionPresent, @@ -58,17 +58,17 @@ public MqttWritablePacket newConnectAck( authenticationMethod, authenticationData, userProperties, - client.getClientId(), - config.getMaxQos(), - client.getSessionExpiryInterval(), - client.getMaximumPacketSize(), - client.getReceiveMax(), - client.getTopicAliasMaximum(), - client.getKeepAlive(), - config.isRetainAvailable(), - config.isWildcardSubscriptionAvailable(), - config.isSubscriptionIdAvailable(), - config.isSharedSubscriptionAvailable()); + client.clientId(), + config.maxQos(), + client.sessionExpiryInterval(), + client.maximumPacketSize(), + client.receiveMax(), + client.topicAliasMaximum(), + client.keepAlive(), + config.retainAvailable(), + config.wildcardSubscriptionAvailable(), + config.subscriptionIdAvailable(), + config.sharedSubscriptionAvailable()); } @Override @@ -112,7 +112,7 @@ public MqttWritablePacket newSubscribeAck( int packetId, Array reasonCodes, String reason, - MutableArray userProperties) { + Array userProperties) { return new SubscribeAck5OutPacket(packetId, reasonCodes, userProperties, reason); } @@ -120,7 +120,7 @@ public MqttWritablePacket newSubscribeAck( public MqttWritablePacket newUnsubscribeAck( int packetId, Array reasonCodes, - MutableArray userProperties, + Array userProperties, String reason) { return new UnsubscribeAck5OutPacket(packetId, reasonCodes, userProperties, reason); } @@ -137,7 +137,7 @@ public MqttWritablePacket newDisconnect( userProperties, reason, serverReference, - client.getSessionExpiryInterval()); + client.sessionExpiryInterval()); } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/out/MqttPacketOutFactory.java b/network/src/main/java/javasabr/mqtt/network/out/MqttPacketOutFactory.java index 65590204..5ddcc5af 100644 --- a/network/src/main/java/javasabr/mqtt/network/out/MqttPacketOutFactory.java +++ b/network/src/main/java/javasabr/mqtt/network/out/MqttPacketOutFactory.java @@ -66,9 +66,9 @@ public MqttWritablePacket newConnectAck(MqttClient client, ConnectAckReasonCode reasonCode, false, StringUtils.EMPTY, - client.getSessionExpiryInterval(), - client.getKeepAlive(), - client.getReceiveMax(), + client.sessionExpiryInterval(), + client.keepAlive(), + client.receiveMax(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, @@ -125,20 +125,20 @@ public abstract MqttWritablePacket newSubscribeAck( int packetId, Array reasonCodes, String reason, - MutableArray userProperties); + Array userProperties); public MqttWritablePacket newSubscribeAck(int packetId, Array reasonCodes) { - return newSubscribeAck(packetId, reasonCodes, StringUtils.EMPTY, MutableArray.ofType(StringPair.class)); + return newSubscribeAck(packetId, reasonCodes, StringUtils.EMPTY, Array.empty(StringPair.class)); } public abstract MqttWritablePacket newUnsubscribeAck( int packetId, Array reasonCodes, - MutableArray userProperties, + Array userProperties, String reason); public MqttWritablePacket newUnsubscribeAck(int packetId, Array reasonCodes) { - return newUnsubscribeAck(packetId, reasonCodes, MutableArray.ofType(StringPair.class), StringUtils.EMPTY); + return newUnsubscribeAck(packetId, reasonCodes, Array.empty(StringPair.class), StringUtils.EMPTY); } public abstract MqttWritablePacket newDisconnect( diff --git a/network/src/main/java/javasabr/mqtt/network/packet/HasPacketId.java b/network/src/main/java/javasabr/mqtt/network/packet/HasPacketId.java index 3bac603a..6d3d3c6e 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/HasPacketId.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/HasPacketId.java @@ -1,8 +1,9 @@ package javasabr.mqtt.network.packet; -import javasabr.rlib.network.packet.Packet; +import javasabr.mqtt.network.MqttConnection; +import javasabr.rlib.network.packet.NetworkPacket; -public interface HasPacketId extends Packet { +public interface HasPacketId extends NetworkPacket { - int getPacketId(); + int packetId(); } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketReader.java b/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketReader.java index be3fb5ba..5e1cba8c 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketReader.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketReader.java @@ -1,5 +1,9 @@ package javasabr.mqtt.network.packet; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.NoSuchElementException; +import java.util.function.Consumer; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.in.AuthenticationInPacket; import javasabr.mqtt.network.packet.in.ConnectAckInPacket; @@ -17,19 +21,15 @@ import javasabr.mqtt.network.packet.in.SubscribeInPacket; import javasabr.mqtt.network.packet.in.UnsubscribeAckInPacket; import javasabr.mqtt.network.packet.in.UnsubscribeInPacket; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; -import java.util.NoSuchElementException; -import java.util.function.Consumer; import javasabr.mqtt.network.utils.MqttDataUtils; import javasabr.rlib.common.util.ArrayUtils; import javasabr.rlib.common.util.NumberUtils; import javasabr.rlib.functions.ByteFunction; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.packet.impl.AbstractPacketReader; +import javasabr.rlib.network.packet.impl.AbstractNetworkPacketReader; import org.jspecify.annotations.Nullable; -public class MqttPacketReader extends AbstractPacketReader { +public class MqttPacketReader extends AbstractNetworkPacketReader { private static final int PACKET_LENGTH_START_BYTE = 2; @@ -67,26 +67,18 @@ protected boolean canStartReadPacket(ByteBuffer buffer) { } @Override - protected int getDataLength(int packetLength, int readBytes, ByteBuffer buffer) { - return packetLength - readBytes; - } - - @Override - protected int readPacketLength(ByteBuffer buffer) { - + protected int readFullPacketLength(ByteBuffer buffer) { // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901021 - var prevPos = buffer.position(); + int prevPos = buffer.position(); // skip first byte of packet type buffer.get(); - - var dataSize = MqttDataUtils.readMbi(buffer); + int dataSize = MqttDataUtils.readMbi(buffer); if (dataSize == -1) { return -1; } - var readBytes = buffer.position() - prevPos; - + int readBytes = buffer.position() - prevPos; return dataSize + readBytes; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketWriter.java b/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketWriter.java index a576fcd3..011cff23 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketWriter.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/MqttPacketWriter.java @@ -1,28 +1,28 @@ package javasabr.mqtt.network.packet; -import javasabr.mqtt.network.MqttConnection; -import javasabr.mqtt.network.packet.out.MqttWritablePacket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.packet.out.MqttWritablePacket; import javasabr.mqtt.network.utils.MqttDataUtils; +import javasabr.rlib.functions.ObjBoolConsumer; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.packet.WritablePacket; -import javasabr.rlib.network.packet.impl.AbstractPacketWriter; +import javasabr.rlib.network.packet.WritableNetworkPacket; +import javasabr.rlib.network.packet.impl.AbstractNetworkPacketWriter; import org.jspecify.annotations.Nullable; -public class MqttPacketWriter extends AbstractPacketWriter { +public class MqttPacketWriter extends AbstractNetworkPacketWriter { public MqttPacketWriter( MqttConnection connection, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, Runnable updateActivityFunction, - Supplier<@Nullable WritablePacket> nextWritePacketSupplier, - Consumer writtenPacketHandler, - BiConsumer sentPacketHandler) { + Supplier<@Nullable WritableNetworkPacket> nextWritePacketSupplier, + Consumer> writtenPacketHandler, + ObjBoolConsumer> sentPacketHandler) { super( connection, channel, @@ -34,12 +34,12 @@ public MqttPacketWriter( } @Override - protected int getTotalSize(WritablePacket packet, int expectedLength) { + protected int totalSize(WritableNetworkPacket packet, int expectedLength) { return 1 + MqttDataUtils.sizeOfMbi(expectedLength) + expectedLength; } @Override - protected boolean onBeforeWrite( + protected boolean onBeforeSerialize( MqttWritablePacket packet, int expectedLength, int totalSize, @@ -51,28 +51,27 @@ protected boolean onBeforeWrite( } @Override - protected boolean onWrite( + protected boolean doSerialize( MqttWritablePacket packet, int expectedLength, int totalSize, ByteBuffer firstBuffer, ByteBuffer secondBuffer) { - if (!packet.write(secondBuffer)) { + if (!packet.write(connection, secondBuffer)) { return false; - } else { - secondBuffer.flip(); - return true; } + secondBuffer.flip(); + return true; } @Override - protected boolean onAfterWrite( + protected boolean onAfterSerialize( MqttWritablePacket packet, int expectedLength, int totalSize, ByteBuffer firstBuffer, ByteBuffer secondBuffer) { - firstBuffer.put((byte) packet.getPacketTypeAndFlags()); + firstBuffer.put((byte) packet.packetTypeAndFlags()); MqttDataUtils.writeMbi(secondBuffer.remaining(), firstBuffer); firstBuffer .put(secondBuffer) diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/AuthenticationInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/AuthenticationInPacket.java index 29ae02d0..e01f55d3 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/AuthenticationInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/AuthenticationInPacket.java @@ -1,12 +1,12 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.AuthenticateReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.common.util.ArrayUtils; import javasabr.rlib.common.util.StringUtils; import lombok.Getter; @@ -67,7 +67,7 @@ public AuthenticationInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectAckInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectAckInPacket.java index 3734c3f5..2978f199 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectAckInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectAckInPacket.java @@ -1,5 +1,8 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; @@ -8,9 +11,6 @@ import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.util.ArrayUtils; import javasabr.rlib.common.util.NumberUtils; @@ -288,7 +288,7 @@ public ConnectAckInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectInPacket.java index e8901f54..f78b9db3 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/ConnectInPacket.java @@ -1,5 +1,9 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; @@ -8,10 +12,6 @@ import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.util.ArrayUtils; import javasabr.rlib.common.util.NumberUtils; @@ -27,7 +27,7 @@ public class ConnectInPacket extends MqttReadablePacket { private static final byte PACKET_TYPE = (byte) PacketType.CONNECT.ordinal(); static { - DebugUtils.registerIncludedFields("clientId", "keepAlive", "cleanStart"); + DebugUtils.registerIncludedFields("clientId", "keepAlive", "cleanStart", "mqttVersion"); } private static final Set AVAILABLE_PROPERTIES = EnumSet.of( @@ -238,7 +238,7 @@ public ConnectInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } @@ -246,7 +246,7 @@ public byte getPacketType() { protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030 - var protocolName = readString(buffer); + var protocolName = readString(buffer, Integer.MAX_VALUE); var protocolLevel = buffer.get(); mqttVersion = MqttVersion.of(protocolName, protocolLevel); @@ -310,14 +310,14 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) { If the Server rejects the ClientID it MAY respond to the CONNECT packet with a CONNACK using Reason Code 0x85 (Client Identifier not valid) and then it MUST close the Network Connection */ - clientId = readString(buffer); + clientId = readString(buffer, Integer.MAX_VALUE); if (willFlag && mqttVersion.ordinal() >= MqttVersion.MQTT_5.ordinal()) { readProperties(buffer, WILL_PROPERTIES); } if (willFlag) { - willTopic = readString(buffer); + willTopic = readString(buffer, Integer.MAX_VALUE); } if (willFlag) { @@ -325,7 +325,7 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) { } if (hasUserName) { - username = readString(buffer); + username = readString(buffer, Integer.MAX_VALUE); } if (hasPassword) { diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/DisconnectInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/DisconnectInPacket.java index e42f1efc..8b801408 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/DisconnectInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/DisconnectInPacket.java @@ -1,15 +1,15 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.DisconnectReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.common.util.StringUtils; import lombok.Getter; @@ -74,15 +74,15 @@ public DisconnectInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } @Override protected void readImpl(MqttConnection connection, ByteBuffer buffer) { this.sessionExpiryInterval = connection - .getClient() - .getSessionExpiryInterval(); + .client() + .sessionExpiryInterval(); super.readImpl(connection, buffer); } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/MqttReadablePacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/MqttReadablePacket.java index aad49d05..5c0e6df5 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/MqttReadablePacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/MqttReadablePacket.java @@ -1,5 +1,13 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.data.type.StringPair; @@ -8,23 +16,15 @@ import javasabr.mqtt.model.exception.MqttException; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; import javasabr.mqtt.network.MqttConnection; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CodingErrorAction; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.Set; import javasabr.mqtt.network.utils.MqttDataUtils; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.util.ArrayUtils; -import javasabr.rlib.network.packet.impl.AbstractReadablePacket; +import javasabr.rlib.network.packet.impl.AbstractReadableNetworkPacket; import lombok.Getter; import lombok.RequiredArgsConstructor; import org.jspecify.annotations.Nullable; -public abstract class MqttReadablePacket extends AbstractReadablePacket { +public abstract class MqttReadablePacket extends AbstractReadableNetworkPacket { static { DebugUtils.registerIncludedFields("userProperties"); @@ -67,7 +67,7 @@ protected MqttReadablePacket(byte info) { this.userProperties = EMPTY_PROPERTIES; } - public abstract byte getPacketType(); + public abstract byte packetType(); @Override protected void readImpl(MqttConnection connection, ByteBuffer buffer) { @@ -81,8 +81,8 @@ protected void readImpl(MqttConnection connection, ByteBuffer buffer) { } @Override - protected void handleException(ByteBuffer buffer, Exception exception) { - super.handleException(buffer, exception); + protected void handleException(MqttConnection connection, ByteBuffer buffer, Exception exception) { + super.handleException(connection, buffer, exception); if (!(exception instanceof MqttException)) { exception = new ConnectionRejectException(exception, ConnectAckReasonCode.PROTOCOL_ERROR); @@ -139,10 +139,10 @@ protected void readProperties(ByteBuffer buffer, Set availablePr applyProperty(property, MqttDataUtils.readMbi(buffer)); break; case UTF_8_STRING: - applyProperty(property, readString(buffer)); + applyProperty(property, readString(buffer, Integer.MAX_VALUE)); break; case UTF_8_STRING_PAIR: - applyProperty(property, new StringPair(readString(buffer), readString(buffer))); + applyProperty(property, new StringPair(readString(buffer, Integer.MAX_VALUE), readString(buffer, Integer.MAX_VALUE))); break; case BINARY: applyProperty(property, readBytes(buffer)); @@ -190,7 +190,7 @@ protected long readUnsignedInt(ByteBuffer buffer) { } @Override - protected String readString(ByteBuffer buffer) { + protected String readString(ByteBuffer buffer, int maxLength) { var utf8Decoder = LOCAL_DECODER.get(); var inBuffer = utf8Decoder.getInBuffer(); diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PingRequestInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PingRequestInPacket.java index b30be46f..6afc8214 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PingRequestInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PingRequestInPacket.java @@ -14,7 +14,7 @@ public PingRequestInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PingResponseInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PingResponseInPacket.java index 1da4fa23..bacd6b87 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PingResponseInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PingResponseInPacket.java @@ -14,7 +14,7 @@ public PingResponseInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishAckInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishAckInPacket.java index 81b46b83..c9762f5f 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishAckInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishAckInPacket.java @@ -1,21 +1,23 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.PublishAckReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.HasPacketId; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import lombok.Getter; +import lombok.experimental.Accessors; /** * Publish acknowledgment (QoS 1). */ @Getter +@Accessors(fluent = true, chain = false) public class PublishAckInPacket extends MqttReadablePacket implements HasPacketId { private static final int PACKET_TYPE = PacketType.PUBLISH_ACK.ordinal(); @@ -58,7 +60,7 @@ public PublishAckInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return (byte) PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishCompleteInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishCompleteInPacket.java index d4053685..2cfabd0e 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishCompleteInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishCompleteInPacket.java @@ -1,21 +1,23 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.HasPacketId; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import lombok.Getter; +import lombok.experimental.Accessors; /** * Publish complete (QoS 2 delivery part 3). */ @Getter +@Accessors(fluent = true, chain = false) public class PublishCompleteInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_COMPLETED.ordinal(); @@ -58,7 +60,7 @@ public PublishCompleteInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishInPacket.java index 5fa74710..b990e5f9 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishInPacket.java @@ -3,16 +3,16 @@ import static javasabr.mqtt.model.utils.TopicUtils.EMPTY_TOPIC_NAME; import static javasabr.mqtt.model.utils.TopicUtils.buildTopicName; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.topic.TopicName; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.IntArray; import javasabr.rlib.collections.array.MutableIntArray; @@ -280,14 +280,14 @@ public PublishInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } @Override protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039 - topicName = buildTopicName(readString(buffer)); + topicName = buildTopicName(readString(buffer, Integer.MAX_VALUE)); packetId = qos != QoS.AT_MOST_ONCE ? readUnsignedShort(buffer) : 0; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReceivedInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReceivedInPacket.java index 01362ab1..b0974c8a 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReceivedInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReceivedInPacket.java @@ -1,22 +1,24 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.HasPacketId; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.common.util.StringUtils; import lombok.Getter; +import lombok.experimental.Accessors; /** * Publish received (QoS 2 delivery part 1). */ @Getter +@Accessors(fluent = true, chain = false) public class PublishReceivedInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_RECEIVED.ordinal(); @@ -78,7 +80,7 @@ protected boolean isPropertiesSupported(MqttConnection connection, ByteBuffer bu } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReleaseInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReleaseInPacket.java index ce252242..af81b98a 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReleaseInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/PublishReleaseInPacket.java @@ -1,21 +1,23 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.PublishReleaseReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.HasPacketId; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import lombok.Getter; +import lombok.experimental.Accessors; /** * Publish release (QoS 2 delivery part 2). */ @Getter +@Accessors(fluent = true, chain = false) public class PublishReleaseInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_RELEASED.ordinal(); @@ -58,7 +60,7 @@ public PublishReleaseInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeAckInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeAckInPacket.java index e2be7231..4809d729 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeAckInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeAckInPacket.java @@ -1,12 +1,12 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.util.StringUtils; @@ -53,7 +53,7 @@ public SubscribeAckInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeInPacket.java index ac9bc0ba..3e4bc3c7 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/SubscribeInPacket.java @@ -2,6 +2,10 @@ import static javasabr.mqtt.model.utils.TopicUtils.buildTopicFilter; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; @@ -10,10 +14,6 @@ import javasabr.mqtt.model.subscriber.SubscribeTopicFilter; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.util.NumberUtils; @@ -62,7 +62,7 @@ public SubscribeInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } @@ -85,7 +85,7 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) { // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168 while (buffer.hasRemaining()) { - var topicFilter = readString(buffer); + var topicFilter = readString(buffer, Integer.MAX_VALUE); var options = readUnsignedByte(buffer); var qos = QoS.of(options & 0x03); diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeAckInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeAckInPacket.java index 6a65ce5f..2c4c1764 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeAckInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeAckInPacket.java @@ -1,13 +1,13 @@ package javasabr.mqtt.network.packet.in; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.common.util.StringUtils; @@ -55,7 +55,7 @@ public UnsubscribeAckInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeInPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeInPacket.java index 2d18cf2e..3339a362 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeInPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/in/UnsubscribeInPacket.java @@ -2,13 +2,13 @@ import static javasabr.mqtt.model.utils.TopicUtils.buildTopicFilter; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.MutableArray; import lombok.Getter; @@ -37,7 +37,7 @@ public UnsubscribeInPacket(byte info) { } @Override - public byte getPacketType() { + public byte packetType() { return PACKET_TYPE; } @@ -54,7 +54,7 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) { } while (buffer.hasRemaining()) { - topicFilters.add(buildTopicFilter(readString(buffer))); + topicFilters.add(buildTopicFilter(readString(buffer, Integer.MAX_VALUE))); } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Authentication5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Authentication5OutPacket.java index 67562ebe..7c6994e9 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Authentication5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Authentication5OutPacket.java @@ -1,12 +1,12 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.data.type.StringPair; import javasabr.mqtt.model.reason.code.AuthenticateReasonCode; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.Array; import lombok.RequiredArgsConstructor; @@ -59,7 +59,7 @@ public class Authentication5OutPacket extends MqttWritablePacket { private final byte[] authenticateData; @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Connect311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Connect311OutPacket.java index 57f46dd9..01abfe82 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Connect311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Connect311OutPacket.java @@ -1,9 +1,9 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.QoS; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; import javasabr.rlib.common.util.ArrayUtils; import javasabr.rlib.common.util.StringUtils; import lombok.RequiredArgsConstructor; @@ -48,7 +48,7 @@ protected MqttVersion getMqttVersion() { } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Connect5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Connect5OutPacket.java index 15d0bf72..260a84ef 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Connect5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Connect5OutPacket.java @@ -1,13 +1,13 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.data.type.StringPair; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.mqtt.network.utils.MqttDataUtils; import javasabr.rlib.collections.array.Array; import javasabr.rlib.common.util.ArrayUtils; diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck311OutPacket.java index bfa41235..f17f1083 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck311OutPacket.java @@ -1,9 +1,10 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; import lombok.RequiredArgsConstructor; /** @@ -35,23 +36,23 @@ public class ConnectAck311OutPacket extends MqttWritablePacket { private final boolean sessionPresent; @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } @Override - public int getExpectedLength() { - return 2; + public int expectedLength(MqttConnection connection) { + return PACKET_ID_SIZE; } @Override protected void writeVariableHeader(ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718035 buffer.put((byte) (sessionPresent ? 0x01 : 0x00)); - buffer.put(getReasonCodeValue()); + buffer.put(reasonCodeValue()); } - protected byte getReasonCodeValue() { + protected byte reasonCodeValue() { return reasonCode.getMqtt311(); } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck5OutPacket.java index 2d094f0d..2d39e102 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/ConnectAck5OutPacket.java @@ -1,14 +1,15 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.data.type.StringPair; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; +import javasabr.mqtt.network.MqttConnection; import javasabr.rlib.collections.array.Array; /** @@ -298,12 +299,12 @@ public ConnectAck5OutPacket( } @Override - public int getExpectedLength() { - return -1; + public int expectedLength(MqttConnection connection) { + return UNKNOWN_EXPECTED_BYTES; } @Override - protected byte getReasonCodeValue() { + protected byte reasonCodeValue() { return reasonCode.getMqtt5(); } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect311OutPacket.java index 855221ae..27bc4442 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect311OutPacket.java @@ -1,5 +1,6 @@ package javasabr.mqtt.network.packet.out; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; /** @@ -10,12 +11,12 @@ public class Disconnect311OutPacket extends MqttWritablePacket { private static final byte PACKET_TYPE = (byte) PacketType.DISCONNECT.ordinal(); @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 0; } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect5OutPacket.java index b98ec2bf..87fa8599 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Disconnect5OutPacket.java @@ -1,12 +1,12 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.data.type.StringPair; import javasabr.mqtt.model.reason.code.DisconnectReasonCode; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; import javasabr.rlib.collections.array.Array; import lombok.RequiredArgsConstructor; @@ -56,11 +56,6 @@ public class Disconnect5OutPacket extends Disconnect311OutPacket { private final long sessionExpiryInterval; - @Override - public int getExpectedLength() { - return -1; - } - @Override protected void writeVariableHeader(ByteBuffer buffer) { // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901207 diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/MqttWritablePacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/MqttWritablePacket.java index da551548..97590566 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/MqttWritablePacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/MqttWritablePacket.java @@ -1,24 +1,27 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.base.utils.DebugUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import javasabr.mqtt.base.utils.DebugUtils; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.utils.MqttDataUtils; import javasabr.rlib.collections.array.Array; import javasabr.rlib.common.util.NumberUtils; -import javasabr.rlib.network.packet.impl.AbstractWritablePacket; +import javasabr.rlib.network.packet.impl.AbstractWritableNetworkPacket; import lombok.RequiredArgsConstructor; @RequiredArgsConstructor -public abstract class MqttWritablePacket extends AbstractWritablePacket { +public abstract class MqttWritablePacket extends AbstractWritableNetworkPacket { private static final ThreadLocal LOCAL_BUFFER = ThreadLocal.withInitial(() -> ByteBuffer.allocate( 1024 * 1024)); + protected static final int PACKET_ID_SIZE = 2; + @Override - protected void writeImpl(ByteBuffer buffer) { + protected void writeImpl(MqttConnection connection, ByteBuffer buffer) { writeVariableHeader(buffer); if (isPropertiesSupported()) { @@ -28,32 +31,27 @@ protected void writeImpl(ByteBuffer buffer) { writePayload(buffer); } - protected void writeVariableHeader(ByteBuffer buffer) { - } + protected void writeVariableHeader(ByteBuffer buffer) {} - protected void writePayload(ByteBuffer buffer) { - } + protected void writePayload(ByteBuffer buffer) {} protected boolean isPropertiesSupported() { return false; } - protected void writeProperties(ByteBuffer buffer) { - } - - public final int getPacketTypeAndFlags() { - - var type = getPacketType(); - var controlFlags = getPacketFlags(); + protected void writeProperties(ByteBuffer buffer) {} + public final int packetTypeAndFlags() { + byte type = packetType(); + byte controlFlags = packetFlags(); return NumberUtils.setHighByteBits(controlFlags, type); } - protected byte getPacketType() { + protected byte packetType() { throw new UnsupportedOperationException(); } - protected byte getPacketFlags() { + protected byte packetFlags() { return 0; } @@ -129,7 +127,6 @@ public void writeProperty( } public void writeProperty(ByteBuffer buffer, PacketProperty property, StringPair value) { - buffer.put(property.getId()); writeString(buffer, value.getName()); writeString(buffer, value.getValue()); @@ -174,7 +171,7 @@ public void writeStringPairProperties( return; } - for (var pair : pairs) { + for (StringPair pair : pairs) { buffer.put(property.getId()); writeStringPair(buffer, pair); } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PingRequest311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PingRequest311OutPacket.java index c8af6b19..85d59bc6 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PingRequest311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PingRequest311OutPacket.java @@ -10,7 +10,7 @@ public class PingRequest311OutPacket extends MqttWritablePacket { private static final byte PACKET_TYPE = (byte) PacketType.PING_REQUEST.ordinal(); @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PingResponse311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PingResponse311OutPacket.java index 34761038..b708d12a 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PingResponse311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PingResponse311OutPacket.java @@ -10,7 +10,7 @@ public class PingResponse311OutPacket extends MqttWritablePacket { private static final byte PACKET_TYPE = (byte) PacketType.PING_RESPONSE.ordinal(); @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Publish311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Publish311OutPacket.java index 83df2bb7..45f35174 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Publish311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Publish311OutPacket.java @@ -1,8 +1,9 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.QoS; -import javasabr.mqtt.base.utils.DebugUtils; import java.nio.ByteBuffer; +import javasabr.mqtt.base.utils.DebugUtils; +import javasabr.mqtt.model.QoS; +import javasabr.mqtt.network.MqttConnection; public class Publish311OutPacket extends PublishOutPacket { @@ -33,12 +34,12 @@ public Publish311OutPacket( } @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 7 + payload.length; } @Override - protected byte getPacketFlags() { + protected byte packetFlags() { byte info = (byte) (qos.ordinal() << 1); diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Publish5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Publish5OutPacket.java index b065718f..18f3c879 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Publish5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Publish5OutPacket.java @@ -1,13 +1,14 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.PacketProperty; import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; +import javasabr.mqtt.network.MqttConnection; import javasabr.rlib.collections.array.Array; public class Publish5OutPacket extends Publish311OutPacket { @@ -163,6 +164,11 @@ public Publish5OutPacket( this.userProperties = userProperties; } + @Override + public int expectedLength(MqttConnection connection) { + return UNKNOWN_EXPECTED_BYTES; + } + @Override protected boolean isPropertiesSupported() { return true; diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck311OutPacket.java index 5b5122c9..f72ff602 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck311OutPacket.java @@ -1,7 +1,8 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.network.packet.PacketType; import java.nio.ByteBuffer; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.packet.PacketType; import lombok.RequiredArgsConstructor; /** @@ -18,12 +19,12 @@ public class PublishAck311OutPacket extends MqttWritablePacket { private final int packetId; @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 2; } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck5OutPacket.java index d8018762..8147c731 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishAck5OutPacket.java @@ -1,11 +1,11 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.model.reason.code.PublishAckReasonCode; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.reason.code.PublishAckReasonCode; import javasabr.rlib.collections.array.Array; /** @@ -49,11 +49,6 @@ public PublishAck5OutPacket( this.reason = reason; } - @Override - public int getExpectedLength() { - return -1; - } - @Override protected boolean isPropertiesSupported() { return true; diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete311OutPacket.java index 0cb1923b..51ebfcd3 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete311OutPacket.java @@ -1,7 +1,8 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.network.packet.PacketType; import java.nio.ByteBuffer; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.packet.PacketType; import lombok.RequiredArgsConstructor; /** @@ -15,12 +16,12 @@ public class PublishComplete311OutPacket extends MqttWritablePacket { private final int packetId; @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 2; } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete5OutPacket.java index 2f50b113..38d38efc 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishComplete5OutPacket.java @@ -1,11 +1,12 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode; +import javasabr.mqtt.network.MqttConnection; import javasabr.rlib.collections.array.Array; import javasabr.rlib.common.util.StringUtils; @@ -55,8 +56,8 @@ public PublishComplete5OutPacket( } @Override - public int getExpectedLength() { - return -1; + public int expectedLength(MqttConnection connection) { + return UNKNOWN_EXPECTED_BYTES; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishOutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishOutPacket.java index 872613d0..88efef7b 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishOutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishOutPacket.java @@ -4,7 +4,9 @@ import javasabr.mqtt.network.packet.PacketType; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; +@Accessors(fluent = true) @RequiredArgsConstructor public abstract class PublishOutPacket extends MqttWritablePacket implements HasPacketId { @@ -14,7 +16,7 @@ public abstract class PublishOutPacket extends MqttWritablePacket implements Has protected final int packetId; @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived311OutPacket.java index 37819ae3..cb5e2a76 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived311OutPacket.java @@ -1,7 +1,8 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.network.packet.PacketType; import java.nio.ByteBuffer; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.packet.PacketType; import lombok.RequiredArgsConstructor; /** @@ -15,12 +16,12 @@ public class PublishReceived311OutPacket extends MqttWritablePacket { private final int packetId; @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 2; } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived5OutPacket.java index d4773b1b..b276c20a 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishReceived5OutPacket.java @@ -1,11 +1,12 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode; +import javasabr.mqtt.network.MqttConnection; import javasabr.rlib.collections.array.Array; import javasabr.rlib.common.util.StringUtils; @@ -55,8 +56,8 @@ public PublishReceived5OutPacket( } @Override - public int getExpectedLength() { - return -1; + public int expectedLength(MqttConnection connection) { + return UNKNOWN_EXPECTED_BYTES; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease311OutPacket.java index df92b5f8..bddcffca 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease311OutPacket.java @@ -1,7 +1,8 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.network.packet.PacketType; import java.nio.ByteBuffer; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.packet.PacketType; import lombok.RequiredArgsConstructor; /** @@ -15,18 +16,18 @@ public class PublishRelease311OutPacket extends MqttWritablePacket { private final int packetId; @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } @Override - protected byte getPacketFlags() { + protected byte packetFlags() { return 2; } @Override - public int getExpectedLength() { - return 2; + public int expectedLength(MqttConnection connection) { + return PACKET_ID_SIZE; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease5OutPacket.java index 2ac17087..f91bc27c 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/PublishRelease5OutPacket.java @@ -1,11 +1,12 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.model.reason.code.PublishReleaseReasonCode; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.reason.code.PublishReleaseReasonCode; +import javasabr.mqtt.network.MqttConnection; import javasabr.rlib.collections.array.Array; /** @@ -50,8 +51,8 @@ public PublishRelease5OutPacket( } @Override - public int getExpectedLength() { - return -1; + public int expectedLength(MqttConnection connection) { + return UNKNOWN_EXPECTED_BYTES; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe311OutPacket.java index 5595c593..3ab13cdd 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe311OutPacket.java @@ -1,8 +1,8 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; import javasabr.mqtt.model.subscriber.SubscribeTopicFilter; import javasabr.mqtt.network.packet.PacketType; -import java.nio.ByteBuffer; import javasabr.rlib.collections.array.Array; import lombok.RequiredArgsConstructor; @@ -18,7 +18,7 @@ public class Subscribe311OutPacket extends MqttWritablePacket { private final int packetId; @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe5OutPacket.java index c517ab60..e618c11e 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/Subscribe5OutPacket.java @@ -1,12 +1,12 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.MqttProperties; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.subscriber.SubscribeTopicFilter; -import javasabr.mqtt.model.data.type.StringPair; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.model.MqttProperties; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.subscriber.SubscribeTopicFilter; import javasabr.rlib.collections.array.Array; /** diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck311OutPacket.java index dc80f280..407ffcfc 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck311OutPacket.java @@ -1,16 +1,20 @@ package javasabr.mqtt.network.packet.out; +import java.nio.ByteBuffer; +import javasabr.mqtt.base.utils.DebugUtils; import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.PacketType; -import javasabr.mqtt.base.utils.DebugUtils; -import java.nio.ByteBuffer; import javasabr.rlib.collections.array.Array; +import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; /** * Subscribe acknowledgement. */ @RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class SubscribeAck311OutPacket extends MqttWritablePacket { private static final byte PACKET_TYPE = (byte) PacketType.SUBSCRIBE_ACK.ordinal(); @@ -22,20 +26,20 @@ public class SubscribeAck311OutPacket extends MqttWritablePacket { /** * The order of Reason Codes in the SUBACK packet MUST match the order of Topic Filters in the SUBSCRIBE packet. */ - private final Array reasonCodes; + Array reasonCodes; /** * The Packet Identifier from the SUBSCRIBE. */ - private final int packetId; + int packetId; @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 2 + reasonCodes.size(); } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck5OutPacket.java index daa4be01..6b48b501 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/SubscribeAck5OutPacket.java @@ -1,12 +1,13 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode; -import javasabr.mqtt.base.utils.DebugUtils; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.base.utils.DebugUtils; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode; +import javasabr.mqtt.network.MqttConnection; import javasabr.rlib.collections.array.Array; /** @@ -52,13 +53,13 @@ public SubscribeAck5OutPacket( } @Override - protected boolean isPropertiesSupported() { - return true; + public int expectedLength(MqttConnection connection) { + return UNKNOWN_EXPECTED_BYTES; } @Override - public int getExpectedLength() { - return -1; + protected boolean isPropertiesSupported() { + return true; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck311OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck311OutPacket.java index 562dfb97..9c4706c3 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck311OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck311OutPacket.java @@ -1,7 +1,8 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.network.packet.PacketType; import java.nio.ByteBuffer; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.packet.PacketType; import lombok.RequiredArgsConstructor; /** @@ -15,12 +16,12 @@ public class UnsubscribeAck311OutPacket extends MqttWritablePacket { private final int packetId; @Override - public int getExpectedLength() { + public int expectedLength(MqttConnection connection) { return 2; } @Override - protected byte getPacketType() { + protected byte packetType() { return PACKET_TYPE; } diff --git a/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck5OutPacket.java b/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck5OutPacket.java index e10f1774..233916c4 100644 --- a/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck5OutPacket.java +++ b/network/src/main/java/javasabr/mqtt/network/packet/out/UnsubscribeAck5OutPacket.java @@ -1,11 +1,11 @@ package javasabr.mqtt.network.packet.out; -import javasabr.mqtt.model.PacketProperty; -import javasabr.mqtt.model.data.type.StringPair; -import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode; import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.model.PacketProperty; +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode; import javasabr.rlib.collections.array.Array; /** @@ -54,11 +54,6 @@ protected boolean isPropertiesSupported() { return true; } - @Override - public int getExpectedLength() { - return -1; - } - @Override protected void writeProperties(ByteBuffer buffer) { diff --git a/network/src/main/java/javasabr/mqtt/network/utils/MqttDataUtils.java b/network/src/main/java/javasabr/mqtt/network/utils/MqttDataUtils.java index 7d70ddb5..4cff8a80 100644 --- a/network/src/main/java/javasabr/mqtt/network/utils/MqttDataUtils.java +++ b/network/src/main/java/javasabr/mqtt/network/utils/MqttDataUtils.java @@ -14,15 +14,15 @@ public class MqttDataUtils { */ public static ByteBuffer writeMbi(int number, ByteBuffer buffer) { - var sizeInBytes = 0; - var valueToWrite = number; + int sizeInBytes = 0; + int valueToWrite = number; do { var digit = (byte) (valueToWrite % 128); valueToWrite = valueToWrite / 128; if (valueToWrite > 0) { - digit |= 0x80; + digit |= (byte) 0x80; } buffer.put(digit); @@ -45,7 +45,7 @@ public static ByteBuffer writeMbi(int number, ByteBuffer buffer) { */ public static int readMbi(ByteBuffer buffer) { - var originalPos = buffer.position(); + int originalPos = buffer.position(); int result = 0; int multiplier = 1; diff --git a/service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java b/service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java index d70b8754..ae66975c 100644 --- a/service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java +++ b/service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java @@ -22,32 +22,30 @@ public abstract class AbstractMqttClientReleaseHandler release(UnsafeMqttClient client) { - var clientId = client.getClientId(); + var clientId = client.clientId(); //noinspection unchecked - return releaseImpl((T) client).doOnNext(aVoid -> log.info(clientId, "Client:[%s] was released"::formatted)); + return releaseImpl((T) client).doOnNext(aVoid -> log.info(clientId, "[%s] Client was released"::formatted)); } protected Mono releaseImpl(T client) { - var clientId = client.getClientId(); - client.setClientId(StringUtils.EMPTY); + var clientId = client.clientId(); + client.clientId(StringUtils.EMPTY); if (StringUtils.isEmpty(clientId)) { - log.warning(client, "This client:[%s] is already released or rejected"::formatted); + log.warning(client.clientId(), "[%s] This client is already released or rejected"::formatted); return Mono.empty(); } - var session = client.getSession(); + var session = client.session(); Mono asyncActions = null; if (session != null) { subscriptionService.cleanSubscriptions(client, session); - if (client - .getConnectionConfig() - .isSessionsEnabled()) { - asyncActions = sessionService.store(clientId, session, client.getSessionExpiryInterval()); - client.setSession(null); + if (client.connectionConfig().sessionsEnabled()) { + asyncActions = sessionService.store(clientId, session, client.sessionExpiryInterval()); + client.session(null); } } diff --git a/service/src/main/java/javasabr/mqtt/service/handler/in/ConnectInPacketHandler.java b/service/src/main/java/javasabr/mqtt/service/handler/in/ConnectInPacketHandler.java index 33de33dc..e3ec90bc 100644 --- a/service/src/main/java/javasabr/mqtt/service/handler/in/ConnectInPacketHandler.java +++ b/service/src/main/java/javasabr/mqtt/service/handler/in/ConnectInPacketHandler.java @@ -25,7 +25,6 @@ import javasabr.rlib.common.util.StringUtils; import lombok.CustomLog; import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; import reactor.core.publisher.Mono; @CustomLog @@ -40,8 +39,8 @@ public class ConnectInPacketHandler extends AbstractPacketHandler registerClient(UnsafeMqttClient client, ConnectInPacket pa if (StringUtils.isNotEmpty(requestedClientId)) { return clientIdRegistry .register(requestedClientId) - .map(ifTrue(requestedClientId, client::setClientId)); + .map(ifTrue(requestedClientId, client::clientId)); } else { var mqttVersion = client - .getConnection() - .getMqttVersion(); + .connection() + .mqttVersion(); // we can't assign generated client if for mqtt version less than 5 if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) { @@ -77,7 +76,7 @@ private Mono registerClient(UnsafeMqttClient client, ConnectInPacket pa .generate() .flatMap(newClientId -> clientIdRegistry .register(newClientId) - .map(ifTrue(newClientId, client::setClientId))); + .map(ifTrue(newClientId, client::clientId))); } } @@ -85,14 +84,14 @@ private Mono restoreSession(UnsafeMqttClient client, ConnectInPacket pa if (packet.isCleanStart()) { return mqttSessionService - .create(client.getClientId()) + .create(client.clientId()) .flatMap(session -> onConnected(client, packet, session, false)); } else { return mqttSessionService - .restore(client.getClientId()) + .restore(client.clientId()) .flatMap(session -> onConnected(client, packet, session, true)) .switchIfEmpty(Mono.defer(() -> mqttSessionService - .create(client.getClientId()) + .create(client.clientId()) .flatMap(session -> onConnected(client, packet, session, false)))); } } @@ -103,44 +102,44 @@ private Mono onConnected( MqttSession session, boolean sessionRestored) { - var connection = client.getConnection(); - var config = connection.getConfig(); + var connection = client.connection(); + var config = connection.config(); // if it was closed in parallel - if (connection.isClosed() && config.isSessionsEnabled()) { + if (connection.closed() && config.sessionsEnabled()) { // store the session again - return mqttSessionService.store(client.getClientId(), session, config.getDefaultSessionExpiryInterval()); + return mqttSessionService.store(client.clientId(), session, config.defaultSessionExpiryInterval()); } // select result keep alive time - var minimalKeepAliveTime = Math.max(config.getMinKeepAliveTime(), packet.getKeepAlive()); - var keepAlive = config.isKeepAliveEnabled() ? minimalKeepAliveTime : SERVER_KEEP_ALIVE_DISABLED; + var minimalKeepAliveTime = Math.max(config.minKeepAliveTime(), packet.getKeepAlive()); + var keepAlive = config.keepAliveEnabled() ? minimalKeepAliveTime : SERVER_KEEP_ALIVE_DISABLED; // select result session expiry interval - var sessionExpiryInterval = config.isSessionsEnabled() + var sessionExpiryInterval = config.sessionsEnabled() ? packet.getSessionExpiryInterval() : SESSION_EXPIRY_INTERVAL_DISABLED; if (sessionExpiryInterval == SESSION_EXPIRY_INTERVAL_UNDEFINED) { - sessionExpiryInterval = config.getDefaultSessionExpiryInterval(); + sessionExpiryInterval = config.defaultSessionExpiryInterval(); } // select result receive max var receiveMax = packet.getReceiveMax() == RECEIVE_MAXIMUM_UNDEFINED - ? config.getReceiveMaximum() - : Math.min(packet.getReceiveMax(), config.getReceiveMaximum()); + ? config.receiveMaximum() + : Math.min(packet.getReceiveMax(), config.receiveMaximum()); // select result maximum packet size var maximumPacketSize = packet.getMaximumPacketSize() == MAXIMUM_PACKET_SIZE_UNDEFINED - ? config.getMaximumPacketSize() - : Math.min(packet.getMaximumPacketSize(), config.getMaximumPacketSize()); + ? config.maximumPacketSize() + : Math.min(packet.getMaximumPacketSize(), config.maximumPacketSize()); // select result topic alias maximum var topicAliasMaximum = packet.getTopicAliasMaximum() == TOPIC_ALIAS_MAXIMUM_UNDEFINED ? TOPIC_ALIAS_MAXIMUM_DISABLED - : Math.min(packet.getTopicAliasMaximum(), config.getTopicAliasMaximum()); + : Math.min(packet.getTopicAliasMaximum(), config.topicAliasMaximum()); - client.setSession(session); + client.session(session); client.configure( sessionExpiryInterval, receiveMax, @@ -151,7 +150,7 @@ private Mono onConnected( packet.isRequestProblemInformation()); var connectAck = client - .getPacketOutFactory() + .packetOutFactory() .newConnectAck( client, ConnectAckReasonCode.SUCCESS, @@ -171,7 +170,7 @@ private Mono onConnected( private boolean onSentConnAck(UnsafeMqttClient client, MqttSession session, boolean result) { if (!result) { - log.warning(client.getClientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted); + log.warning(client.clientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted); return false; } diff --git a/service/src/main/java/javasabr/mqtt/service/handler/in/DisconnetInPacketHandler.java b/service/src/main/java/javasabr/mqtt/service/handler/in/DisconnetInPacketHandler.java index 173bb274..32b171e8 100644 --- a/service/src/main/java/javasabr/mqtt/service/handler/in/DisconnetInPacketHandler.java +++ b/service/src/main/java/javasabr/mqtt/service/handler/in/DisconnetInPacketHandler.java @@ -20,7 +20,7 @@ protected void handleImpl(UnsafeMqttClient client, DisconnectInPacket packet) { } client - .getConnection() + .connection() .close(); } } diff --git a/service/src/main/java/javasabr/mqtt/service/handler/in/PendingOutResponseInPacketHandler.java b/service/src/main/java/javasabr/mqtt/service/handler/in/PendingOutResponseInPacketHandler.java index f5d85331..58524481 100644 --- a/service/src/main/java/javasabr/mqtt/service/handler/in/PendingOutResponseInPacketHandler.java +++ b/service/src/main/java/javasabr/mqtt/service/handler/in/PendingOutResponseInPacketHandler.java @@ -1,6 +1,7 @@ package javasabr.mqtt.service.handler.in; import javasabr.mqtt.network.MqttClient.UnsafeMqttClient; +import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.packet.HasPacketId; import javasabr.mqtt.network.packet.in.MqttReadablePacket; import lombok.RequiredArgsConstructor; @@ -11,7 +12,7 @@ public class PendingOutResponseInPacketHandler ackReasonCodes = subscriptionService.subscribe(client, packet.getTopicFilters()); MqttWritablePacket subscribeAck = client - .getPacketOutFactory() + .packetOutFactory() .newSubscribeAck(packet.getPacketId(), ackReasonCodes); client.send(subscribeAck); @@ -40,13 +40,13 @@ protected void handleImpl(UnsafeMqttClient client, SubscribeInPacket packet) { if (anyReason != null) { var disconnectReasonCode = DisconnectReasonCode.of(toUnsignedInt(anyReason.getValue())); MqttWritablePacket disconnect = client - .getPacketOutFactory() + .packetOutFactory() .newDisconnect(client, disconnectReasonCode); client .sendWithFeedback(disconnect) .thenAccept(result -> client - .getConnection() + .connection() .close()); } } diff --git a/service/src/main/java/javasabr/mqtt/service/handler/in/UnsubscribeInPacketHandler.java b/service/src/main/java/javasabr/mqtt/service/handler/in/UnsubscribeInPacketHandler.java index ea98e487..0215dcd1 100644 --- a/service/src/main/java/javasabr/mqtt/service/handler/in/UnsubscribeInPacketHandler.java +++ b/service/src/main/java/javasabr/mqtt/service/handler/in/UnsubscribeInPacketHandler.java @@ -14,7 +14,7 @@ public class UnsubscribeInPacketHandler extends AbstractPacketHandler create(String clientId) { public Mono store(String clientId, MqttSession session, long expiryInterval) { var unsafe = (UnsafeMqttSession) session; - unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000)); + unsafe.expirationTime(System.currentTimeMillis() + (expiryInterval * 1000)); unsafe.onPersisted(); storedSession @@ -115,16 +115,16 @@ private static void removeExpiredSessions( MutableArray expired) { long time = System.currentTimeMillis(); for (UnsafeMqttSession session : expired) { - if (session.getExpirationTime() <= time) { + if (session.expirationTime() <= time) { continue; } - UnsafeMqttSession removed = sessions.remove(session.getClientId()); - log.debug(session.getClientId(), "Removed expired session for client:[%]"::formatted); + UnsafeMqttSession removed = sessions.remove(session.clientId()); + log.debug(session.clientId(), "Removed expired session for client:[%]"::formatted); // if we already have new session under the same client id if (removed != null && removed != session) { - sessions.put(session.getClientId(), removed); + sessions.put(session.clientId(), removed); } else if (removed != null) { removed.clear(); } @@ -136,7 +136,7 @@ private boolean findToRemove(MutableArray toCheck, MutableArr var currentTime = System.currentTimeMillis(); for (UnsafeMqttSession session : toCheck) { - if (session.getExpirationTime() > currentTime) { + if (session.expirationTime() > currentTime) { toRemove.add(session); } } diff --git a/service/src/main/java/javasabr/mqtt/service/impl/SimpleSubscriptionService.java b/service/src/main/java/javasabr/mqtt/service/impl/SimpleSubscriptionService.java index 22bab583..78265cf3 100644 --- a/service/src/main/java/javasabr/mqtt/service/impl/SimpleSubscriptionService.java +++ b/service/src/main/java/javasabr/mqtt/service/impl/SimpleSubscriptionService.java @@ -63,17 +63,17 @@ public Array subscribe(MqttClient mqttClient, Array unsubscribe(MqttClient mqttClient, Array< @Nullable private UnsubscribeAckReasonCode removeSubscription(TopicFilter topic, MqttClient client) { - var session = client.getSession(); + var session = client.session(); if (session == null) { return null; } else if (isInvalid(topic)) {