Skip to content

Commit 8a09eea

Browse files
author
Robert Winkler
committed
Enhanced WebSocketProtocolClient
1 parent b07eb97 commit 8a09eea

File tree

9 files changed

+163
-77
lines changed

9 files changed

+163
-77
lines changed

kotlin-wot-binding-mqtt/src/main/kotlin/mqtt/MqttProtocolServer.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class MqttProtocolServer(
139139

140140
val href = "$baseUrl$topic"
141141
val form = Form(href= href,
142-
contentType = ContentManager.DEFAULT,
142+
contentType = ContentManager.DEFAULT_MEDIA_TYPE,
143143
op = listOf(Operation.INVOKE_ACTION))
144144
action.forms += (form)
145145
log.debug("Assigned '{}' to Action '{}'", href, name)
@@ -153,7 +153,7 @@ class MqttProtocolServer(
153153
val topic = "${thing.id}/events/$name"
154154
val href = "$baseUrl$topic"
155155
val form = Form(href= href,
156-
contentType = ContentManager.DEFAULT,
156+
contentType = ContentManager.DEFAULT_MEDIA_TYPE,
157157
op = listOf(Operation.SUBSCRIBE_EVENT, Operation.UNSUBSCRIBE_EVENT),
158158
optionalProperties= mapOf("mqtt:qos" to 0, "mqtt:retain" to false)
159159
)
@@ -294,7 +294,7 @@ class MqttProtocolServer(
294294
respondToTopic(responseContent, message.responseTopic.get())
295295
} else {
296296
// If payload is provided, consider it a write request
297-
val inputContent = Content(ContentManager.DEFAULT, message.payloadAsBytes)
297+
val inputContent = Content(ContentManager.DEFAULT_MEDIA_TYPE, message.payloadAsBytes)
298298
val responseContent = thing.handleWriteProperty(propertyName, inputContent)
299299
respondToTopic(responseContent, message.responseTopic.get())
300300
}
@@ -307,7 +307,7 @@ class MqttProtocolServer(
307307
log.warn("Action '{}' not found on thing '{}'", actionName, thing.id)
308308
return
309309
}
310-
val inputContent = Content(ContentManager.DEFAULT, message.payloadAsBytes)
310+
val inputContent = Content(ContentManager.DEFAULT_MEDIA_TYPE, message.payloadAsBytes)
311311
val actionResult = thing.handleInvokeAction(actionName, inputContent)
312312
respondToTopic(actionResult, message.responseTopic.get())
313313
}

kotlin-wot-binding-websocket/src/main/kotlin/websocket/WebSocketProtocolClient.kt

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class WebSocketProtocolClient(
140140
try {
141141
when (val woTMessage = JsonMapper.instance.readValue<WoTMessage>(messageText)) {
142142
is Acknowledgement -> handleReplyMessage(woTMessage)
143-
is ErrorMessage -> handleErrorMessage(woTMessage)
143+
is ErrorMessage -> handleReplyMessage(woTMessage)
144144
is PropertyReadingMessage -> handleReplyMessage(woTMessage)
145145
is ActionStatusMessage -> handleReplyMessage(woTMessage)
146146
is EventMessage -> handleEventMessage(woTMessage)
@@ -194,13 +194,18 @@ class WebSocketProtocolClient(
194194
if (message.correlationId != null) {
195195
val handler = requestHandlers.remove(message.correlationId)
196196
if (handler != null) {
197-
handler.completeExceptionally(
198-
ProtocolClientException("Error received: ${message.title} - ${message.detail}")
199-
)
197+
val errorMessage = buildString {
198+
append("Error received")
199+
message.correlationId?.let { append(" for correlationId: $it") }
200+
append(" - Title: ${message.title}, Detail: ${message.detail}")
201+
}
202+
handler.completeExceptionally(ProtocolClientException(errorMessage))
200203
return
201204
}
202205
}
203-
log.warn("Unhandled ErrorMessage without a correlationId: ${message.title}")
206+
log.warn(
207+
"Unhandled ErrorMessage received - Title: ${message.title}, Detail: ${message.detail}, CorrelationId: ${message.correlationId ?: "N/A"}"
208+
)
204209
}
205210
else -> {
206211
log.warn("Unhandled message type in handleReplyMessage: ${message::class.simpleName}")
@@ -228,16 +233,23 @@ class WebSocketProtocolClient(
228233
}
229234
}
230235

231-
private fun handleErrorMessage(message: ErrorMessage) {
232-
val handler = requestHandlers.remove(message.correlationId)
233-
handler?.completeExceptionally(ProtocolClientException("Error received: ${message.title} - ${message.detail}"))
234-
}
235-
236-
private suspend fun requestAndReply(form: Form, message: WoTMessage): Content {
236+
private suspend fun requestAndReply(form: Form, message: WoTMessage, timeoutMillis: Long = 5000L): Content {
237237
val session = getOrCreateSession(form.href)
238238
val deferred = CompletableDeferred<Content>()
239+
239240
requestHandlers[message.messageId] = deferred
240-
session.sendSerialized(message)
241-
return deferred.await()
241+
242+
try {
243+
session.sendSerialized(message)
244+
return withTimeout(timeoutMillis) {
245+
deferred.await()
246+
}
247+
} catch (e: TimeoutCancellationException) {
248+
requestHandlers.remove(message.messageId)
249+
throw ProtocolClientException("Request timed out for '${message.messageType}' message to thing '${message.thingId}' with messageId: '${message.messageId}'. No response received within ${timeoutMillis} [ms].", e)
250+
} catch (e: Exception) {
251+
requestHandlers.remove(message.messageId)
252+
throw e
253+
}
242254
}
243255
}

kotlin-wot-binding-websocket/src/test/kotlin/websocket/WebSocketProtocolClientTest.kt

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import ai.ancf.lmos.wot.thing.ExposedThing
99
import ai.ancf.lmos.wot.thing.exposedThing
1010
import ai.ancf.lmos.wot.thing.schema.*
1111
import io.mockk.clearAllMocks
12+
import kotlinx.coroutines.runBlocking
1213
import kotlinx.coroutines.test.runTest
1314
import java.util.concurrent.CountDownLatch
1415
import java.util.concurrent.TimeUnit
@@ -84,14 +85,18 @@ class WebSocketProtocolClientTest {
8485
}.setPropertyReadHandler(PROPERTY_NAME) {
8586
property1.toInteractionInputValue()
8687
}.setPropertyReadHandler(PROPERTY_NAME_2) {
87-
5.toInteractionInputValue()
88+
"test".toInteractionInputValue()
8889
}.setActionHandler(ACTION_NAME) { input, _->
8990
val inputString = input.value() as DataSchemaValue.StringValue
9091
"${inputString.value} 10".toInteractionInputValue()
9192
}.setPropertyWriteHandler(PROPERTY_NAME) { input, _->
9293
val inputInt = input.value() as DataSchemaValue.IntegerValue
9394
property1 = inputInt.value
9495
property1.toInteractionInputValue()
96+
}.setPropertyWriteHandler(PROPERTY_NAME_2) { input, _->
97+
val inputInt = input.value() as DataSchemaValue.StringValue
98+
property2 = inputInt.value
99+
property2.toInteractionInputValue()
95100
}.setActionHandler(ACTION_NAME_2) { input, _->
96101
"test test".toInteractionInputValue()
97102
}.setActionHandler(ACTION_NAME_3) { input, _->
@@ -123,48 +128,64 @@ class WebSocketProtocolClientTest {
123128
}
124129

125130
@Test
126-
fun `should get property`() = runTest{
131+
fun `should get property`() = runBlocking {
127132

128133
val readProperty1 = thing.readProperty(PROPERTY_NAME).value()
129134
assertEquals(10, (readProperty1 as DataSchemaValue.IntegerValue).value)
130135

131136
val readProperty2 = thing.readProperty(PROPERTY_NAME_2).value()
132-
assertEquals(5, (readProperty2 as DataSchemaValue.IntegerValue).value)
137+
assertEquals("test", (readProperty2 as DataSchemaValue.StringValue).value)
133138

134139
}
135140

136141
@Test
137-
fun `should write property`() = runTest{
142+
fun `should get all properties`() = runBlocking {
143+
val readPropertyMap = thing.readAllProperties()
144+
assertEquals(10, (readPropertyMap[PROPERTY_NAME]?.value() as DataSchemaValue.IntegerValue).value)
145+
assertEquals("test", (readPropertyMap[PROPERTY_NAME_2]?.value() as DataSchemaValue.StringValue).value)
146+
}
147+
148+
@Test
149+
fun `should write property`() = runBlocking {
138150
thing.writeProperty(PROPERTY_NAME, 20.toInteractionInputValue())
139151

140152
assertEquals(20, property1)
141153
}
142154

155+
@Test
156+
fun `should write multiple properties`() = runBlocking {
157+
158+
thing.writeMultipleProperties(mapOf(PROPERTY_NAME to 30.toDataSchemeValue(), PROPERTY_NAME_2 to "new".toDataSchemeValue()))
159+
160+
assertEquals(30, property1)
161+
assertEquals("new", property2)
162+
}
163+
143164

144165
@Test
145-
fun `should invoke action`() = runTest{
146-
val response = thing.invokeAction(ACTION_NAME, "test".toInteractionInputValue()).value()
166+
fun `should invoke action`() = runBlocking {
167+
val response = thing.invokeAction(ACTION_NAME, "test".toDataSchemeValue())
147168

148169
assertEquals("test 10", (response as DataSchemaValue.StringValue).value)
149170
}
150171

151172
@Test
152-
fun `should invoke action without input`() = runTest{
153-
val response = thing.invokeAction(ACTION_NAME_2).value()
173+
fun `should invoke action without input`() = runBlocking {
174+
val response = thing.invokeAction(ACTION_NAME_2)
154175

155176
assertEquals("test test", (response as DataSchemaValue.StringValue).value)
156177
}
157178

158179
@Test
159-
fun `should invoke action without output`() = runTest{
160-
val response = thing.invokeAction(ACTION_NAME_3, "test".toInteractionInputValue()).value()
180+
fun `should invoke action without output`(): Unit = runBlocking {
181+
val response = thing.invokeAction(ACTION_NAME_3, "test".toDataSchemeValue())
161182
assertEquals("test", property2)
162183
assertIs<DataSchemaValue.NullValue>(response)
163184
}
164185

165186

166187
@Test
167-
fun `should subscribe to event`() = runTest{
188+
fun `should subscribe to event`() = runBlocking {
168189

169190
val lock = CountDownLatch(2);
170191

@@ -184,7 +205,7 @@ class WebSocketProtocolClientTest {
184205
}
185206

186207
@Test
187-
fun `should observe property`() = runTest{
208+
fun `should observe property`() = runBlocking {
188209

189210
val lock = CountDownLatch(2);
190211

kotlin-wot/src/main/kotlin/content/Content.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package ai.ancf.lmos.wot.content
33
/**
44
* Represents any serialized content. Enables the transfer of arbitrary data structures.
55
*/
6-
data class Content(val type: String = ContentManager.DEFAULT, val body: ByteArray = ByteArray(0)) {
6+
data class Content(val type: String = ContentManager.DEFAULT_MEDIA_TYPE, val body: ByteArray = ByteArray(0)) {
77

88
companion object {
9-
val EMPTY_CONTENT = Content(ContentManager.DEFAULT, ByteArray(0))
9+
val EMPTY_CONTENT = Content(ContentManager.DEFAULT_MEDIA_TYPE, ByteArray(0))
1010
}
1111

1212
override fun equals(other: Any?): Boolean {
@@ -30,5 +30,5 @@ data class Content(val type: String = ContentManager.DEFAULT, val body: ByteArra
3030

3131
fun String.toJsonContent(): Content{
3232
val jsonContent = """"$this""""
33-
return Content(ContentManager.DEFAULT, jsonContent.toByteArray())
33+
return Content(ContentManager.DEFAULT_MEDIA_TYPE, jsonContent.toByteArray())
3434
}

kotlin-wot/src/main/kotlin/content/ContentManager.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.io.*
1010

1111

1212
object ContentManager {
13-
const val DEFAULT = "application/json"
13+
const val DEFAULT_MEDIA_TYPE = "application/json"
1414
private val log = LoggerFactory.getLogger(ContentManager::class.java)
1515
private val CODECS: MutableMap<String, ContentCodec> = mutableMapOf()
1616
private val OFFERED: MutableSet<String> = HashSet()
@@ -191,8 +191,8 @@ object ContentManager {
191191
}
192192

193193
fun valueToContent(value: DataSchemaValue, contentType: String?): Content {
194-
val mediaType = getMediaType(contentType ?: DEFAULT)
195-
val parameters = getMediaTypeParameters(contentType ?: DEFAULT)
194+
val mediaType = getMediaType(contentType ?: DEFAULT_MEDIA_TYPE)
195+
val parameters = getMediaTypeParameters(contentType ?: DEFAULT_MEDIA_TYPE)
196196

197197
// Select codec based on mediaType and log the action
198198
val codec = CODECS[mediaType]
@@ -203,7 +203,7 @@ object ContentManager {
203203
log.warn("Content passthrough due to unsupported serialization format '$mediaType'")
204204
fallbackValueToBytes(value)
205205
}
206-
return Content(contentType ?: DEFAULT, bytes)
206+
return Content(contentType ?: DEFAULT_MEDIA_TYPE, bytes)
207207
}
208208

209209
/**
@@ -219,8 +219,8 @@ object ContentManager {
219219
*/
220220
fun valueToContent(value: Any?, contentType: String?): Content {
221221
// Use a default value for contentType if null
222-
val mediaType = getMediaType(contentType ?: DEFAULT)
223-
val parameters = getMediaTypeParameters(contentType ?: DEFAULT)
222+
val mediaType = getMediaType(contentType ?: DEFAULT_MEDIA_TYPE)
223+
val parameters = getMediaTypeParameters(contentType ?: DEFAULT_MEDIA_TYPE)
224224

225225
// Select codec based on mediaType and log the action
226226
val codec = CODECS[mediaType]
@@ -232,7 +232,7 @@ object ContentManager {
232232
fallbackValueToBytes(value)
233233
}
234234

235-
return Content(contentType ?: DEFAULT, bytes)
235+
return Content(contentType ?: DEFAULT_MEDIA_TYPE, bytes)
236236
}
237237

238238
private fun fallbackValueToBytes(value: Any?): ByteArray {

kotlin-wot/src/main/kotlin/thing/ConsumedThing.kt

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,13 @@ data class ConsumedThing(
138138
}
139139
}
140140

141-
override suspend fun writeProperty(propertyName: String, value: InteractionInput, options: InteractionOptions?) {
141+
override suspend fun writeProperty(propertyName: String, input: InteractionInput, options: InteractionOptions?) {
142+
val interactionValue = input as? InteractionInput.Value
143+
?: throw UnsupportedOperationException("Streaming input is not supported for property: $propertyName")
144+
writeProperty(propertyName, interactionValue.value)
145+
}
146+
147+
override suspend fun writeProperty(propertyName: String, value: DataSchemaValue, options: InteractionOptions?) {
142148
val property = this.properties[propertyName]
143149
requireNotNull(property) { "ConsumedThing '${this.title}' does not have property $propertyName" }
144150

@@ -153,9 +159,7 @@ data class ConsumedThing(
153159
// Handle URI variables if present
154160
val finalForm = handleUriVariables(this, property, form, options)
155161

156-
val interactionValue = value as InteractionInput.Value
157-
158-
val content = ContentManager.valueToContent(interactionValue.value, finalForm.contentType)
162+
val content = ContentManager.valueToContent(value, finalForm.contentType)
159163

160164
client.writeResource(Resource(id, propertyName, finalForm), content)
161165

@@ -182,16 +186,16 @@ data class ConsumedThing(
182186
}
183187
}
184188

185-
override suspend fun invokeAction(
189+
private suspend fun invokeActionInternal(
186190
actionName: String,
187-
params: InteractionInput,
191+
value: DataSchemaValue,
188192
options: InteractionOptions?
189193
): WoTInteractionOutput {
190194
val action = this.actions[actionName]
191195
requireNotNull(action) { "ConsumedThing '${this.title}' does not have action $actionName" }
192196

193-
return try {
194-
// Retrieve the client and form for the property
197+
try {
198+
// Retrieve the client and form for the action
195199
val (client, form) = getClientFor(action.forms, Operation.INVOKE_ACTION)
196200

197201
// Log the action
@@ -200,19 +204,44 @@ data class ConsumedThing(
200204
// Handle URI variables if present
201205
val finalForm = handleUriVariables(this, action, form, options)
202206

203-
val interactionValue = params as InteractionInput.Value
204-
205-
val content = ContentManager.valueToContent(interactionValue.value, finalForm.contentType)
207+
val content = ContentManager.valueToContent(value, finalForm.contentType)
206208

209+
// Invoke the action
207210
val response = client.invokeResource(Resource(id, actionName, finalForm), content)
208211

209-
InteractionOutput(response, action.output)
210-
212+
return InteractionOutput(response, action.output)
211213
} catch (e: Exception) {
212214
throw ConsumedThingException("Error while invoking action for ${action.title}. ${e.message}", e)
213215
}
214216
}
215217

218+
override suspend fun invokeAction(
219+
actionName: String,
220+
input: InteractionInput,
221+
options: InteractionOptions?
222+
): WoTInteractionOutput {
223+
val interactionValue = input as? InteractionInput.Value
224+
?: throw UnsupportedOperationException("Streaming input is not supported for action: $actionName")
225+
return invokeActionInternal(actionName, interactionValue.value, options)
226+
}
227+
228+
override suspend fun invokeAction(
229+
actionName: String,
230+
input: DataSchemaValue,
231+
options: InteractionOptions?
232+
): DataSchemaValue {
233+
val output = invokeActionInternal(actionName, input, options)
234+
return output.value()
235+
}
236+
237+
override suspend fun invokeAction(
238+
actionName: String,
239+
options: InteractionOptions?
240+
): DataSchemaValue {
241+
val output = invokeActionInternal(actionName, DataSchemaValue.NullValue, options)
242+
return output.value()
243+
}
244+
216245
override suspend fun observeProperty(
217246
propertyName: String,
218247
listener: InteractionListener,

0 commit comments

Comments
 (0)