Skip to content

Commit 5825da6

Browse files
author
Robert Winkler
committed
Fixes
1 parent 55158f8 commit 5825da6

File tree

9 files changed

+110
-26
lines changed

9 files changed

+110
-26
lines changed

kotlin-wot-binding-websocket/src/main/kotlin/websocket/WebSocketProtocolClient.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class WebSocketProtocolClient(
9191
requestAndReply(resource.form, message)
9292
} finally {
9393
resourceChannels[resource.name]?.close()
94+
resourceChannels.remove(resource.name)
9495
}
9596
}
9697

@@ -105,7 +106,7 @@ class WebSocketProtocolClient(
105106
resourceChannels[resource.name] = channel
106107

107108
return channel.consumeAsFlow().onCompletion {
108-
resourceChannels.remove(resource.name)
109+
unlinkResource(resource, resourceType)
109110
}
110111
}
111112

kotlin-wot-integration-tests/build.gradle.kts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ plugins {
88
id("io.spring.dependency-management") version "1.1.7"
99
}
1010

11-
tasks.named<Test>("test") {
12-
enabled = false
13-
}
11+
//tasks.named<Test>("test") {
12+
// enabled = false
13+
//}
1414

1515
dependencies {
1616
// Replace the following with the starter dependencies of specific modules you wish to use
@@ -47,6 +47,7 @@ dependencies {
4747
testImplementation("org.springframework.boot:spring-boot-starter-test")
4848
testImplementation("com.hivemq:hivemq-mqtt-client:1.3.3")
4949
implementation("org.testcontainers:testcontainers:1.20.3")
50+
testImplementation("app.cash.turbine:turbine:1.2.0")
5051

5152
}
5253

kotlin-wot-integration-tests/src/main/kotlin/integration/ThingToFunctionsMapper.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,13 @@ object ThingToFunctionsMapper {
3737

3838
suspend fun requestThingDescription(wot: Wot, functions: Functions, group: String, url: String, securityScheme: SecurityScheme = NoSecurityScheme()): List<LLMFunction> {
3939
val thingDescription = wot.requestThingDescription(url, securityScheme)
40-
val consumedThings = consumeThings(wot, setOf(thingDescription))
41-
val retrieveAllFunction = createRetrieveAllFunction(functions, group, setOf(thingDescription))
42-
mapAllThingFunctions(functions, group, consumedThings)
43-
val allFunctions = retrieveAllFunction+ functionCache.values.flatten()
40+
val consumedThing = wot.consume(thingDescription)
41+
mapThingDescriptionToFunctions2(functions, group, thingDescription)
42+
val allFunctions = functionCache.values.flatten()
4443
return allFunctions
4544
}
4645

47-
fun createRetrieveAllFunction(functions: Functions, group: String, thingDescriptions: Set<WoTThingDescription>): List<LLMFunction> {
46+
fun createRetrieveAllFunction(functions: Functions, group: String, thingDescriptions: Set<WoTThingDescription>): List<LLMFunction> {
4847
return functions("retrieveAllThings", "Returns the metadata information of all available devices.", group) {
4948
summarizeThingDescriptions(thingDescriptions)
5049
}
@@ -105,13 +104,13 @@ object ThingToFunctionsMapper {
105104
val params = defaultParams + actionParams
106105
functionCache.getOrPut(actionName) {
107106
functions(actionName, action.description ?: "No Description available", group, params) { (thingId, input) ->
108-
invokeAction(thingDescription.id, actionName, input, action.input)
107+
invokeAction(thingId, actionName, input, action.input)
109108
}
110109
}
111110
}
112111
}
113112

114-
private suspend fun invokeAction(thingId: String, actionName: String, input: String?, inputSchema: DataSchema<*>?): String {
113+
private suspend fun invokeAction(thingId: String?, actionName: String, input: String?, inputSchema: DataSchema<*>?): String {
115114
return try {
116115
val jsonInput : JsonNode = inputSchema?.let { mapSchemaToJsonNode(it, input!!) } ?: TextNode(input)
117116
thingDescriptionsMap[thingId]?.invokeAction(actionName, jsonInput)?.asText()

kotlin-wot-integration-tests/src/main/resources/application.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ arc:
2727
clients:
2828
- id: GPT-4o
2929
model-name: GPT35T-1106
30-
api-key: 0c798e9b59c64f07ab35d0b1aee65d19
30+
api-key: dummy
3131
client: langchain4j-azure
3232
url: https://gpt4-uk.openai.azure.com
3333
tools:

kotlin-wot-integration-tests/src/test/kotlin/integration/QuickTest.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@ package ai.ancf.lmos.wot.integration
44
import ai.ancf.lmos.sdk.agents.WotConversationalAgent
55
import ai.ancf.lmos.sdk.agents.lastMessage
66
import ai.ancf.lmos.sdk.agents.toAgentRequest
7+
import ai.ancf.lmos.sdk.model.AgentEvent
8+
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.Dispatchers
10+
import kotlinx.coroutines.flow.launchIn
11+
import kotlinx.coroutines.flow.onEach
712
import kotlinx.coroutines.runBlocking
8-
import org.eclipse.lmos.arc.agents.AgentEvent
913
import java.util.concurrent.CountDownLatch
1014
import kotlin.test.Test
1115

@@ -41,11 +45,13 @@ class QuickTest {
4145
println("Event: $it")
4246
latch.countDown()
4347
}
44-
*/
45-
agent.consumeEvent("agentEvent", AgentEvent::class).collect {
48+
*/
49+
50+
agent.consumeEvent("agentEvent", AgentEvent::class).onEach {
4651
println("Event: $it")
4752
latch.countDown()
48-
}
53+
}.launchIn(CoroutineScope(Dispatchers.IO))
54+
4955

5056
//val command = "What is the state of my lamp?"
5157
val command = "Scrape the page https://eclipse.dev/lmos/\""

kotlin-wot/src/main/kotlin/thing/ConsumedThing.kt

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory
2929
import java.io.File
3030
import java.io.IOException
3131
import java.net.MalformedURLException
32-
import java.net.URL
32+
import java.net.URI
3333
import java.util.concurrent.ConcurrentHashMap
3434
import java.util.concurrent.ConcurrentMap
3535

@@ -337,16 +337,27 @@ data class ConsumedThing(
337337

338338
if (subscribedEvents.containsKey(eventName)) {
339339
return flow {
340-
throw IllegalStateException("ConsumedThing '$title' already has a function subscribed to $eventName. You can only subscribe once.")
340+
throw ConsumedThingException("ConsumedThing '$title' already has a function subscribed to $eventName. You can only subscribe once.")
341341
}
342342
}
343343

344344
log.debug("ConsumedThing '$title' subscribing to ${form.href}")
345345

346346
val formWithoutURITemplates = handleUriVariables(this, eventAffordance, form, options)
347347

348-
return client.subscribeResource(Resource(id, eventName, formWithoutURITemplates), ResourceType.EVENT)
348+
val subscription = InternalEventSubscription(this, eventName, client, form)
349+
subscribedEvents[eventName] = subscription
350+
351+
val flow = client.subscribeResource(Resource(id, eventName, formWithoutURITemplates), ResourceType.EVENT)
349352
.map { handleInteractionOutput(it, form, eventAffordance.data) }
353+
.onCompletion {
354+
error -> if (error != null) {
355+
log.warn("Error while processing observe property for ${eventAffordance.title}: ${error.message}", error)
356+
}
357+
subscription.stop(options)
358+
}
359+
360+
return flow
350361
}
351362

352363
@WithSpan(kind = SpanKind.CLIENT)
@@ -363,7 +374,7 @@ data class ConsumedThing(
363374
val (client, form) = getClientFor(eventAffordance.forms, Operation.SUBSCRIBE_EVENT, options)
364375

365376
if (subscribedEvents.containsKey(eventName)) {
366-
throw IllegalStateException("ConsumedThing '$title' already has a function subscribed to $eventName. You can only subscribe once.")
377+
throw ConsumedThingException("ConsumedThing '$title' already has a function subscribed to $eventName. You can only subscribe once.")
367378
}
368379

369380
log.debug("ConsumedThing '$title' subscribing to ${form.href}")
@@ -760,8 +771,8 @@ fun findFormIndexWithScoring(
760771

761772
// Compare the origins of the URLs
762773
try {
763-
val formUrl = URL(form.href)
764-
val refFormUrl = URL(refForm.href)
774+
val formUrl = URI(form.href).toURL()
775+
val refFormUrl = URI(refForm.href).toURL()
765776
if (formUrl.protocol == refFormUrl.protocol && formUrl.host == refFormUrl.host) {
766777
score += 1
767778
}

kotlin-wot/src/test/kotlin/thing/ConsumedThingTest.kt

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import ai.ancf.lmos.wot.Servient
55
import ai.ancf.lmos.wot.content.Content
66
import ai.ancf.lmos.wot.content.ContentManager
77
import ai.ancf.lmos.wot.content.JsonCodec
8+
import ai.ancf.lmos.wot.content.toJsonContent
89
import ai.ancf.lmos.wot.security.BasicSecurityScheme
910
import ai.ancf.lmos.wot.thing.action.ThingAction
1011
import ai.ancf.lmos.wot.thing.event.ThingEvent
@@ -15,14 +16,14 @@ import ai.anfc.lmos.wot.binding.ProtocolClient
1516
import ai.anfc.lmos.wot.binding.ProtocolClientFactory
1617
import ai.anfc.lmos.wot.binding.Resource
1718
import ai.anfc.lmos.wot.binding.ResourceType
19+
import app.cash.turbine.test
1820
import com.fasterxml.jackson.databind.node.TextNode
21+
import com.fasterxml.jackson.module.kotlin.treeToValue
1922
import io.mockk.*
2023
import kotlinx.coroutines.flow.emptyFlow
24+
import kotlinx.coroutines.flow.flow
2125
import kotlinx.coroutines.runBlocking
22-
import kotlin.test.BeforeTest
23-
import kotlin.test.Test
24-
import kotlin.test.assertEquals
25-
import kotlin.test.assertNotNull
26+
import kotlin.test.*
2627

2728
class ConsumedThingTest {
2829

@@ -158,6 +159,59 @@ class ConsumedThingTest {
158159
assertEquals(true, subscription.active)
159160
}
160161

162+
@Test
163+
fun `subscribe twice to the same event should fail`(): Unit = runBlocking {
164+
val listener = mockk<InteractionListener>(relaxed = true)
165+
coEvery { protocolClient.subscribeResource(any<Resource>(), any<ResourceType>()) } returns emptyFlow()
166+
167+
consumedThing.subscribeEvent("testEvent", listener)
168+
assertFailsWith<ConsumedThingException> { consumedThing.subscribeEvent("testEvent", listener) }
169+
}
170+
171+
@Test
172+
fun `test consumeEvent`(): Unit = runBlocking {
173+
coEvery { protocolClient.subscribeResource(any<Resource>(), any<ResourceType>()) } returns flow {
174+
emit("testValue".toJsonContent())
175+
}
176+
177+
consumedThing.consumeEvent("testEvent").test {
178+
val item = awaitItem().value()
179+
val value : String = JsonMapper.instance.treeToValue(item)
180+
assertEquals("testValue", value)
181+
awaitComplete()
182+
}
183+
184+
coVerify { protocolClient.unlinkResource(any<Resource>(), any<ResourceType>()) }
185+
}
186+
187+
@Test
188+
fun `test consumeEvent with error`(): Unit = runBlocking {
189+
coEvery { protocolClient.subscribeResource(any<Resource>(), any<ResourceType>()) } returns flow {
190+
throw Exception("test error")
191+
}
192+
193+
consumedThing.consumeEvent("testEvent").test {
194+
val error = awaitError() // Waits for the expected exception
195+
assertTrue(error is Exception)
196+
}
197+
198+
coVerify { protocolClient.unlinkResource(any<Resource>(), any<ResourceType>()) }
199+
}
200+
201+
@Test
202+
fun `consume twice to the same event should fail`(): Unit = runBlocking {
203+
coEvery { protocolClient.subscribeResource(any<Resource>(), any<ResourceType>()) } returns emptyFlow()
204+
205+
consumedThing.consumeEvent("testEvent")
206+
consumedThing.consumeEvent("testEvent").test {
207+
val error = awaitError() // Waits for the expected exception
208+
assertTrue(error is ConsumedThingException)
209+
}
210+
211+
coVerify(exactly = 0) { protocolClient.unlinkResource(any<Resource>(), any<ResourceType>()) }
212+
}
213+
214+
161215
@Test
162216
fun testEquals() {
163217
val thingDescription = ThingDescription(

lmos-kotlin-sdk-base/src/main/kotlin/sdk/agents/ConversationalAgent.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ fun String.toAgentRequest(): AgentRequest {
3030
)
3131
}
3232

33+
fun String.toAgentResult(): AgentResult {
34+
return AgentResult(
35+
messages = listOf(
36+
Message(
37+
role = "user",
38+
content = this
39+
)
40+
)
41+
)
42+
}
43+
3344
fun AgentResult.lastMessage(): String {
3445
return this.messages.last().content
3546
}

lmos-kotlin-sdk-client/src/main/kotlin/sdk/WotConversationalAgent.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class WotConversationalAgent private constructor(private val thing : ConsumedThi
6363
override suspend fun <T : Any> consumeEvent(eventName: String, clazz: KClass<T>): Flow<T> {
6464
return thing.consumeEvent(eventName).map { event ->
6565
try {
66+
log.info("Event:" + event.value())
6667
JsonMapper.instance.treeToValue(event.value(), clazz.java)
6768
} catch (e: Exception) {
6869
log.error("Failed to parse event", e)

0 commit comments

Comments
 (0)