Skip to content

Commit 63fa322

Browse files
author
Robert Winkler
committed
MQTT progress
1 parent 220fdcd commit 63fa322

File tree

5 files changed

+40
-17
lines changed

5 files changed

+40
-17
lines changed

kotlin-wot-binding-mqtt/src/main/kotlin/mqtt/MqttClientConfig.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package ai.ancf.lmos.wot.binding.mqtt
22

3+
import java.util.*
34

45
data class MqttClientConfig(val host: String,
56
val port: Int,
6-
val clientId: String,
7+
val clientId: String = UUID.randomUUID().toString(),
78
private val username: String? = null,
89
private val password: String? = null) {
910
}

kotlin-wot-binding-mqtt/src/main/kotlin/mqtt/MqttProtocolClientFactory.kt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,25 @@ import ai.anfc.lmos.wot.binding.ProtocolClientFactory
44
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
55

66
open class MqttProtocolClientFactory(private val mqttClientConfig: MqttClientConfig) : ProtocolClientFactory {
7+
8+
private val _client: MqttProtocolClient = MqttProtocolClient(
9+
Mqtt5Client.builder()
10+
.identifier(mqttClientConfig.clientId)
11+
.serverHost(mqttClientConfig.host)
12+
.serverPort(mqttClientConfig.port)
13+
.automaticReconnect()
14+
.applyAutomaticReconnect()
15+
.build()
16+
.toAsync()
17+
)
18+
719
override fun toString(): String {
820
return "MqttClient"
921
}
1022
override val scheme: String
1123
get() = "mqtt"
1224
override val client: MqttProtocolClient
13-
get() = MqttProtocolClient(Mqtt5Client.builder()
14-
.identifier(mqttClientConfig.clientId)
15-
.serverHost(mqttClientConfig.host)
16-
.serverPort(mqttClientConfig.port)
17-
.automaticReconnect().applyAutomaticReconnect()
18-
.build().toAsync())
25+
get() = _client
1926

2027
override suspend fun init() {
2128
client.start()

kotlin-wot-binding-mqtt/src/main/kotlin/mqtt/MqttProtocolServer.kt

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,30 @@ class MqttProtocolServer(
170170
}
171171
}
172172

173-
private fun exposeTD(thing: ExposedThing) = runBlocking {
173+
private suspend fun exposeTD(thing: ExposedThing) {
174174
val topic = thing.id
175175
log.debug("Publishing Thing Description '{}' to topic '{}'", thing.id, topic)
176176

177177
try {
178-
val content = ContentManager.valueToContent(thing.toJson())
178+
val content = ContentManager.valueToContent(thing.toJson(), "application/json")
179+
val tdTopic = thing.id
180+
client.subscribeWith()
181+
.topicFilter(tdTopic)
182+
.qos(MqttQos.AT_LEAST_ONCE) // Use AT_LEAST_ONCE QoS level
183+
.callback { message ->
184+
CoroutineScope(Dispatchers.IO + exceptionHandler).launch {
185+
respondToTopic(content, message.responseTopic.get())
186+
}
187+
}
188+
.send().await() // Sending the subscription request
189+
/*
179190
val publishMessage = Mqtt5Publish.builder()
180191
.topic(topic)
181192
.payload(content.body)
182193
.retain(true)
183194
.build()
184195
client.publish(publishMessage).await()
196+
*/
185197
} catch (e: Exception) {
186198
log.warn("Unable to publish thing description to topic '{}': {}", topic, e.message)
187199
}

kotlin-wot-binding-mqtt/src/test/kotlin/integration/MqttProtocolServerTest.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,15 @@ class MqttProtocolServerTest {
6262

6363
brokerUrl = "mqtt://$host:${mappedPort}"
6464

65-
val config = MqttClientConfig(host, mappedPort, "testClient")
65+
val clientConfig = MqttClientConfig(host, mappedPort, "client")
66+
val serverConfig = MqttClientConfig(host, mappedPort, "server")
67+
68+
val clientFactory = MqttProtocolClientFactory(clientConfig)
69+
clientFactory.init()
70+
protocolClient = clientFactory.client
6671

67-
protocolClient = MqttProtocolClientFactory(config).client
68-
protocolClient.start()
6972
val servient = Servient()
70-
mqttServer = MqttProtocolServer(config)
73+
mqttServer = MqttProtocolServer(serverConfig)
7174
mqttServer.start(servient)
7275
exposedThing = exposedThing(servient, id = "test") {
7376
stringProperty(PROPERTY_NAME) {

kotlin-wot-integration-tests/src/test/kotlin/integration/WoTMqttIntegrationTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ class WoTMqttIntegrationTest() {
5757

5858
@Test
5959
fun `Should fetch thing`() = runTest {
60-
val config = MqttClientConfig(hiveMqContainer.host,
61-
hiveMqContainer.getMappedPort(1883), "testClient")
60+
val clientConfig = MqttClientConfig(hiveMqContainer.host, hiveMqContainer.getMappedPort(1883), "client")
61+
val serverConfig = MqttClientConfig(hiveMqContainer.host, hiveMqContainer.getMappedPort(1883), "server")
6262

6363
val servient = Servient(
64-
servers = listOf(MqttProtocolServer(config)),
65-
clientFactories = listOf(MqttProtocolClientFactory(config))
64+
servers = listOf(MqttProtocolServer(serverConfig)),
65+
clientFactories = listOf(MqttProtocolClientFactory(clientConfig))
6666
)
6767
val wot = Wot.create(servient)
6868

0 commit comments

Comments
 (0)