11package ai.ancf.lmos.wot.binding.mqtt
22
33import ai.ancf.lmos.wot.content.Content
4- import ai.ancf.lmos.wot.content.ContentManager
54import ai.ancf.lmos.wot.thing.form.Form
65import ai.anfc.lmos.wot.binding.ProtocolClient
76import ai.anfc.lmos.wot.binding.ProtocolClientException
8- import com.hivemq.client.mqtt.MqttGlobalPublishFilter
97import com.hivemq.client.mqtt.datatypes.MqttQos
108import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
119import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
12- import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe
1310import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe
11+ import kotlinx.coroutines.channels.Channel
1412import kotlinx.coroutines.flow.Flow
15- import kotlinx.coroutines.flow.channelFlow
16- import kotlinx.coroutines.flow.onCompletion
13+ import kotlinx.coroutines.flow.consumeAsFlow
1714import kotlinx.coroutines.future.await
1815import kotlinx.coroutines.suspendCancellableCoroutine
1916import org.slf4j.LoggerFactory
2017import java.net.URI
2118import java.net.URISyntaxException
22- import java.net.URL
2319import java.util.*
20+ import java.util.concurrent.ConcurrentHashMap
2421import kotlin.coroutines.resume
2522import kotlin.coroutines.resumeWithException
2623
@@ -33,23 +30,35 @@ class MqttProtocolClient(
3330 private val log = LoggerFactory .getLogger(MqttProtocolClient ::class .java)
3431 private val scheme = " mqtt" + if (secure) " s" else " "
3532
33+ private val topicChannels = ConcurrentHashMap <String , Channel <Content >>()
34+
3635 override suspend fun invokeResource (form : Form , content : Content ? ): Content {
3736 val topic = try {
3837 URI (form.href).path.substring(1 )
3938 } catch (e: URISyntaxException ) {
4039 throw ProtocolClientException (" Unable to extract topic from href '${form.href} '" , e)
4140 }
42- return requestReply(content, topic)
41+ return requestReply(form, content, topic)
4342 }
4443
4544 override suspend fun subscribeResource (form : Form ): Flow <Content > {
4645 val topic = try {
4746 URI (form.href).path.substring(1 )
48- } catch (e: URISyntaxException ) {
47+ }
48+ catch (e: URISyntaxException ) {
4949 throw ProtocolClientException (" Unable to subscribe resource: ${e.message} " )
5050 }
51+ return subscribeToTopic(form, topic)
52+ }
5153
52- return topicObserver(form, topic)
54+ override suspend fun unlinkResource (form : Form ) {
55+ val topic = try {
56+ URI (form.href).path.substring(1 )
57+ }
58+ catch (e: URISyntaxException ) {
59+ throw ProtocolClientException (" Unable to unlink resource: ${e.message} " )
60+ }
61+ return unsubscribeFromTopic(topic)
5362 }
5463
5564 override suspend fun start () {
@@ -60,31 +69,65 @@ class MqttProtocolClient(
6069 client.disconnect().await()
6170 }
6271
72+ // Function to unsubscribe from a topic and close the associated channel
73+ private suspend fun unsubscribeFromTopic (topic : String ) {
74+ // Check if the topic has an associated channel in the ConcurrentHashMap
75+ val channel = topicChannels.remove(topic)
76+
77+ if (channel != null ) {
78+ try {
79+ // Unsubscribe from the topic
80+ client.unsubscribeWith()
81+ .topicFilter(topic)
82+ .send()
83+ .await()
84+ log.debug(" Unsubscribed from topic '{}'" , topic)
85+ } catch (e: Exception ) {
86+ log.warn(" Error unsubscribing from topic '$topic ': ${e.message} " )
87+ }
88+
89+ // Close the channel
90+ channel.close()
91+ log.debug(" Closed channel for topic '{}'" , topic)
92+ } else {
93+ log.warn(" No active channel found for topic '{}'" , topic)
94+ }
95+ }
96+
6397 // Function to observe a topic using HiveMQ Mqtt5AsyncClient
64- private fun topicObserver (form : Form , topic : String ): Flow <Content > = channelFlow {
98+ private suspend fun subscribeToTopic (form : Form , topic : String ): Flow <Content > {
6599 log.debug(" MqttClient connected to broker at '{}:{}' subscribing to topic '{}'" , client.config.serverHost, client.config.serverPort, topic)
66100
101+ // Create a channel for the topic
102+ val channel = Channel <Content >()
103+ // Put the channel in the ConcurrentHashMap
104+ topicChannels[topic] = channel
105+
67106 try {
68107 client.subscribeWith()
69108 .topicFilter(topic)
70- .qos(MqttQos .AT_LEAST_ONCE ) // QoS level 1
109+ .callback() { message ->
110+ log.debug(" Received message from topic '{}'" , topic)
111+ val content = Content (form.contentType, message.payloadAsBytes) // Convert payload to Content
112+ val channelResult = channel.trySend(content)
113+ log.debug(" Send message to channel flow" )
114+ }
71115 .send()
72- .await() // Suspending function for subscription completion
116+ .await()
73117
74- client.publishes(MqttGlobalPublishFilter .SUBSCRIBED ) { message ->
75- log.debug(" Received message from topic '{}'" , topic)
76-
77- val content = Content (form.contentType, message.payloadAsBytes) // Convert payload to Content
78- trySend(content)
79- }
118+ log.debug(" Subscribed to topic '{}'" , topic)
80119 } catch (e: Exception ) {
81120 log.warn(" Error subscribing to topic '$topic ': ${e.message} " )
82- close(e) // Close flow on error
121+ channel.close(e)
122+ // close(e) // Close flow on error
83123 }
84- }.onCompletion {
85- val client = client
86- log.debug(" No more observers for broker '{}' and topic '{}', unsubscribing." , client.config.serverHost, topic)
124+ return channel.consumeAsFlow()
125+ }
87126
127+ /*
128+ .onCompletion {
129+ val client = client
130+ log.debug("No flow collectors anymore, unsubscribing.", client.config.serverHost, topic)
88131 try {
89132 client.unsubscribeWith()
90133 .topicFilter(topic)
@@ -94,9 +137,11 @@ class MqttProtocolClient(
94137 log.warn("Error unsubscribing from topic '$topic': ${e.message}")
95138 }
96139 }
140+ */
141+
97142
98143 // Function to publish content to a topic and return a response
99- private suspend fun requestReply (content : Content ? , topic : String ): Content {
144+ private suspend fun requestReply (form : Form , content : Content ? , topic : String ): Content {
100145 // Generate a unique response topic for this request
101146 val responseTopic = " ${topic} /reply/${UUID .randomUUID()} "
102147
@@ -109,25 +154,26 @@ class MqttProtocolClient(
109154 )
110155
111156 val payload = content?.body
157+ val contentType = form.contentType
112158
113159 // Prepare and send the publish message with a response topic
114160 val publishMessage = Mqtt5Publish .builder()
115161 .topic(topic)
116162 .payload(payload)
163+ .contentType(contentType)
117164 .qos(MqttQos .AT_LEAST_ONCE )
118165 .responseTopic(responseTopic) // Set the response topic
119166 .build()
120167
121168 // Publish the message and await reply on the response topic
122169 return suspendCancellableCoroutine { continuation ->
123-
124170 client.subscribeWith()
125171 .topicFilter(responseTopic)
126172 .qos(MqttQos .AT_LEAST_ONCE ) // QoS level 1 for reliability
127173 .callback {
128174 response ->
129175 log.debug(" Response message consumed from topic '$responseTopic '" )
130- val replyContent = content?.type?. let { Content (it , response.payloadAsBytes) } ? : Content . EMPTY_CONTENT
176+ val replyContent = Content (contentType , response.payloadAsBytes)
131177 continuation.resume(replyContent)
132178
133179 // Unsubscribe from the response topic after receiving the response
@@ -163,51 +209,35 @@ class MqttProtocolClient(
163209 }
164210 }
165211
166- override suspend fun readResource ( form : Form ): Content = suspendCancellableCoroutine { continuation ->
167- val contentType = form.contentType ? : ContentManager . DEFAULT
168- val requestUri = URL ( form.href)
212+ // Function to read the resource using the request-reply pattern
213+ override suspend fun readResource ( form : Form ): Content {
214+ // Extract the content type from the form or use a default if not provided
169215
170- // Extract the topic from the path, removing any leading "/"
171- val filter = requestUri.path.removePrefix(" /" )
216+ // Extract the topic from the URI
217+ val requestUri = URI (form.href)
218+ val topic = requestUri.path.removePrefix(" /" ) // Removing leading "/"
172219
173220 try {
174-
175- Mqtt5Subscribe .builder().topicFilter(filter).build()
176-
177- // Subscribing to the topic
178- val subscription = client.subscribe(Mqtt5Subscribe .builder().topicFilter(filter).build())
179- { message ->
180- val content = Content (contentType, message.payloadAsBytes)
181- continuation.resume(content) // Resume the coroutine with the content
182-
183- // Unsubscribe after receiving the first message
184- client.unsubscribeWith().topicFilter(filter)
185- }
186-
187- // Ensure the subscription is canceled if the coroutine is canceled
188- continuation.invokeOnCancellation {
189- client.unsubscribeWith().topicFilter(filter)
190- }
191-
221+ // Call requestReply to send a request and get a reply
222+ return requestReply(form, null , topic) // Passing 'null' for content if it's just a read request
192223 } catch (e: Exception ) {
193- // Handle any exception during subscription
194- continuation.resumeWithException( e)
224+ // Handle any exception during request-reply
225+ throw ProtocolClientException ( " Failed to read resource from topic ' $topic ' " , e)
195226 }
196227 }
197228
229+ // Function to write the resource using the request-reply pattern
198230 override suspend fun writeResource (form : Form , content : Content ) {
199- val requestUri = URL (form.href)
200- val topic = requestUri.path.removePrefix(" /" )
201-
202- // Publishing message with optional retain and QoS settings
203- val payload = content.body
231+ // Extract the topic from the URI
232+ val requestUri = URI (form.href)
233+ val topic = requestUri.path.removePrefix(" /" ) // Removing leading "/"
204234
205- val publishMessage = Mqtt5Publish .builder()
206- .topic(topic)
207- .payload(payload)
208- .qos( MqttQos . AT_LEAST_ONCE )
209- .build()
210-
211- client.publish(publishMessage).await()
235+ try {
236+ // Call requestReply to send the content and get the reply
237+ requestReply(form, content, topic) // Send content to the topic and expect a reply
238+ } catch (e : Exception ) {
239+ // Handle any exception during request-reply
240+ throw ProtocolClientException ( " Failed to write resource to topic ' $topic ' " , e)
241+ }
212242 }
213243}
0 commit comments