@@ -11,6 +11,7 @@ import ai.ancf.lmos.wot.thing.form.Form
1111import ai.ancf.lmos.wot.thing.form.Operation
1212import ai.ancf.lmos.wot.thing.schema.ContentListener
1313import ai.ancf.lmos.wot.thing.schema.WoTExposedThing
14+ import ai.ancf.lmos.wot.tracing.withSpan
1415import ai.anfc.lmos.wot.binding.ProtocolServer
1516import ai.anfc.lmos.wot.binding.ProtocolServerException
1617import com.fasterxml.jackson.databind.DeserializationFeature
@@ -31,6 +32,10 @@ import io.ktor.websocket.*
3132import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
3233import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
3334import io.micrometer.core.instrument.binder.system.ProcessorMetrics
35+ import io.opentelemetry.api.trace.Span
36+ import io.opentelemetry.api.trace.SpanKind
37+ import io.opentelemetry.instrumentation.annotations.SpanAttribute
38+ import io.opentelemetry.instrumentation.annotations.WithSpan
3439import org.slf4j.LoggerFactory
3540import java.util.*
3641
@@ -154,11 +159,13 @@ class WebSocketProtocolServer(
154159
155160// Default server setup
156161fun defaultWebSocketServer (host : String , port : Int , servient : Servient ): EmbeddedServer <* , * > {
162+
157163 return embeddedServer(Netty , port = port, host = host) {
158164 setupRoutingWithWebSockets(servient)
159165 }
160166}
161167
168+
162169fun Application.setupRoutingWithWebSockets (servient : Servient ) {
163170 install(CallLogging )
164171 install(ContentNegotiation ) {
@@ -196,41 +203,74 @@ fun Application.setupRoutingWithWebSockets(servient: Servient) {
196203 }
197204 }
198205
206+
199207 webSocket(" /ws" ) {
200- val sessionId = UUID .randomUUID().toString()
201-
202- for (frame in incoming) {
203- if (frame is Frame .Text ) {
204- try {
205- // Deserialize the message to WoTMessage
206- val message: WoTMessage = JsonMapper .instance.readValue(frame.readText())
207- // Retrieve the thingId from the message
208- val thingId = message.thingId
209- val thing = servient.things[thingId]
210-
211- if (thing == null ) {
212- sendError(thingId, message.messageId, ErrorType .THING_NOT_FOUND )
213- return @webSocket
214- }
215-
216- // Handle the message based on its type
217- when (message) {
218- is ReadAllPropertiesMessage -> handleReadAllProperties(thing, message, thingId)
219- is ReadPropertyMessage -> handleReadProperty(thing, message, thingId)
220- is WritePropertyMessage -> handleWriteProperty(thing, message, thingId)
221- is ObservePropertyMessage -> handleObserveProperty(thing, message, thingId, sessionId)
222- is UnobservePropertyMessage -> handleUnobserveProperty(thing, message, thingId, sessionId)
223- is InvokeActionMessage -> handleInvokeAction(thing, message, thingId)
224- is SubscribeEventMessage -> handleSubscribeEvent(thing, message, thingId, sessionId)
225- is UnsubscribeEventMessage -> handleUnsubscribeEvent(thing, message, thingId, sessionId)
226- else -> sendError(thingId, message.messageId, ErrorType .UNSUPPORTED_MESSAGE_TYPE )
227- }
228- } catch (e: Exception ) {
229- val errorMessage = " Failed to read message"
230- log.warn(errorMessage, e)
231- close(CloseReason (CloseReason .Codes .CANNOT_ACCEPT , errorMessage))
208+ val sessionId = this .call.request.headers[" Sec-WebSocket-Key" ] ? : UUID .randomUUID().toString()
209+ handleWebSocketSession(sessionId, servient)
210+ }
211+ }
212+ }
213+
214+ @WithSpan(kind = SpanKind .SERVER )
215+ suspend fun DefaultWebSocketServerSession.handleWebSocketSession (
216+ @SpanAttribute(" websocket.session.id" ) sessionId : String ,
217+ servient : Servient
218+ ) {
219+ for (frame in incoming) {
220+ if (frame is Frame .Text ) {
221+ try {
222+ withSpan(" WebSocketProtocolServer.receiveMessage" , {
223+ setSpanKind(SpanKind .SERVER )
224+ }) { span ->
225+ // Deserialize the message to WoTMessage
226+ val message: WoTMessage = JsonMapper .instance.readValue(frame.readText())
227+ // Retrieve the thingId from the message
228+ val thingId = message.thingId
229+ val thing = servient.things[thingId]
230+
231+ span.setAttribute(" websocket.message.id" , message.messageId)
232+ span.setAttribute(" websocket.message.thing.id" , thingId)
233+ span.setAttribute(" websocket.session.id" , sessionId)
234+
235+ if (thing == null ) {
236+ sendError(thingId, message.messageId, ErrorType .THING_NOT_FOUND )
237+ return @withSpan
238+ }
239+
240+ // Handle the message based on its type
241+ when (message) {
242+ is ReadAllPropertiesMessage -> handleReadAllProperties(thing, message, thingId)
243+ is ReadPropertyMessage -> handleReadProperty(thing, message, thingId)
244+ is WritePropertyMessage -> handleWriteProperty(thing, message, thingId)
245+ is ObservePropertyMessage -> handleObserveProperty(
246+ thing,
247+ message,
248+ thingId,
249+ sessionId
250+ )
251+
252+ is UnobservePropertyMessage -> handleUnobserveProperty(
253+ thing,
254+ message,
255+ thingId,
256+ sessionId
257+ )
258+
259+ is InvokeActionMessage -> handleInvokeAction(thing, message, thingId)
260+ is SubscribeEventMessage -> handleSubscribeEvent(thing, message, thingId, sessionId)
261+ is UnsubscribeEventMessage -> handleUnsubscribeEvent(
262+ thing,
263+ message,
264+ thingId,
265+ sessionId
266+ )
267+
268+ else -> sendError(thingId, message.messageId, ErrorType .UNSUPPORTED_MESSAGE_TYPE )
232269 }
233270 }
271+ } catch (e: Exception ) {
272+ val errorMessage = " Failed to read message"
273+ close(CloseReason (CloseReason .Codes .CANNOT_ACCEPT , errorMessage))
234274 }
235275 }
236276 }
@@ -248,6 +288,9 @@ enum class ErrorType {
248288 UNSUPPORTED_MESSAGE_TYPE
249289}
250290
291+
292+
293+ @WithSpan(kind = SpanKind .SERVER )
251294suspend fun DefaultWebSocketServerSession.sendError (thingId : String , correlationId : String , errorType : ErrorType , message : String? = null) {
252295 val errorJson = when (errorType) {
253296 ErrorType .THING_NOT_FOUND -> createThingNotFound(thingId, correlationId)
@@ -262,6 +305,7 @@ suspend fun DefaultWebSocketServerSession.sendError(thingId: String, correlation
262305 sendSerialized(errorJson)
263306}
264307
308+
265309private suspend fun readProperty (
266310 thing : ExposedThing ,
267311 propertyName : String ,
@@ -275,7 +319,8 @@ private suspend fun readProperty(
275319 )
276320}
277321
278- suspend fun DefaultWebSocketServerSession.handleReadAllProperties (thing : ExposedThing , message : ReadAllPropertiesMessage , thingId : String ) {
322+ @WithSpan(kind = SpanKind .SERVER )
323+ suspend fun DefaultWebSocketServerSession.handleReadAllProperties (thing : ExposedThing , message : ReadAllPropertiesMessage , @SpanAttribute(" thingId" ) thingId : String ) {
279324
280325 try {
281326 val propertyMap = thing.handleReadAllProperties().mapValues { entry ->
@@ -292,7 +337,8 @@ suspend fun DefaultWebSocketServerSession.handleReadAllProperties(thing: Exposed
292337 }
293338}
294339
295- suspend fun DefaultWebSocketServerSession.handleReadProperty (thing : ExposedThing , message : ReadPropertyMessage , thingId : String ) {
340+ @WithSpan(kind = SpanKind .SERVER )
341+ suspend fun DefaultWebSocketServerSession.handleReadProperty (thing : ExposedThing , message : ReadPropertyMessage , @SpanAttribute(" thingId" ) thingId : String ) {
296342 val propertyName = message.property
297343 val property = thing.properties[propertyName]
298344
@@ -311,7 +357,8 @@ suspend fun DefaultWebSocketServerSession.handleReadProperty(thing: ExposedThing
311357 }
312358}
313359
314- suspend fun DefaultWebSocketServerSession.handleWriteProperty (thing : ExposedThing , message : WritePropertyMessage , thingId : String ) {
360+ @WithSpan(kind = SpanKind .SERVER )
361+ suspend fun DefaultWebSocketServerSession.handleWriteProperty (thing : ExposedThing , message : WritePropertyMessage , @SpanAttribute(" thingId" )thingId : String ) {
315362 val propertyName = message.property
316363 val data = message.data
317364 val property = thing.properties[propertyName]
@@ -330,8 +377,8 @@ suspend fun DefaultWebSocketServerSession.handleWriteProperty(thing: ExposedThin
330377 }
331378 }
332379}
333-
334- suspend fun DefaultWebSocketServerSession.handleObserveProperty (thing : ExposedThing , message : ObservePropertyMessage , thingId : String , sessionId : String ) {
380+ @WithSpan(kind = SpanKind . SERVER )
381+ suspend fun DefaultWebSocketServerSession.handleObserveProperty (thing : ExposedThing , message : ObservePropertyMessage , @SpanAttribute( " thingId" ) thingId : String , @SpanAttribute( " sessionId " ) sessionId : String ) {
335382 val propertyName = message.property
336383 val property = thing.properties[propertyName]
337384
@@ -360,8 +407,8 @@ suspend fun DefaultWebSocketServerSession.handleObserveProperty(thing: ExposedTh
360407 }
361408 }
362409}
363-
364- suspend fun DefaultWebSocketServerSession.handleUnobserveProperty (thing : ExposedThing , message : UnobservePropertyMessage , thingId : String , sessionId : String ) {
410+ @WithSpan(kind = SpanKind . SERVER )
411+ suspend fun DefaultWebSocketServerSession.handleUnobserveProperty (thing : ExposedThing , message : UnobservePropertyMessage , @SpanAttribute( " thingId" ) thingId : String , @SpanAttribute( " sessionId " ) sessionId : String ) {
365412 val propertyName = message.property
366413 val property = thing.properties[propertyName]
367414
@@ -377,11 +424,16 @@ suspend fun DefaultWebSocketServerSession.handleUnobserveProperty(thing: Exposed
377424 }
378425 }
379426}
427+ @WithSpan(kind = SpanKind .SERVER )
428+ suspend fun DefaultWebSocketServerSession.handleInvokeAction (thing : ExposedThing , message : InvokeActionMessage , @SpanAttribute(" thingId" ) thingId : String ) {
429+
380430
381- suspend fun DefaultWebSocketServerSession.handleInvokeAction (thing : ExposedThing , message : InvokeActionMessage , thingId : String ) {
382431 val actionName = message.action
383432 val action = thing.actions[actionName]
384433
434+ Span .current().setAttribute(" wot.thing.id" , thingId)
435+ Span .current().setAttribute(" wot.action.name" , actionName)
436+
385437 if (action == null ) {
386438 sendError(thingId, message.messageId, ErrorType .ACTION_NOT_FOUND , actionName)
387439 } else {
@@ -401,8 +453,8 @@ suspend fun DefaultWebSocketServerSession.handleInvokeAction(thing: ExposedThing
401453 }
402454 }
403455}
404-
405- suspend fun DefaultWebSocketServerSession.handleSubscribeEvent (thing : ExposedThing , message : SubscribeEventMessage , thingId : String , sessionId : String ) {
456+ @WithSpan(kind = SpanKind . SERVER )
457+ suspend fun DefaultWebSocketServerSession.handleSubscribeEvent (thing : ExposedThing , message : SubscribeEventMessage , @SpanAttribute( " thingId" ) thingId : String , @SpanAttribute( " sessionId " ) sessionId : String ) {
406458 val eventName = message.event
407459 val event = thing.events[eventName]
408460
@@ -431,8 +483,8 @@ suspend fun DefaultWebSocketServerSession.handleSubscribeEvent(thing: ExposedThi
431483 }
432484 }
433485}
434-
435- suspend fun DefaultWebSocketServerSession.handleUnsubscribeEvent (thing : ExposedThing , message : UnsubscribeEventMessage , thingId : String , sessionId : String ) {
486+ @WithSpan(kind = SpanKind . SERVER )
487+ suspend fun DefaultWebSocketServerSession.handleUnsubscribeEvent (thing : ExposedThing , message : UnsubscribeEventMessage , @SpanAttribute( " thingId" ) thingId : String , @SpanAttribute( " sessionId " ) sessionId : String ) {
436488 val eventName = message.event
437489 val event = thing.events[eventName]
438490
0 commit comments