Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
implementation libs.springboot.starter.log4j2

testImplementation projects.testSupport
testImplementation testFixtures(projects.network)
}

tasks.withType(GroovyCompile).configureEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.PublishReceivingService;
import javasabr.mqtt.service.SessionService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
import javasabr.mqtt.service.impl.DefaultConnectionService;
Expand All @@ -27,7 +26,6 @@
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
import javasabr.mqtt.service.impl.FileCredentialsSource;
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
import javasabr.mqtt.service.impl.InMemorySessionService;
import javasabr.mqtt.service.impl.SimpleAuthenticationService;
import javasabr.mqtt.service.impl.SimpleSubscriptionService;
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
Expand All @@ -51,6 +49,8 @@
import javasabr.mqtt.service.publish.handler.impl.Qos1MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.impl.Qos2MqttPublishInMessageHandler;
import javasabr.mqtt.service.publish.handler.impl.Qos2MqttPublishOutMessageHandler;
import javasabr.mqtt.service.session.MqttSessionService;
import javasabr.mqtt.service.session.impl.InMemoryMqttSessionService;
import javasabr.rlib.network.NetworkFactory;
import javasabr.rlib.network.ServerNetworkConfig;
import javasabr.rlib.network.server.ServerNetwork;
Expand All @@ -76,9 +76,9 @@ ClientIdRegistry clientIdRegistry(Environment env) {
}

@Bean
SessionService mqttSessionService(
MqttSessionService mqttSessionService(
@Value("${sessions.clean.thread.interval:60000}") int cleanInterval) {
return new InMemorySessionService(cleanInterval);
return new InMemoryMqttSessionService(cleanInterval);
}

@Bean
Expand Down Expand Up @@ -119,7 +119,7 @@ MessageOutFactoryService mqttMessageOutFactoryService(
MqttInMessageHandler connectInMqttInMessageHandler(
ClientIdRegistry clientIdRegistry,
AuthenticationService authenticationService,
SessionService sessionService,
MqttSessionService sessionService,
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new ConnectInMqttInMessageHandler(
Expand Down Expand Up @@ -244,7 +244,7 @@ PublishReceivingService publishReceivingService(
@Bean
MqttClientReleaseHandler externalMqttClientReleaseHandler(
ClientIdRegistry clientIdRegistry,
SessionService sessionService,
MqttSessionService sessionService,
SubscriptionService subscriptionService) {
return new ExternalMqttClientReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.application.integration
package javasabr.mqtt.broker.application

import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.application.integration
package javasabr.mqtt.broker.application

import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode
Expand All @@ -7,8 +7,8 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCo
import javasabr.mqtt.model.MqttProperties
import javasabr.mqtt.model.QoS
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode
import javasabr.mqtt.network.packet.in.ConnectAckInPacket
import javasabr.mqtt.network.packet.out.Connect311OutPacket
import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage
import javasabr.mqtt.network.message.out.ConnectMqtt311OutMessage
import javasabr.rlib.common.util.ArrayUtils
import spock.lang.Ignore

Expand Down Expand Up @@ -148,7 +148,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
when:

client.connect()
client.send(new Connect311OutPacket(
client.send(new ConnectMqtt311OutMessage(
"",
"",
clientId,
Expand All @@ -160,7 +160,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
false
))

def connectAck = client.readNext() as ConnectAckInPacket
def connectAck = client.readNext() as ConnectAckMqttInMessage

then:
connectAck.reasonCode == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package javasabr.mqtt.application.integration
package javasabr.mqtt.broker.application

import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
import javasabr.mqtt.application.integration.config.MqttBrokerTestConfig
import javasabr.mqtt.application.mock.MqttMockClient
import javasabr.mqtt.broker.application.config.MqttBrokerTestConfig
import javasabr.mqtt.model.MqttClientConnectionConfig
import javasabr.mqtt.model.MqttProperties
import javasabr.mqtt.model.MqttServerConnectionConfig
import javasabr.mqtt.model.MqttVersion
import javasabr.mqtt.network.MqttConnection
import javasabr.mqtt.network.MqttMockClient
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import spock.lang.Specification

import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

import static javasabr.mqtt.network.MqttClient.UnsafeMqttClient

Expand All @@ -32,28 +34,17 @@ class IntegrationSpecification extends Specification {
@Autowired
InetSocketAddress externalNetworkAddress

@Autowired
InetSocketAddress internalNetworkAddress

@Autowired
MqttServerConnectionConfig externalConnectionConfig

def buildExternalMqtt311Client() {
return buildMqtt311Client(generateClientId(), externalNetworkAddress)
}

def buildInternalMqtt311Client() {
return buildMqtt311Client(generateClientId(), internalNetworkAddress)
}

def buildExternalMqtt5Client() {
return buildMqtt5Client(generateClientId(), externalNetworkAddress)
}

def buildInternalMqtt5Client() {
return buildMqtt5Client(generateClientId(), internalNetworkAddress)
}

def buildExternalMqtt311Client(String clientId) {
return MqttClient.builder()
.identifier(clientId)
Expand All @@ -71,7 +62,7 @@ class IntegrationSpecification extends Specification {
.serverPort(address.getPort())
.useMqttVersion3()
.addDisconnectedListener {
println "[${clientId}|mqtt311] disconnected:[$it.cause]"
println "[${clientId}|mqtt311] disconnected:[${it.cause.message}]"
}
.build()
.toAsync()
Expand All @@ -94,7 +85,7 @@ class IntegrationSpecification extends Specification {
.serverPort(address.getPort())
.useMqttVersion5()
.addDisconnectedListener {
println "[${clientId}|mqtt5] disconnected:[$it.cause]"
println "[${clientId}|mqtt5] disconnected:[${it.cause.message}]"
}
.build()
.toAsync()
Expand Down Expand Up @@ -144,38 +135,65 @@ class IntegrationSpecification extends Specification {
)
}

def mqtt5MockedConnection(MqttServerConnectionConfig deviceConnectionConfig) {

return Stub(MqttConnection) {
def mqtt5MockedConnection(MqttServerConnectionConfig serverConnConfig) {
MqttClientConnectionConfig clientConnConfig = new MqttClientConnectionConfig(
serverConnConfig.maxQos(),
MqttVersion.MQTT_5,
MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
serverConnConfig.receiveMaxPublishes(),
serverConnConfig.maxPacketSize(),
serverConnConfig.topicAliasMaxValue(),
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
false,
false,
serverConnConfig.sessionsEnabled(),
serverConnConfig.retainAvailable(),
serverConnConfig.wildcardSubscriptionAvailable(),
serverConnConfig.subscriptionIdAvailable(),
serverConnConfig.sharedSubscriptionAvailable())
def connectionRef = new AtomicReference<MqttConnection>()
def connection = Stub(MqttConnection) {
isSupported(MqttVersion.MQTT_5) >> true
isSupported(MqttVersion.MQTT_3_1_1) >> true
serverConnectionConfig() >> deviceConnectionConfig
serverConnectionConfig() >> serverConnConfig
client() >> Stub(UnsafeMqttClient) {
connectionConfig() >> deviceConnectionConfig
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
receiveMaxPublishes() >> deviceConnectionConfig.receiveMaxPublishes()
maxPacketSize() >> deviceConnectionConfig.maxPacketSize()
clientId() >> IntegrationSpecification.clientId
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
topicAliasMaxValue() >> deviceConnectionConfig.topicAliasMaxValue()
connectionConfig() >> clientConnConfig
connection() >> connectionRef.get()
clientId() >> clientId
}
}
}

def mqtt311MockedConnection(MqttServerConnectionConfig deviceConnectionConfig) {
return Stub(MqttConnection) {
connectionRef.set(connection)
return connection
}

def mqtt311MockedConnection(MqttServerConnectionConfig serverConnConfig) {
MqttClientConnectionConfig clientConnConfig = new MqttClientConnectionConfig(
serverConnConfig.maxQos(),
MqttVersion.MQTT_3_1_1,
MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
serverConnConfig.receiveMaxPublishes(),
serverConnConfig.maxPacketSize(),
serverConnConfig.topicAliasMaxValue(),
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
false,
false,
serverConnConfig.sessionsEnabled(),
serverConnConfig.retainAvailable(),
serverConnConfig.wildcardSubscriptionAvailable(),
serverConnConfig.subscriptionIdAvailable(),
serverConnConfig.sharedSubscriptionAvailable())
def connectionRef = new AtomicReference<MqttConnection>()
def connection = Stub(MqttConnection) {
isSupported(MqttVersion.MQTT_5) >> false
isSupported(MqttVersion.MQTT_3_1_1) >> true
serverConnectionConfig() >> deviceConnectionConfig
serverConnectionConfig() >> serverConnConfig
client() >> Stub(UnsafeMqttClient) {
connectionConfig() >> deviceConnectionConfig
sessionExpiryInterval() >> MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
receiveMaxPublishes() >> deviceConnectionConfig.receiveMaxPublishes()
maxPacketSize() >> deviceConnectionConfig.maxPacketSize()
clientId() >> IntegrationSpecification.clientId
keepAlive() >> MqttProperties.SERVER_KEEP_ALIVE_DEFAULT
topicAliasMaxValue() >> deviceConnectionConfig.topicAliasMaxValue()
connectionConfig() >> clientConnConfig
connection() >> connectionRef.get()
clientId() >> clientId
}
}
connectionRef.set(connection)
return connection
}
}
Loading
Loading