11package ai.ancf.lmos.wot.binding.websocket
22
33
4+ import ai.ancf.lmos.wot.JsonMapper
45import ai.ancf.lmos.wot.content.Content
6+ import ai.ancf.lmos.wot.content.ContentManager
57import ai.ancf.lmos.wot.thing.form.Form
68import ai.anfc.lmos.wot.binding.ProtocolClient
79import ai.anfc.lmos.wot.binding.ProtocolClientException
10+ import ai.anfc.lmos.wot.binding.Resource
811import io.ktor.client.*
912import io.ktor.client.engine.cio.*
1013import io.ktor.client.plugins.websocket.*
11- import io.ktor.http.*
1214import io.ktor.serialization.jackson.*
1315import io.ktor.websocket.*
1416import kotlinx.coroutines.CompletableDeferred
17+ import kotlinx.coroutines.sync.Mutex
18+ import kotlinx.coroutines.sync.withLock
1519import org.slf4j.LoggerFactory
20+ import java.util.concurrent.ConcurrentHashMap
21+ import kotlin.collections.set
1622
1723class WebSocketProtocolClient (
1824 private val httpClientConfig : HttpClientConfig ? = null ,
1925 private val client : HttpClient = HttpClient (CIO ) {
20- install(WebSockets ){
21- contentConverter = JacksonWebsocketContentConverter ()
26+ install(WebSockets ) {
27+ contentConverter = JacksonWebsocketContentConverter (JsonMapper .instance )
2228 }
2329 }
2430) : ProtocolClient {
2531 companion object {
2632 private val log = LoggerFactory .getLogger(WebSocketProtocolClient ::class .java)
2733 }
2834
29- private var session: DefaultClientWebSocketSession ? = null
35+ // Cache for WebSocket sessions, keyed by href
36+ private val sessionCache = ConcurrentHashMap <String , DefaultClientWebSocketSession >()
37+ private val cacheMutex = Mutex ()
3038
3139 override suspend fun start () {
3240 log.info(" Starting WebSocketProtocolClient" )
33- client.webSocket(
34- method = HttpMethod .Get ,
35- host = httpClientConfig?.address ? : " localhost" ,
36- port = httpClientConfig?.port ? : 80 ,
37- path = " /ws"
38- ) {
39- session = this
40- }
41+ // No global connection to start. Connections are established per href.
4142 }
4243
4344 override suspend fun stop () {
4445 log.info(" Stopping WebSocketProtocolClient" )
45- session?.close()
46- session = null
46+ // Close all cached sessions
47+ sessionCache.values.forEach { session ->
48+ try {
49+ session.close()
50+ } catch (e: Exception ) {
51+ log.warn(" Error closing WebSocket session: ${e.message} " , e)
52+ }
53+ }
54+ sessionCache.clear()
55+ }
56+
57+ override suspend fun readResource (resource : Resource ): Content {
58+ return sendMessage(resource.form, ReadPropertyMessage (resource.thingId, property = resource.name))
4759 }
4860
49- override suspend fun readResource (form : Form ): Content {
50- return resolveRequestToContent(form)
61+ override suspend fun writeResource (resource : Resource , content : Content ) {
62+ sendMessage(resource.form, WritePropertyMessage (resource.thingId, property = resource.name,
63+ data = JsonMapper .instance.readTree(content.body)
64+ ))
5165 }
5266
53- private suspend fun resolveRequestToContent (form : Form , content : Content ? = null): Content {
67+ override suspend fun invokeResource (resource : Resource , content : Content ? ): Content {
68+ return sendMessage(resource.form, InvokeActionMessage (resource.thingId, action = resource.name,
69+ input = JsonMapper .instance.readTree(content?.body)
70+ ))
71+ }
72+
73+ private suspend fun sendMessage (form : Form , message : WoTMessage ): Content {
74+ val session = getOrCreateSession(form.href)
75+
5476 val response = CompletableDeferred <Content >()
5577
5678 try {
57- session?.let {
58- it.sendSerialized(ReadPropertyMessage (" test" , property = " test" ))
59-
60- val readingMessage = it.receiveDeserialized<PropertyReadingMessage >()
79+ session.sendSerialized(message)
6180
62- val responseContent = Content (
63- body = readingMessage.data.binaryValue()
64- )
65- response.complete(responseContent)
81+ when (val woTMessage = session.receiveDeserialized<WoTMessage >()) {
82+ is ErrorMessage -> throw ProtocolClientException (" Error received: ${woTMessage.title} - ${woTMessage.detail} " )
83+ is PropertyReadingMessage -> {
84+ val responseContent = ContentManager .valueToContent(woTMessage.data)
85+ response.complete(responseContent)
86+ }
87+ is ActionStatusMessage -> {
88+ val responseContent = ContentManager .valueToContent(woTMessage.output)
89+ response.complete(responseContent)
90+ }
91+ else -> throw ProtocolClientException (" Unexpected message type received: ${woTMessage::class .simpleName} " )
6692 }
6793 } catch (e: Exception ) {
68- response.completeExceptionally(ProtocolClientException (" Error during http request: ${e.message} " , e))
94+ response.completeExceptionally(ProtocolClientException (" Error during WebSocket request: ${e.message} " , e))
6995 }
96+
7097 return response.await()
7198 }
99+
100+ private suspend fun getOrCreateSession (href : String ): DefaultClientWebSocketSession {
101+ cacheMutex.withLock {
102+ // Perform both the check and the update within the same lock
103+ sessionCache[href]?.let { return it }
104+ // If no session exists, create a new one
105+ val newSession = createSession(href)
106+ sessionCache[href] = newSession
107+ return newSession
108+ }
109+ }
110+
111+ private suspend fun createSession (href : String ): DefaultClientWebSocketSession {
112+ try {
113+ return client.webSocketSession (href)
114+ } catch (e: Exception ) {
115+ throw ProtocolClientException (" Failed to create WebSocket session for $href " , e)
116+ }
117+ }
72118}
0 commit comments