Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.PublishReceivingService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.TopicService;
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
import javasabr.mqtt.service.impl.DefaultConnectionService;
import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService;
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
import javasabr.mqtt.service.impl.DefaultTopicService;
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
import javasabr.mqtt.service.impl.FileCredentialsSource;
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
import javasabr.mqtt.service.impl.InMemorySubscriptionService;
import javasabr.mqtt.service.impl.SimpleAuthenticationService;
import javasabr.mqtt.service.impl.SimpleSubscriptionService;
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
import javasabr.mqtt.service.message.handler.impl.ConnectInMqttInMessageHandler;
import javasabr.mqtt.service.message.handler.impl.DisconnectMqttInMessageHandler;
Expand Down Expand Up @@ -96,7 +98,7 @@ AuthenticationService authenticationService(

@Bean
SubscriptionService subscriptionService() {
return new SimpleSubscriptionService();
return new InMemorySubscriptionService();
}

@Bean
Expand All @@ -115,6 +117,11 @@ MessageOutFactoryService mqttMessageOutFactoryService(
return new DefaultMessageOutFactoryService(knownFactories);
}

@Bean
TopicService topicService() {
return new DefaultTopicService();
}

@Bean
MqttInMessageHandler connectInMqttInMessageHandler(
ClientIdRegistry clientIdRegistry,
Expand All @@ -131,47 +138,58 @@ MqttInMessageHandler connectInMqttInMessageHandler(
}

@Bean
MqttInMessageHandler publishAckMqttInMessageHandler() {
return new PublishAckMqttInMessageHandler();
MqttInMessageHandler publishAckMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new PublishAckMqttInMessageHandler(messageOutFactoryService);
}

@Bean
MqttInMessageHandler publishCompleteMqttInMessageHandler() {
return new PublishCompleteMqttInMessageHandler();
MqttInMessageHandler publishCompleteMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new PublishCompleteMqttInMessageHandler(messageOutFactoryService);
}

@Bean
MqttInMessageHandler publishMqttInMessageHandler(PublishReceivingService publishReceivingService) {
return new PublishMqttInMessageHandler(publishReceivingService);
MqttInMessageHandler publishMqttInMessageHandler(
PublishReceivingService publishReceivingService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new PublishMqttInMessageHandler(
publishReceivingService,
messageOutFactoryService,
topicService);
}

@Bean
MqttInMessageHandler publishReceiveMqttInMessageHandler() {
return new PublishReceiveMqttInMessageHandler();
MqttInMessageHandler publishReceiveMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new PublishReceiveMqttInMessageHandler(messageOutFactoryService);
}

@Bean
MqttInMessageHandler publishReleaseMqttInMessageHandler() {
return new PublishReleaseMqttInMessageHandler();
MqttInMessageHandler publishReleaseMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new PublishReleaseMqttInMessageHandler(messageOutFactoryService);
}

@Bean
MqttInMessageHandler disconnectMqttInMessageHandler() {
return new DisconnectMqttInMessageHandler();
MqttInMessageHandler disconnectMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new DisconnectMqttInMessageHandler(messageOutFactoryService);
}

@Bean
MqttInMessageHandler subscribeMqttInMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService);
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
}

@Bean
MqttInMessageHandler unsubscribeMqttInMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService);
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new UnsubscribeMqttInMessageHandler(
subscriptionService,
messageOutFactoryService,
topicService);
}

@Bean
Expand Down Expand Up @@ -252,11 +270,11 @@ MqttClientReleaseHandler externalMqttClientReleaseHandler(
@Bean
MqttServerConnectionConfig externalConnectionConfig(Environment env) {
return new MqttServerConnectionConfig(
QoS.of(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
QoS.ofCode(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
env.getProperty(
"mqtt.external.connection.max.packet.size",
"mqtt.external.connection.max.message.size",
int.class,
MqttProperties.MAXIMUM_PACKET_SIZE_DEFAULT),
MqttProperties.MAXIMUM_MESSAGE_SIZE_DEFAULT),
env.getProperty(
"mqtt.external.connection.max.string.length",
int.class,
Expand All @@ -265,18 +283,22 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
"mqtt.external.connection.max.binary.size",
int.class,
MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty(
"mqtt.external.connection.max.topic.levels",
int.class,
MqttProperties.MAXIMUM_TOPIC_LEVELS),
env.getProperty(
"mqtt.external.connection.min.keep.alive",
int.class,
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
env.getProperty(
"mqtt.external.connection.receive.maximum",
int.class,
MqttProperties.RECEIVE_MAXIMUM_DEFAULT),
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_DEFAULT),
env.getProperty(
"mqtt.external.connection.topic.alias.maximum",
int.class,
MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED),
MqttProperties.TOPIC_ALIAS_DEFAULT),
env.getProperty(
"mqtt.external.connection.default.session.expiration.time",
long.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CompletableFuture

class ConnectSubscribePublishTest extends IntegrationSpecification {

def "publisher should publish message QoS 0 using mqtt 3.1.1"() {
def "should deliver publish message QoS 0 using mqtt 3.1.1"() {
given:
def received = new AtomicReference<Mqtt3Publish>()
def received = new CompletableFuture<Mqtt3Publish>()
def subscriber = buildExternalMqtt311Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt311Client()
Expand All @@ -26,7 +26,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publisher.connect().join()
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
Thread.sleep(100)
then:
noExceptionThrown()
subscribeResult != null
Expand All @@ -35,17 +34,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.qos == MqttQos.AT_MOST_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH
received.get() != null
received.get().qos == MqttQos.AT_MOST_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
received.join() != null
received.join().qos == MqttQos.AT_MOST_ONCE
received.join().type == Mqtt3MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 0 using mqtt 5"() {
def "should deliver publish message QoS 0 using mqtt 5"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def received = new CompletableFuture<Mqtt5Publish>()
def subscriber = buildExternalMqtt5Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt5Client()
Expand All @@ -54,7 +53,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publisher.connect().join()
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)
Thread.sleep(100)
then:
noExceptionThrown()
subscribeResult != null
Expand All @@ -63,17 +61,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH
received.get() != null
received.get().qos == MqttQos.AT_MOST_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
received.join() != null
received.join().qos == MqttQos.AT_MOST_ONCE
received.join().type == Mqtt5MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 1 using mqtt 3.1.1"() {
def "should deliver publish message QoS 1 using mqtt 3.1.1"() {
given:
def received = new AtomicReference<Mqtt3Publish>()
def received = new CompletableFuture<Mqtt3Publish>()
def subscriber = buildExternalMqtt311Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt311Client()
Expand All @@ -82,7 +80,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publisher.connect().join()
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
Thread.sleep(100)
then:
noExceptionThrown()
subscribeResult != null
Expand All @@ -91,17 +88,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.qos == MqttQos.AT_LEAST_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH
received.get() != null
received.get().qos == MqttQos.AT_LEAST_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
received.join() != null
received.join().qos == MqttQos.AT_LEAST_ONCE
received.join().type == Mqtt3MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 1 using mqtt 5"() {
def "should deliver publish message QoS 1 using mqtt 5"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def received = new CompletableFuture<Mqtt5Publish>()
def subscriber = buildExternalMqtt5Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt5Client()
Expand All @@ -110,7 +107,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publisher.connect().join()
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)
Thread.sleep(100)
then:
noExceptionThrown()
subscribeResult != null
Expand All @@ -119,17 +115,17 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH
received.get() != null
received.get().qos == MqttQos.AT_LEAST_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
received.join() != null
received.join().qos == MqttQos.AT_LEAST_ONCE
received.join().type == Mqtt5MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 2 using mqtt 3.1.1"() {
def "should deliver publish message QoS 2 using mqtt 3.1.1"() {
given:
def received = new AtomicReference<Mqtt3Publish>()
def received = new CompletableFuture<Mqtt3Publish>()
def subscriber = buildExternalMqtt311Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt311Client()
Expand All @@ -138,7 +134,6 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publisher.connect().join()
def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)
Thread.sleep(100)
then:
noExceptionThrown()
subscribeResult != null
Expand All @@ -147,14 +142,14 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.qos == MqttQos.EXACTLY_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH
received.get() != null
received.get().qos == MqttQos.EXACTLY_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
received.join() != null
received.join().qos == MqttQos.EXACTLY_ONCE
received.join().type == Mqtt3MessageType.PUBLISH
}

def "publisher should publish message QoS 2 using mqtt 5"() {
def "should deliver publish message QoS 2 using mqtt 5"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def received = new CompletableFuture<Mqtt5Publish>()
def subscriber = buildExternalMqtt5Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildExternalMqtt5Client()
Expand All @@ -172,9 +167,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.publish.qos == MqttQos.EXACTLY_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH
received.get() != null
received.get().qos == MqttQos.EXACTLY_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
received.join() != null
received.join().qos == MqttQos.EXACTLY_ONCE
received.join().type == Mqtt5MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
Expand All @@ -194,11 +189,11 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
Mqtt5AsyncClient subscriber,
String subscriberId,
MqttQos qos,
AtomicReference<Mqtt5Publish> received) {
CompletableFuture<Mqtt5Publish> received) {
return subscriber.subscribeWith()
.topicFilter("test/$subscriberId")
.qos(qos)
.callback({ publish -> received.set(publish) })
.callback({ publish -> received.complete(publish) })
.send()
.join()
}
Expand All @@ -216,11 +211,11 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
Mqtt3AsyncClient subscriber,
String subscriberId,
MqttQos qos,
AtomicReference<Mqtt3Publish> received) {
CompletableFuture<Mqtt3Publish> received) {
return subscriber.subscribeWith()
.topicFilter("test/$subscriberId")
.qos(qos)
.callback({ publish -> received.set(publish) })
.callback({ publish -> received.complete(publish) })
.send()
.join()
}
Expand Down
Loading
Loading