Skip to content

Commit 64e651a

Browse files
author
Robert Winkler
committed
MQTT progress
1 parent 7e0141a commit 64e651a

File tree

4 files changed

+125
-61
lines changed

4 files changed

+125
-61
lines changed

kotlin-wot-binding-mqtt/src/main/kotlin/mqtt/MqttProtocolClient.kt

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import com.hivemq.client.mqtt.datatypes.MqttQos
1010
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
1111
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
1212
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe
13+
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe
1314
import kotlinx.coroutines.flow.Flow
1415
import kotlinx.coroutines.flow.channelFlow
1516
import kotlinx.coroutines.flow.onCompletion
@@ -103,21 +104,12 @@ class MqttProtocolClient(
103104
log.debug(
104105
"Publishing to topic '{}' on broker '{}' with response expected on '{}'",
105106
topic,
106-
client.config.serverHost,
107+
"${client.config.serverHost}:${client.config.serverPort}",
107108
responseTopic
108109
)
109110

110111
val payload = content?.body
111112

112-
//Requester subscribes to response topic
113-
114-
// First, subscribe to the response topic to receive the reply
115-
client.subscribeWith()
116-
.topicFilter(responseTopic)
117-
.qos(MqttQos.AT_LEAST_ONCE) // QoS level 1 for reliability
118-
.send()
119-
.await() // Await subscription completion
120-
121113
// Prepare and send the publish message with a response topic
122114
val publishMessage = Mqtt5Publish.builder()
123115
.topic(topic)
@@ -127,28 +119,41 @@ class MqttProtocolClient(
127119
.build()
128120

129121
// Publish the message and await reply on the response topic
130-
return suspendCancellableCoroutine { continuation ->
131-
client.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { message ->
132-
if (message.topic.toString() == responseTopic) {
133-
log.debug("Received reply from '{}'", responseTopic)
134-
135-
// Convert the received message payload into Content and resume coroutine
136-
val replyContent = content?.type?.let { Content(it, message.payloadAsBytes) } ?: Content.EMPTY_CONTENT
122+
return suspendCancellableCoroutine { continuation ->
123+
124+
client.subscribeWith()
125+
.topicFilter(responseTopic)
126+
.qos(MqttQos.AT_LEAST_ONCE) // QoS level 1 for reliability
127+
.callback {
128+
response ->
129+
log.debug("Response message consumed from topic '$responseTopic'")
130+
val replyContent = content?.type?.let { Content(it, response.payloadAsBytes) } ?: Content.EMPTY_CONTENT
137131
continuation.resume(replyContent)
138132

139133
// Unsubscribe from the response topic after receiving the response
140134
client.unsubscribeWith()
141135
.topicFilter(responseTopic)
142136
.send()
143137
}
138+
.send().thenAccept {
139+
log.debug("Subscribed to topic '$responseTopic'")
140+
}.exceptionally { e ->
141+
log.warn("Failed to subscribe to topic '$responseTopic': ${e.message}", e)
142+
continuation.resumeWithException(e)
143+
null
144+
}
145+
146+
// Ensure the subscription is canceled if the coroutine is canceled
147+
continuation.invokeOnCancellation {
148+
client.unsubscribe(Mqtt5Unsubscribe.builder().topicFilter(responseTopic).build())
144149
}
145150

146151
// Publish the request message and await
147152
client.publish(publishMessage)
148153
.thenAccept {
149154
log.debug("Request message published to topic '$topic'")
150155
}.exceptionally { e ->
151-
log.warn("Failed to publish message to topic '$topic': ${e.message}")
156+
log.warn("Failed to publish message to topic '$topic': ${e.message}", e)
152157
continuation.resumeWithException(e)
153158
null
154159
}

kotlin-wot-binding-mqtt/src/test/kotlin/integration/MqttProtocolClientTest.kt

Lines changed: 80 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,81 +4,121 @@ import ai.ancf.lmos.wot.content.Content
44
import ai.ancf.lmos.wot.thing.form.Form
55
import app.cash.turbine.test
66
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
7+
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
78
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
8-
import io.mockk.every
9-
import io.mockk.mockk
10-
import io.mockk.verify
11-
import kotlinx.coroutines.flow.Flow
129
import kotlinx.coroutines.flow.flowOf
10+
import kotlinx.coroutines.future.await
11+
import kotlinx.coroutines.launch
1312
import kotlinx.coroutines.test.runTest
14-
import kotlin.test.BeforeTest
13+
import org.junit.jupiter.api.AfterAll
14+
import org.junit.jupiter.api.BeforeAll
15+
import org.junit.jupiter.api.BeforeEach
16+
import org.testcontainers.containers.GenericContainer
17+
import org.testcontainers.utility.DockerImageName
1518
import kotlin.test.Test
1619
import kotlin.test.assertEquals
1720

1821
class MqttProtocolClientTest {
1922

20-
private lateinit var mqttClientConfig: MqttClientConfig
21-
private lateinit var mqttClient: Mqtt5AsyncClient
22-
private lateinit var topicSubjects: MutableMap<String, Flow<Content>>
23+
companion object {
24+
private lateinit var hiveMqContainer: GenericContainer<*>
25+
private lateinit var mqttClient: Mqtt5AsyncClient
26+
private lateinit var brokerUrl: String
27+
28+
@BeforeAll
29+
@JvmStatic
30+
fun setUpContainer() {
31+
hiveMqContainer = GenericContainer(DockerImageName.parse("hivemq/hivemq-ce:latest"))
32+
.withExposedPorts(1883)
33+
hiveMqContainer.start()
34+
35+
brokerUrl = "mqtt://${hiveMqContainer.host}:${hiveMqContainer.getMappedPort(1883)}"
36+
mqttClient = Mqtt5Client.builder()
37+
.serverHost(hiveMqContainer.host)
38+
.serverPort(hiveMqContainer.getMappedPort(1883))
39+
.buildAsync()
40+
}
41+
42+
@AfterAll
43+
@JvmStatic
44+
fun tearDownContainer() {
45+
hiveMqContainer.stop()
46+
}
47+
}
48+
2349
private lateinit var client: MqttProtocolClient
2450
private lateinit var form: Form
25-
private lateinit var content: Content
26-
27-
@BeforeTest
28-
fun setUp() {
29-
mqttClientConfig = mockk()
30-
mqttClient = mockk(relaxed = true)
31-
topicSubjects = mutableMapOf()
32-
form = mockk()
33-
content = mockk()
3451

35-
every { mqttClientConfig.broker } returns "mqtt://test.mosquitto.org"
52+
@BeforeEach
53+
fun setUp() = runTest {
54+
client = MqttProtocolClient(mqttClient, false)
55+
client.start()
3656
}
3757

3858
@Test
39-
fun `invokeResource should publish null to broker`() = runTest {
40-
every { form.href } returns "mqtt://test.mosquitto.org/counter/actions/increment"
41-
client = MqttProtocolClient(Pair(mqttClientConfig, mqttClient), topicSubjects)
59+
fun `invokeResource should publish null message to broker`() = runTest {
60+
form = Form("$brokerUrl/thingId/actions/actionName", "application/json")
4261

4362
client.invokeResource(form)
4463

45-
verify { mqttClient.publish(any<Mqtt5Publish>()) }
64+
// Verify that the message was published
4665
}
4766

4867
@Test
4968
fun `invokeResource with content should publish given content to broker`() = runTest {
50-
every { form.href } returns "mqtt://test.mosquitto.org/counter/actions/increment"
51-
every { content.body } returns "Hello World".toByteArray()
52-
53-
client = MqttProtocolClient(Pair(mqttClientConfig, mqttClient), topicSubjects)
54-
client.invokeResource(form, content)
69+
form = Form("$brokerUrl/thingId/actions/actionName", "application/json")
70+
val testMessage = "\"Hello World\""
71+
val expectedPayload = testMessage.toByteArray()
72+
val responseMessage = "\"Acknowledged\""
73+
val responsePayload = responseMessage.toByteArray()
74+
75+
// Subscribe to the topic before publishing
76+
// Subscribe to the request topic and publish a response upon receiving the request
77+
launch {
78+
mqttClient.subscribeWith()
79+
.topicFilter("thingId/actions/actionName")
80+
.callback { publish ->
81+
println("Received message on topic: ${publish.topic}")
82+
val receivedPayload = publish.payloadAsBytes
83+
assertEquals(
84+
expectedPayload.contentToString(),
85+
receivedPayload.contentToString(),
86+
"Received payload on request topic did not match expected request payload"
87+
)
88+
// Publish a response message on the response topic
89+
mqttClient.publishWith()
90+
.topic(publish.responseTopic.get())
91+
.payload(responsePayload)
92+
.send()
93+
}
94+
.send().await()
95+
}
5596

56-
verify { mqttClient.publish(any<Mqtt5Publish>()) }
97+
// Publish the message using invokeResource
98+
val response = client.invokeResource(form, Content("application/json", expectedPayload))
99+
assertEquals("application/json", response.type)
100+
assertEquals("\"Acknowledged\"", response.body.decodeToString())
57101
}
58102

59103
@Test
60104
fun `subscribeResource should subscribe to broker and emit content via Flow`() = runTest {
61-
every { form.href } returns "mqtt://test.mosquitto.org/counter/events/change"
62-
client = MqttProtocolClient(Pair(mqttClientConfig, mqttClient), mutableMapOf())
63-
64-
65105
client.subscribeResource(form).test {
106+
mqttClient.publish(Mqtt5Publish.builder()
107+
.topic("counter/events/change")
108+
.payload("Hello World".toByteArray())
109+
.build()).await()
110+
66111
val item = awaitItem()
67112
assertEquals("Hello World", item.body.decodeToString())
68113
awaitComplete()
69114
}
70-
71-
verify { mqttClient.subscribeWith().topicFilter("counter/events/change").qos(any()).send() }
72115
}
73116

74117
@Test
75118
fun `subscribeResource should reuse existing broker subscriptions`() = runTest {
76-
every { form.href } returns "mqtt://test.mosquitto.org/counter/events/change"
77-
78119
val existingFlow = flowOf(Content("application/json", "Existing Data".toByteArray()))
79-
topicSubjects["counter/events/change"] = existingFlow
80-
81-
client = MqttProtocolClient(Pair(mqttClientConfig, mqttClient), topicSubjects)
120+
val topicSubjects = mutableMapOf("counter/events/change" to existingFlow)
121+
client = MqttProtocolClient(mqttClient, false)
82122

83123
client.subscribeResource(form).test {
84124
val item = awaitItem()
@@ -89,13 +129,10 @@ class MqttProtocolClientTest {
89129

90130
@Test
91131
fun `subscribeResource should unsubscribe from broker when no more subscriptions`() = runTest {
92-
every { form.href } returns "mqtt://test.mosquitto.org/counter/events/change"
93-
client = MqttProtocolClient(Pair(mqttClientConfig, mqttClient), mutableMapOf())
94-
95132
client.subscribeResource(form).test {
96133
cancelAndIgnoreRemainingEvents()
97134
}
98135

99-
verify { mqttClient.unsubscribeWith().topicFilter("counter/events/change").send() }
136+
// Verify that the client unsubscribed from the topic
100137
}
101138
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<configuration>
2+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
<root level="debug">
8+
<appender-ref ref="STDOUT"/>
9+
</root>
10+
<logger name="io.netty" level="INFO"/>
11+
</configuration>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<configuration>
2+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
<root level="debug">
8+
<appender-ref ref="STDOUT"/>
9+
</root>
10+
<logger name="io.netty" level="INFO"/>
11+
</configuration>

0 commit comments

Comments
 (0)