1+ package ai.ancf.lmos.wot.binding.mqtt
2+
3+ import ai.ancf.lmos.wot.content.Content
4+ import ai.ancf.lmos.wot.content.ContentManager
5+ import ai.ancf.lmos.wot.thing.form.Form
6+ import ai.anfc.lmos.wot.binding.ProtocolClient
7+ import ai.anfc.lmos.wot.binding.ProtocolClientException
8+ import com.hivemq.client.mqtt.MqttGlobalPublishFilter
9+ import com.hivemq.client.mqtt.datatypes.MqttQos
10+ import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
11+ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
12+ import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe
13+ import kotlinx.coroutines.flow.Flow
14+ import kotlinx.coroutines.flow.channelFlow
15+ import kotlinx.coroutines.flow.onCompletion
16+ import kotlinx.coroutines.future.await
17+ import kotlinx.coroutines.suspendCancellableCoroutine
18+ import org.slf4j.LoggerFactory
19+ import java.net.URI
20+ import java.net.URISyntaxException
21+ import java.net.URL
22+ import java.util.*
23+ import kotlin.coroutines.resume
24+ import kotlin.coroutines.resumeWithException
25+
26+
27+ class MqttProtocolClient (
28+ private val client : Mqtt5AsyncClient ,
29+ secure : Boolean = false
30+ ) : ProtocolClient {
31+
32+ private val log = LoggerFactory .getLogger(MqttProtocolClient ::class .java)
33+ private val scheme = " mqtt" + if (secure) " s" else " "
34+
35+ override suspend fun invokeResource (form : Form , content : Content ? ): Content {
36+ val topic = try {
37+ URI (form.href).path.substring(1 )
38+ } catch (e: URISyntaxException ) {
39+ throw ProtocolClientException (" Unable to extract topic from href '${form.href} '" , e)
40+ }
41+ return requestReply(content, topic)
42+ }
43+
44+ override suspend fun subscribeResource (form : Form ): Flow <Content > {
45+ val topic = try {
46+ URI (form.href).path.substring(1 )
47+ } catch (e: URISyntaxException ) {
48+ throw ProtocolClientException (" Unable to subscribe resource: ${e.message} " )
49+ }
50+
51+ return topicObserver(form, topic)
52+ }
53+
54+ override suspend fun start () {
55+ client.connect().await()
56+ }
57+
58+ override suspend fun stop () {
59+ client.disconnect().await()
60+ }
61+
62+ // Function to observe a topic using HiveMQ Mqtt5AsyncClient
63+ private fun topicObserver (form : Form , topic : String ): Flow <Content > = channelFlow {
64+ log.debug(" MqttClient connected to broker at '{}:{}' subscribing to topic '{}'" , client.config.serverHost, client.config.serverPort, topic)
65+
66+ try {
67+ client.subscribeWith()
68+ .topicFilter(topic)
69+ .qos(MqttQos .AT_LEAST_ONCE ) // QoS level 1
70+ .send()
71+ .await() // Suspending function for subscription completion
72+
73+ client.publishes(MqttGlobalPublishFilter .SUBSCRIBED ) { message ->
74+ log.debug(" Received message from topic '{}'" , topic)
75+
76+ val content = Content (form.contentType, message.payloadAsBytes) // Convert payload to Content
77+ trySend(content)
78+ }
79+ } catch (e: Exception ) {
80+ log.warn(" Error subscribing to topic '$topic ': ${e.message} " )
81+ close(e) // Close flow on error
82+ }
83+ }.onCompletion {
84+ val client = client
85+ log.debug(" No more observers for broker '{}' and topic '{}', unsubscribing." , client.config.serverHost, topic)
86+
87+ try {
88+ client.unsubscribeWith()
89+ .topicFilter(topic)
90+ .send()
91+ .await() // Await unsubscribe completion
92+ } catch (e: Exception ) {
93+ log.warn(" Error unsubscribing from topic '$topic ': ${e.message} " )
94+ }
95+ }
96+
97+ // Function to publish content to a topic and return a response
98+ private suspend fun requestReply (content : Content ? , topic : String ): Content {
99+ // Generate a unique response topic for this request
100+ val responseTopic = " ${topic} /reply/${UUID .randomUUID()} "
101+
102+ try {
103+ log.debug(
104+ " Publishing to topic '{}' on broker '{}' with response expected on '{}'" ,
105+ topic,
106+ client.config.serverHost,
107+ responseTopic
108+ )
109+
110+ val payload = content?.body
111+
112+ // Requester subscribes to response topic
113+
114+ // First, subscribe to the response topic to receive the reply
115+ client.subscribeWith()
116+ .topicFilter(responseTopic)
117+ .qos(MqttQos .AT_LEAST_ONCE ) // QoS level 1 for reliability
118+ .send()
119+ .await() // Await subscription completion
120+
121+ // Prepare and send the publish message with a response topic
122+ val publishMessage = Mqtt5Publish .builder()
123+ .topic(topic)
124+ .payload(payload)
125+ .qos(MqttQos .AT_LEAST_ONCE )
126+ .responseTopic(responseTopic) // Set the response topic
127+ .build()
128+
129+ // Publish the message and await reply on the response topic
130+ return suspendCancellableCoroutine { continuation ->
131+ client.publishes(MqttGlobalPublishFilter .SUBSCRIBED ) { message ->
132+ if (message.topic.toString() == responseTopic) {
133+ log.debug(" Received reply from '{}'" , responseTopic)
134+
135+ // Convert the received message payload into Content and resume coroutine
136+ val replyContent = content?.type?.let { Content (it, message.payloadAsBytes) } ? : Content .EMPTY_CONTENT
137+ continuation.resume(replyContent)
138+
139+ // Unsubscribe from the response topic after receiving the response
140+ client.unsubscribeWith()
141+ .topicFilter(responseTopic)
142+ .send()
143+ }
144+ }
145+
146+ // Publish the request message and await
147+ client.publish(publishMessage)
148+ .thenAccept {
149+ log.debug(" Request message published to topic '$topic '" )
150+ }.exceptionally { e ->
151+ log.warn(" Failed to publish message to topic '$topic ': ${e.message} " )
152+ continuation.resumeWithException(e)
153+ null
154+ }
155+ }
156+ } catch (e: Exception ) {
157+ throw ProtocolClientException (" Failed to execute request/reply on topic '$topic ' with broker '${client.config.serverHost} ': ${e.message} " , e)
158+ }
159+ }
160+
161+ override suspend fun readResource (form : Form ): Content = suspendCancellableCoroutine { continuation ->
162+ val contentType = form.contentType ? : ContentManager .DEFAULT
163+ val requestUri = URL (form.href)
164+
165+ // Extract the topic from the path, removing any leading "/"
166+ val filter = requestUri.path.removePrefix(" /" )
167+
168+ try {
169+
170+ Mqtt5Subscribe .builder().topicFilter(filter).build()
171+
172+ // Subscribing to the topic
173+ val subscription = client.subscribe(Mqtt5Subscribe .builder().topicFilter(filter).build())
174+ { message ->
175+ val content = Content (contentType, message.payloadAsBytes)
176+ continuation.resume(content) // Resume the coroutine with the content
177+
178+ // Unsubscribe after receiving the first message
179+ client.unsubscribeWith().topicFilter(filter)
180+ }
181+
182+ // Ensure the subscription is canceled if the coroutine is canceled
183+ continuation.invokeOnCancellation {
184+ client.unsubscribeWith().topicFilter(filter)
185+ }
186+
187+ } catch (e: Exception ) {
188+ // Handle any exception during subscription
189+ continuation.resumeWithException(e)
190+ }
191+ }
192+
193+ override suspend fun writeResource (form : Form , content : Content ) {
194+ val requestUri = URL (form.href)
195+ val topic = requestUri.path.removePrefix(" /" )
196+
197+ // Publishing message with optional retain and QoS settings
198+ val payload = content.body
199+
200+ val publishMessage = Mqtt5Publish .builder()
201+ .topic(topic)
202+ .payload(payload)
203+ .qos(MqttQos .AT_LEAST_ONCE )
204+ .build()
205+
206+ client.publish(publishMessage).await()
207+ }
208+ }
0 commit comments