Skip to content

Commit 5f0333e

Browse files
committed
Some more enhancements
1 parent 8a09eea commit 5f0333e

File tree

16 files changed

+429
-299
lines changed

16 files changed

+429
-299
lines changed

build.gradle.kts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ subprojects {
1818

1919
tasks.test {
2020
useJUnitPlatform()
21+
22+
// Configure proxy only if the required properties are set
23+
val proxyHost = System.getProperty("http.proxyHost")
24+
val proxyPort = System.getProperty("http.proxyPort")
25+
26+
if (!proxyHost.isNullOrEmpty() && !proxyPort.isNullOrEmpty()) {
27+
systemProperty("http.proxyHost", proxyHost)
28+
systemProperty("http.proxyPort", proxyPort)
29+
}
2130
}
2231

2332
kotlin {

kotlin-wot-binding-http/src/main/kotlin/http/HttpProtocolClient.kt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import ai.anfc.lmos.wot.binding.ProtocolClient
1010
import ai.anfc.lmos.wot.binding.ProtocolClientException
1111
import http.HttpClientConfig
1212
import io.ktor.client.*
13+
import io.ktor.client.engine.*
1314
import io.ktor.client.engine.cio.*
1415
import io.ktor.client.plugins.*
1516
import io.ktor.client.request.*
@@ -27,7 +28,7 @@ import java.util.*
2728
*/
2829
class HttpProtocolClient(
2930
private val httpClientConfig: HttpClientConfig? = null,
30-
private val client: HttpClient = HttpClient(CIO)
31+
private val client: HttpClient = createHttpClient()
3132
) : ProtocolClient {
3233

3334
private var authorization: String? = null
@@ -135,4 +136,24 @@ class HttpProtocolClient(
135136
private const val HTTP_METHOD_NAME = "htv:methodName"
136137
private val LONG_POLLING_TIMEOUT = Duration.ofMinutes(60)
137138
}
139+
}
140+
141+
fun createHttpClient(): HttpClient {
142+
val proxyHost = System.getProperty("http.proxyHost")
143+
val proxyPort = System.getProperty("http.proxyPort")?.toIntOrNull()
144+
145+
return if (!proxyHost.isNullOrBlank() && proxyPort != null) {
146+
HttpClient(CIO) {
147+
val proxyUrl = URLBuilder().apply {
148+
protocol = URLProtocol.HTTP
149+
host = proxyHost
150+
port = proxyPort
151+
}.build()
152+
engine {
153+
proxy = ProxyBuilder.http(proxyUrl)
154+
}
155+
}
156+
} else {
157+
HttpClient(CIO)
158+
}
138159
}

kotlin-wot-binding-http/src/main/kotlin/http/HttpProtocolServer.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,7 @@ open class HttpProtocolServer(
113113
href = href,
114114
contentType = contentType,
115115
op = operations,
116-
optionalProperties = hashMapOf<String, String>().apply {
117-
httpMethod?.let { put(HTTP_METHOD_NAME, it) }
118-
}
116+
optionalProperties = mutableMapOf(HTTP_METHOD_NAME to (httpMethod ?: ""))
119117
)
120118
property.forms += form
121119
log.debug("Assign '{}' to Property '{}'", href, name)

kotlin-wot-binding-mqtt/src/main/kotlin/mqtt/MqttProtocolServer.kt

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ import com.hivemq.client.mqtt.datatypes.MqttTopic
1414
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
1515
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
1616
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
17-
import kotlinx.coroutines.*
17+
import kotlinx.coroutines.CoroutineExceptionHandler
18+
import kotlinx.coroutines.CoroutineScope
19+
import kotlinx.coroutines.Dispatchers
1820
import kotlinx.coroutines.future.await
21+
import kotlinx.coroutines.launch
1922
import org.slf4j.Logger
2023
import org.slf4j.LoggerFactory
2124

@@ -69,7 +72,6 @@ class MqttProtocolServer(
6972
override suspend fun destroy(thing: ExposedThing) {
7073
log.info("MqttServer stop exposing '{}' as unique '/{}/*'", thing.id, thing.id)
7174

72-
unexposeTD(thing)
7375
things.remove(thing.id)
7476
}
7577

@@ -155,7 +157,7 @@ class MqttProtocolServer(
155157
val form = Form(href= href,
156158
contentType = ContentManager.DEFAULT_MEDIA_TYPE,
157159
op = listOf(Operation.SUBSCRIBE_EVENT, Operation.UNSUBSCRIBE_EVENT),
158-
optionalProperties= mapOf("mqtt:qos" to 0, "mqtt:retain" to false)
160+
optionalProperties= mutableMapOf("mqtt:qos" to 0, "mqtt:retain" to false)
159161
)
160162
event.forms += (form)
161163
log.debug("Assigned '{}' to Event '{}'", href, name)
@@ -184,10 +186,14 @@ class MqttProtocolServer(
184186
.topicFilter(tdTopic)
185187
.qos(MqttQos.AT_LEAST_ONCE) // Use AT_LEAST_ONCE QoS level
186188
.callback { message ->
187-
val responseTopic = message.responseTopic.get()
188-
log.debug("Sending Thing Description of thing '{}' to topic '{}'", thing.id, responseTopic)
189-
CoroutineScope(Dispatchers.IO + exceptionHandler).launch {
190-
respondToTopic(content, responseTopic)
189+
try{
190+
val responseTopic = message.responseTopic.get()
191+
log.debug("Sending Thing Description of thing '{}' to topic '{}'", thing.id, responseTopic)
192+
CoroutineScope(Dispatchers.IO + exceptionHandler).launch {
193+
respondToTopic(content, responseTopic)
194+
}
195+
} catch (e: Exception) {
196+
log.warn("Failed to respond with Thing Description of thing '${thing.id}'", e)
191197
}
192198
}
193199
.send().await() // Sending the subscription request
@@ -204,21 +210,6 @@ class MqttProtocolServer(
204210
}
205211
}
206212

207-
private fun unexposeTD(thing: ExposedThing) = runBlocking {
208-
val topic = thing.id
209-
log.debug("Removing published Thing Description for topic '{}'", topic)
210-
211-
try {
212-
val publishMessage = Mqtt5Publish.builder()
213-
.topic(topic)
214-
.retain(true)
215-
.build()
216-
client.publish(publishMessage).await()
217-
} catch (e: Exception) {
218-
log.warn("Unable to remove thing description from topic '{}': {}", topic, e.message)
219-
}
220-
}
221-
222213
private suspend fun listenOnMqttMessages(thing: ExposedThing) {
223214
val sharedSubscriptionTopic = "\$share/server-group/${thing.id}/+/+"
224215

@@ -326,19 +317,20 @@ class MqttProtocolServer(
326317
thing.handleSubscribeEvent(eventName = eventName, listener = contentListener)
327318
}
328319

329-
private suspend fun respondToTopic(content: Content?, responseTopic: MqttTopic) {
320+
private suspend fun respondToTopic(content: Content, responseTopic: MqttTopic) {
330321
try {
331-
val payload = content?.body
322+
val payload = content.body
332323
val publishMessage = Mqtt5Publish.builder()
333324
.topic(responseTopic)
334325
.payload(payload)
335326
.qos(MqttQos.AT_LEAST_ONCE)
327+
.contentType(content.type)
336328
.build()
337329

338330
client.publish(publishMessage).await()
339-
log.info("Response sent to topic '{}'", responseTopic)
331+
log.debug("Response sent to topic '{}'", responseTopic)
340332
} catch (e: Exception) {
341-
log.warn("Failed to send response to topic '{}': {}", responseTopic, e.message)
333+
log.warn("Failed to send response to topic '${responseTopic}'", e)
342334
}
343335
}
344336
}

kotlin-wot-binding-mqtt/src/test/kotlin/integration/MqttProtocolServerTest.kt

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import app.cash.turbine.test
1515
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
1616
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
1717
import kotlinx.coroutines.future.await
18+
import kotlinx.coroutines.runBlocking
1819
import kotlinx.coroutines.test.runTest
1920
import org.junit.jupiter.api.AfterAll
2021
import org.junit.jupiter.api.BeforeAll
@@ -24,6 +25,7 @@ import java.util.concurrent.CountDownLatch
2425
import java.util.concurrent.TimeUnit
2526
import kotlin.test.Test
2627
import kotlin.test.assertEquals
28+
import kotlin.test.assertNotNull
2729
import kotlin.test.assertTrue
2830
import kotlin.time.Duration.Companion.seconds
2931

@@ -121,26 +123,40 @@ class MqttProtocolServerTest {
121123
}
122124

123125
@Test
124-
fun `expose should publish Thing Description`() = runTest {
125-
val topic = "${brokerUrl}/things/${exposedThing.id}"
126+
fun `expose should publish Thing Description and receive a response`(): Unit = runBlocking {
127+
val requestTopic = exposedThing.id
128+
val responseTopic = "${exposedThing.id}/td"
129+
val messagePayload = "TestMessage"
126130

127-
val lock = CountDownLatch(1);
131+
val lock = CountDownLatch(1)
132+
var responsePayload: ByteArray? = null
128133

129-
// Subscribe to verify Thing Description publishing
134+
// Subscribe to the response topic
130135
mqttClient.subscribeWith()
131-
.topicFilter(topic)
136+
.topicFilter(responseTopic)
132137
.callback { publish ->
133-
val payloadString = publish.payloadAsBytes.decodeToString()
134-
assertTrue(payloadString.contains("thingId"))
135-
lock.countDown();
138+
responsePayload = publish.payloadAsBytes
139+
lock.countDown()
136140
}
137-
.send().await()
138-
139-
// Wait for the events to be handled, with a timeout.
141+
.send()
142+
.await()
143+
144+
// Publish the message to the request topic
145+
mqttClient.publishWith()
146+
.topic(requestTopic)
147+
.payload(messagePayload.toByteArray())
148+
.responseTopic(responseTopic)
149+
.send()
150+
.await()
151+
152+
// Wait for the response, with a timeout.
140153
val completedInTime = lock.await(2000, TimeUnit.MILLISECONDS)
141154

142-
// Assert that the events were handled within the timeout period.
143-
assertTrue(completedInTime, "Expected events were not received within the timeout period.")
155+
// Assert that the response was received in time and matches expectations
156+
assertTrue(completedInTime, "Expected response was not received within the timeout period.")
157+
assertNotNull(responsePayload, "Response payload is null.")
158+
159+
//assertEquals("test", ThingDescription.fromBytes(responsePayload!!).id)
144160
}
145161

146162
@Test

kotlin-wot-integration-tests/src/main/kotlin/integration/AgentConfiguration.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ai.ancf.lmos.wot.integration
22

33
import ai.ancf.lmos.arc.spring.Agents
4+
import ai.ancf.lmos.arc.spring.Functions
45
import org.springframework.context.annotation.Bean
56
import org.springframework.context.annotation.Configuration
67

@@ -14,4 +15,15 @@ class AgentConfiguration {
1415
prompt { "you are a helpful weather agent." }
1516
model = { "GPT-4o" }
1617
}
18+
19+
20+
@Bean
21+
fun myFunction(function: Functions) = function(
22+
name = "g",
23+
description = "Returns real-time weather information for any location",
24+
) {
25+
"""
26+
The weather is good in Berlin. It is 20 degrees celsius.
27+
"""
28+
}
1729
}

kotlin-wot-integration-tests/src/test/kotlin/integration/AgentBuilderTest.kt

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
package integration
2-
1+
package ai.ancf.lmos.wot.integration
32

43
import ai.ancf.lmos.arc.agents.AgentProvider
54
import ai.ancf.lmos.wot.Servient
65
import ai.ancf.lmos.wot.Wot
76
import ai.ancf.lmos.wot.binding.http.HttpProtocolClientFactory
87
import ai.ancf.lmos.wot.binding.http.HttpProtocolServer
9-
import ai.ancf.lmos.wot.integration.ThingAgent
108
import ai.ancf.lmos.wot.reflection.ExposedThingBuilder.createExposedThing
11-
import ai.ancf.lmos.wot.thing.schema.WoTExposedThing
129
import ai.ancf.lmos.wot.thing.schema.toInteractionInputValue
1310
import io.mockk.mockk
1411
import kotlinx.coroutines.test.runTest
@@ -31,13 +28,9 @@ class AgentBuilderTest {
3128
val exposedThing = createExposedThing(wot, ThingAgent(mockk<AgentProvider>()), ThingAgent::class)
3229
if(exposedThing != null){
3330

34-
35-
servient.addThing(exposedThing as WoTExposedThing)
31+
servient.addThing(exposedThing)
3632
servient.expose("agent")
3733

38-
39-
println("Thing:${exposedThing.toJson()}")
40-
4134
val httpAgentTD = wot
4235
.requestThingDescription("http://localhost:8080/agent")
4336
val httpAgent = wot.consume(httpAgentTD)

kotlin-wot-integration-tests/src/test/kotlin/integration/AgentsTest.kt

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,69 @@ import ai.ancf.lmos.wot.binding.mqtt.MqttClientConfig
88
import ai.ancf.lmos.wot.binding.mqtt.MqttProtocolClientFactory
99
import ai.ancf.lmos.wot.thing.schema.toInteractionInputValue
1010
import kotlinx.coroutines.test.runTest
11+
import org.junit.jupiter.api.AfterAll
12+
import org.junit.jupiter.api.BeforeAll
13+
import org.testcontainers.containers.GenericContainer
14+
import org.testcontainers.utility.DockerImageName
15+
import kotlin.test.AfterTest
16+
import kotlin.test.BeforeTest
1117
import kotlin.test.Test
1218

1319
class AgentsTest {
1420

15-
@Test
16-
fun `Should talk to mqtt and http agent`() = runTest {
21+
companion object {
22+
private lateinit var hiveMqContainer: GenericContainer<*>
1723

24+
@BeforeAll
25+
@JvmStatic
26+
fun setUp() = runTest {
27+
hiveMqContainer = GenericContainer(DockerImageName.parse("hivemq/hivemq-ce:latest"))
28+
.withExposedPorts(1883)
29+
hiveMqContainer.start()
30+
}
31+
32+
@AfterAll
33+
@JvmStatic
34+
fun tearDown() {
35+
hiveMqContainer.stop()
36+
}
37+
}
38+
39+
private lateinit var servient: Servient
40+
41+
@BeforeTest
42+
fun setup() = runTest {
1843
val mqttConfig = MqttClientConfig("localhost", 54884, "wotClient")
19-
val servient = Servient(
44+
servient = Servient(
2045
clientFactories = listOf(
2146
HttpProtocolClientFactory(),
22-
MqttProtocolClientFactory(mqttConfig)),
47+
MqttProtocolClientFactory(mqttConfig)
48+
)
2349
)
24-
servient.start()
50+
servient.start() // Start the Servient before each test
51+
}
2552

26-
val wot = Wot.create(servient)
53+
@AfterTest
54+
fun teardown() = runTest {
55+
servient.shutdown() // Ensure the Servient is stopped after each test
56+
}
2757

28-
val httpAgentTD = wot
29-
.requestThingDescription("http://localhost:8080/agent")
58+
@Test
59+
fun `Should talk to mqtt and http agent`() = runTest {
60+
val wot = Wot.create(servient)
3061

62+
// Test HTTP agent
63+
val httpAgentTD = wot.requestThingDescription("http://localhost:8080/agent")
3164
val httpAgent = wot.consume(httpAgentTD)
32-
var output = httpAgent.invokeAction("ask",
33-
"What is Paris?".toInteractionInputValue())
65+
var output = httpAgent.invokeAction("ask", "What is Paris?".toInteractionInputValue())
3466
println(output.value())
3567

36-
val mqttAgentTD = wot
37-
.requestThingDescription("mqtt://localhost:54884/agent")
38-
68+
// Test MQTT agent
69+
val mqttAgentTD = wot.requestThingDescription("mqtt://localhost:54884/agent")
3970
println(JsonMapper.instance.writeValueAsString(mqttAgentTD))
4071

4172
val mqttAgent = wot.consume(mqttAgentTD)
42-
output = mqttAgent.invokeAction("ask",
43-
"What is London?".toInteractionInputValue())
73+
output = mqttAgent.invokeAction("ask", "What is London?".toInteractionInputValue())
4474
println(output.value())
45-
4675
}
4776
}

0 commit comments

Comments
 (0)