Skip to content

Commit 84045d4

Browse files
committed
resolves #1961 [web connector] add streaming support for rich message
Signed-off-by: Julien Buret <jburet@gmail.com>
1 parent d68db9c commit 84045d4

File tree

6 files changed

+72
-20
lines changed

6 files changed

+72
-20
lines changed

bot/api/service/src/main/kotlin/BotApiClientController.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ internal class BotApiClientController(
130130
}
131131
} else {
132132
val holder = setHolder(request.requestId)
133-
sendWithSse(request, lastConfiguration?.version, sendResponse)
134133
holder.waitForResponse(sendResponse)
134+
sendWithSse(request, lastConfiguration?.version, sendResponse)
135135
}
136136
}
137137
}
@@ -144,8 +144,8 @@ internal class BotApiClientController(
144144
if (pushHandler != null) {
145145
val holder = setHolder(request.requestId)
146146
logger.debug { "send request ${request.requestId}" }
147-
pushHandler.invoke(mapper.writeValueAsString(request))
148147
holder.waitForResponse(sendResponse)
148+
pushHandler.invoke(mapper.writeValueAsString(request))
149149
} else {
150150
if (request.configuration != true) {
151151
error("no websocket handler for $apiKey and no webhook reachable")

bot/api/service/src/main/kotlin/WSHolder.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,25 @@ internal class WSHolder {
5252
private var latch: CountDownLatch = CountDownLatch(1)
5353

5454
fun receive(response: ResponseData) {
55+
logger.debug { "add to holder: $response" }
5556
this.response.add(response)
56-
latch.countDown()
57+
synchronized(response) {
58+
latch.countDown()
59+
}
5760
}
5861

59-
@Synchronized
6062
fun wait(): List<ResponseData> {
63+
logger.debug { "start await" }
6164
latch.await(timeoutInSeconds, SECONDS)
62-
val r = response.sortedBy { it.botResponse?.context?.date ?: Instant.now() }
63-
logger.debug { r }
64-
if (r.lastOrNull()?.botResponse?.context?.lastResponse == false) {
65-
latch = CountDownLatch(1)
65+
synchronized(response) {
66+
val r = response.sortedBy { it.botResponse?.context?.date ?: Instant.now() }
67+
logger.debug { "responses: $r" }
68+
if (r.lastOrNull()?.botResponse?.context?.lastResponse == false) {
69+
latch = CountDownLatch(1)
70+
}
71+
72+
return r.filterNot { seen.contains(it) }.apply { seen.addAll(this) }
6673
}
67-
return r.filterNot { seen.contains(it) }.apply { seen.addAll(this) }
6874
}
6975

7076
fun waitForResponse(sendResponse: (ResponseData?) -> Unit = {}): Unit =

bot/connector-web/src/main/kotlin/WebConnector.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ class WebConnector internal constructor(
258258
streamedResponse = (event as? Action)?.metadata?.streamedResponse == true,
259259
)
260260
if (sseEnabled) {
261-
// Uniquely identify each response, so they can be reconciliated between SSE and POST
261+
// Uniquely identify each response, so they can be reconciled between SSE and POST
262262
callback.addMetadata(MetadataEvent.responseId(UUID.randomUUID(), applicationId))
263263
}
264264
controller.handle(

bot/connector-web/src/main/kotlin/WebConnectorCallback.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ internal class WebConnectorCallback(
5050

5151
fun addMetadata(metadata: MetadataEvent) {
5252
this.metadata[metadata.type] = metadata.value
53-
if (metadata.isEndStreamMetadata()) {
53+
if (metadata.isStreamMetadata()) {
5454
WebRequestInfosByEvent.get(eventId)?.clearStreamedResponse()
5555
}
5656
}
@@ -60,7 +60,7 @@ internal class WebConnectorCallback(
6060
actions.mapNotNull { a ->
6161
val action =
6262
if (a is SendSentence && metadata.hasStreamMetadata() && mergeStreamResponse) {
63-
a.withText(WebRequestInfosByEvent.getOrPut(eventId).addStreamedResponse(a.stringText))
63+
WebRequestInfosByEvent.getOrPut(eventId).addStreamedResponse(a, webConnectorType)
6464
} else {
6565
a
6666
}

bot/connector-web/src/main/kotlin/WebRequestInfos.kt

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616

1717
package ai.tock.bot.connector.web
1818

19+
import ai.tock.bot.connector.ConnectorType
20+
import ai.tock.bot.connector.web.send.WebCarousel
21+
import ai.tock.bot.engine.action.SendSentence
1922
import com.google.common.cache.CacheBuilder
2023
import io.vertx.core.MultiMap
2124
import io.vertx.core.http.Cookie
2225
import io.vertx.core.http.HttpServerRequest
26+
import mu.KotlinLogging
2327
import java.util.concurrent.TimeUnit
2428

2529
object WebRequestInfosByEvent {
@@ -40,7 +44,8 @@ object WebRequestInfosByEvent {
4044
data class WebRequestInfos(
4145
private val headers: MultiMap = MultiMap.caseInsensitiveMultiMap(),
4246
private val cookies: Set<Cookie> = emptySet(),
43-
private val streamedResponse: StringBuilder = StringBuilder(),
47+
@Volatile
48+
private var streamedResponse: SendSentence? = null,
4449
) {
4550
internal constructor(request: HttpServerRequest) : this(request.headers(), request.cookies())
4651

@@ -50,14 +55,53 @@ data class WebRequestInfos(
5055

5156
fun firstCookie(name: String): String? = cookies.firstOrNull { it.name == name }?.value
5257

53-
internal fun addStreamedResponse(response: String?): String {
54-
if (response != null) {
55-
streamedResponse.append(response)
56-
}
57-
return streamedResponse.toString()
58+
internal fun addStreamedResponse(
59+
response: SendSentence,
60+
connectorType: ConnectorType,
61+
): SendSentence {
62+
val s =
63+
streamedResponse?.run {
64+
val originalMessage = message(connectorType) as? WebMessage
65+
if (originalMessage != null) {
66+
val newMessage = response.message(connectorType) as? WebMessage
67+
if (newMessage == null) {
68+
logger.warn { "no custom message in streamed message - but the previous message is custom - ignore" }
69+
return this
70+
}
71+
72+
changeConnectorMessage(
73+
WebMessage(
74+
text = (originalMessage.text ?: "") + (newMessage.text ?: ""),
75+
buttons = originalMessage.buttons + newMessage.buttons,
76+
card =
77+
originalMessage.card?.let {
78+
it.copy(
79+
title = (it.title?.toString() ?: "") + (newMessage.card?.title?.toString() ?: ""),
80+
subTitle = (it.subTitle?.toString() ?: "") + (newMessage.card?.subTitle?.toString() ?: ""),
81+
file = newMessage.card?.file ?: it.file,
82+
buttons = it.buttons + (newMessage.card?.buttons ?: emptyList()),
83+
)
84+
} ?: newMessage.card,
85+
carousel = ((originalMessage.carousel?.cards ?: emptyList()) + (newMessage.carousel?.cards ?: emptyList())).takeUnless { it.isEmpty() }?.let { WebCarousel(it) },
86+
widget = newMessage.widget ?: originalMessage.widget,
87+
image = newMessage.image ?: originalMessage.image,
88+
version = newMessage.version,
89+
deepLink = newMessage.deepLink ?: originalMessage.deepLink,
90+
footnotes = originalMessage.footnotes + newMessage.footnotes,
91+
),
92+
)
93+
this
94+
} else {
95+
withText((text?.toString() ?: "") + (response.text?.toString() ?: ""))
96+
}
97+
} ?: response
98+
streamedResponse = s
99+
return s
58100
}
59101

60102
internal fun clearStreamedResponse() {
61-
streamedResponse.clear()
103+
streamedResponse = null
62104
}
63105
}
106+
107+
private val logger = KotlinLogging.logger {}

bot/engine/src/main/kotlin/engine/event/MetadataEvent.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ class MetadataEvent(val type: String, val value: String, applicationId: String)
4040
const val STREAM_RESPONSE_METADATA = "TOCK_STREAM_RESPONSE"
4141
}
4242

43-
fun isEndStreamMetadata(): Boolean = type == STREAM_RESPONSE_METADATA && value != "true"
43+
fun isEndStreamMetadata(): Boolean = isStreamMetadata() && value != "true"
44+
45+
fun isStreamMetadata(): Boolean = type == STREAM_RESPONSE_METADATA
4446
}
4547

4648
fun Map<String, String>.hasStreamMetadata(): Boolean = this[STREAM_RESPONSE_METADATA] == "true"

0 commit comments

Comments
 (0)