Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions bot/connector-web-sse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Tock Bot Web Connector SSE

This module provides [Server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) capabilities for the Tock Web Connector,
allowing web clients to establish persistent connections and receive bot responses in real-time.
Works with multi-instance deployments, with a message queue implementation backed by TOCK's MongoDB database.

If a message is sent while the corresponding user is offline, it will be kept in the MongoDB database and delivered upon reconnection.

## Connection Flow

1. **Connection Establishment**: Client connects to SSE endpoint with user identification
- user identification may be handled by a custom `WebSecurityHandler`, otherwise defaults to query param `userId`
2. **Channel Registration**: Server creates an SSE channel and registers callback for message delivery
3. **Message Routing**:
- Local delivery if user is connected to current instance
- MongoDB persistence if user is offline or on different instance
4. **Change Stream Synchronization**: Other instances receive messages via MongoDB change streams
5. **Missed Events**: Upon reconnection, undelivered messages are sent to the client

## Usage Samples

### Setting Up the Endpoint

```kotlin
val endpoint = SseEndpoint()
val router = Router.router(vertx)

endpoint.configureRoute(
router = router,
path = "/api/bot/sse",
connectorId = "my-web-connector",
webSecurityHandler = WebSecurityCookiesHandler(), // or any other implementation
)
```

### Client Connection

Connect to the SSE endpoint from a browser:

```javascript
const eventSource = new EventSource('/api/bot/sse');

eventSource.onmessage = (event) => {
const response = JSON.parse(event.data);
console.log('Bot response:', response);
};
```

### Sending Responses

```kotlin
endpoint.sendResponse(
connectorId = "my-web-connector",
recipientId = userId,
response = WebConnectorResponseContent(responses = listOf(WebMessageContent(text = "Hello, World!")))
)
```
91 changes: 91 additions & 0 deletions bot/connector-web-sse/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (C) 2017/2025 SNCF Connect & Tech
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tock-bot</artifactId>
<groupId>ai.tock</groupId>
<version>25.10.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>tock-bot-connector-web-sse</artifactId>
<name>Tock Bot Web Connector SSE</name>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>org.litote.kmongo</groupId>
<artifactId>kmongo</artifactId>
</dependency>
<dependency>
<groupId>org.litote.kmongo</groupId>
<artifactId>kmongo-async</artifactId>
</dependency>
<dependency>
<groupId>ai.tock</groupId>
<artifactId>tock-bot-connector-web-model</artifactId>
</dependency>
<dependency>
<groupId>ai.tock</groupId>
<artifactId>tock-shared</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>

<dependency>
<groupId>ai.tock</groupId>
<artifactId>tock-shared</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.mockk</groupId>
<artifactId>mockk-jvm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


<build>
<sourceDirectory>src/main/kotlin</sourceDirectory>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package ai.tock.bot.connector.web
package ai.tock.bot.connector.web.sse

import ai.tock.bot.connector.web.channel.ChannelDAO
import ai.tock.bot.connector.web.channel.ChannelMongoDAO
import ai.tock.bot.connector.web.sse.channel.ChannelDAO
import ai.tock.bot.connector.web.sse.channel.ChannelMongoDAO
import ai.tock.shared.service.BotAdditionalModulesService
import com.github.salomonbrys.kodein.Kodein
import com.github.salomonbrys.kodein.bind
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (C) 2017/2025 SNCF Connect & Tech
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.tock.bot.connector.web.sse

import ai.tock.bot.connector.web.WebConnectorResponseContract
import ai.tock.bot.connector.web.sse.channel.SseChannels
import ai.tock.shared.injector
import ai.tock.shared.jackson.mapper
import ai.tock.shared.provide
import ai.tock.shared.security.auth.spi.TOCK_USER_ID
import ai.tock.shared.security.auth.spi.WebSecurityHandler
import ai.tock.shared.vertx.sendSseMessage
import ai.tock.shared.vertx.setupSSE
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer
import io.vertx.core.Future
import io.vertx.core.http.HttpServerResponse
import io.vertx.ext.web.Router
import mu.KotlinLogging

class SseEndpoint internal constructor(
private val responseSerializer: ObjectMapper,
private val channels: SseChannels,
) {
companion object {
const val USER_ID_QUERY_PARAM = "userId"

val webMapper: ObjectMapper =
mapper.copy().registerModules(
SimpleModule().apply {
// fallback for serializing CharSequence
addSerializer(CharSequence::class.java, ToStringSerializer())
},
)
}

private val logger = KotlinLogging.logger {}

constructor(responseSerializer: ObjectMapper = webMapper) : this(responseSerializer, SseChannels(injector.provide()))

fun configureRoute(
router: Router,
path: String,
connectorId: String,
webSecurityHandler: WebSecurityHandler,
) {
router.get(path)
.handler(webSecurityHandler)
.handler { context ->
try {
val userId: String? = context.get<String>(TOCK_USER_ID) ?: context.queryParams()[USER_ID_QUERY_PARAM]
if (userId != null) {
channels.register(context.response(), connectorId, userId)
} else {
context.fail(400, NullPointerException("missing userId in request"))
}
} catch (t: Throwable) {
context.fail(t)
}
}
}

fun sendResponse(
connectorId: String,
recipientId: String,
response: WebConnectorResponseContract,
): Future<Unit> = channels.send(appId = connectorId, recipientId, response)

private fun SseChannels.register(
response: HttpServerResponse,
connectorId: String,
userId: String,
) {
initListeners()
val channel =
register(appId = connectorId, userId) { msg ->
logger.debug { "send response from channel: $msg" }
response.sendSseMessage(responseSerializer.writeValueAsString(msg))
}
response.setupSSE { unregister(channel) }
sendMissedEvents(channel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.tock.bot.connector.web.channel
package ai.tock.bot.connector.web.sse.channel

import ai.tock.bot.connector.web.WebConnectorResponse
import ai.tock.bot.connector.web.WebConnectorResponseContract
import io.vertx.core.Future

internal typealias ChannelCallback = (webConnectorResponse: WebConnectorResponse) -> Future<*>
internal typealias ChannelCallback = (webConnectorResponse: WebConnectorResponseContract) -> Future<*>
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.tock.bot.connector.web.channel
package ai.tock.bot.connector.web.sse.channel

internal interface ChannelDAO {
fun listenChanges(listener: ChannelEvent.Handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.tock.bot.connector.web.channel
package ai.tock.bot.connector.web.sse.channel

import ai.tock.bot.connector.web.WebConnectorResponse
import ai.tock.bot.connector.web.WebConnectorResponseContent
import ai.tock.bot.connector.web.WebConnectorResponseContract
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.vertx.core.Future
import org.litote.kmongo.Id
import org.litote.kmongo.newId
Expand All @@ -28,7 +30,8 @@ import java.time.Instant
internal data class ChannelEvent(
val appId: String = "unknown",
val recipientId: String,
val webConnectorResponse: WebConnectorResponse,
@get:JsonDeserialize(`as` = WebConnectorResponseContent::class)
val webConnectorResponse: WebConnectorResponseContract,
val status: Status = Status.ENQUEUED,
val enqueuedAt: Instant = Instant.now(),
val _id: Id<ChannelEvent> = newId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.tock.bot.connector.web.channel
package ai.tock.bot.connector.web.sse.channel

import ai.tock.shared.TOCK_BOT_DATABASE
import ai.tock.shared.ensureIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.tock.bot.connector.web.channel
package ai.tock.bot.connector.web.sse.channel

import java.util.UUID

internal data class Channel(val appId: String, val uuid: UUID, val userId: String, val onAction: ChannelCallback)
internal data class SseChannel(val appId: String, val uuid: UUID, val userId: String, val onAction: ChannelCallback)
Loading