Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions bot/connector-iadvize/src/main/kotlin/DeferredConnector.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (C) 2017/2025 SNCF Connect & Tech
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.tock.bot.connector.iadvize

import ai.tock.bot.connector.Connector
import ai.tock.bot.connector.ConnectorCallback

/**
* Interface for connectors that support deferred messaging mode.
*
* Deferred mode allows sending an immediate acknowledgement (HTTP 200) to the client,
* while processing the request asynchronously and sending responses via a different
* channel (e.g., GraphQL API for iAdvize).
*
* ## Lifecycle
*
* 1. `acknowledge(callback)` - Sends immediate HTTP 200 response
* 2. `beginDeferred(callback)` - Initializes the coordinator for the conversation
* 3. Messages are collected during handler execution via standard `send()`
* 4. `endDeferred(callback)` - Flushes all collected messages
*
* ## Thread Safety
*
* The deferred coordinator is stored in the callback (per conversation),
* ensuring thread safety for concurrent conversations.
*
* ## Usage
*
* Connectors implementing this interface will automatically handle deferred mode
* when appropriate. The standard `send()` method detects `lastAnswer=true` to
* trigger the flush.
*
* ```kotlin
* class MyConnector : ConnectorBase(...), DeferredConnector {
*
* override fun handleRequest(...) {
* val callback = createCallback(...)
*
* if (canUseDeferred) {
* acknowledge(callback)
* beginDeferred(callback, metadata)
*
* executor.executeBlocking {
* controller.handle(event, ConnectorData(callback, metadata))
* }
* } else {
* // Standard synchronous handling
* controller.handle(event, ConnectorData(callback, metadata))
* }
* }
*
* override fun send(event, callback, delayInMs) {
* if (isDeferredMode(callback)) {
* collectDeferred(callback, event, delayInMs)
* if (event.metadata.lastAnswer) {
* endDeferred(callback)
* }
* } else {
* // Standard send
* }
* }
* }
* ```
*/
interface DeferredConnector : Connector {
/**
* Check if the callback is in deferred mode.
*
* @param callback The connector callback
* @return true if deferred mode is active
*/
fun isDeferredMode(callback: ConnectorCallback): Boolean

/**
* Send an immediate acknowledgement (HTTP 200) to the client.
* This should be called before starting async processing.
*
* @param callback The connector callback containing the HTTP response
*/
fun acknowledge(callback: ConnectorCallback)

/**
* Initialize deferred mode for this conversation.
* Creates a coordinator and stores it in the callback.
*
* @param callback The connector callback to store the coordinator
* @param parameters Metadata for the conversation (conversation ID, etc.)
*/
fun beginDeferred(
callback: ConnectorCallback,
parameters: Map<String, String>,
)

/**
* Flush all collected messages and end deferred mode.
* This is typically called when `lastAnswer=true` is detected,
* or when force-flushing due to timeout/error.
*
* @param callback The connector callback containing the coordinator
*/
fun endDeferred(callback: ConnectorCallback)

/**
* Force end deferred mode with an optional error message.
* Used for timeout/error scenarios.
*
* @param callback The connector callback containing the coordinator
* @param reason Reason for force ending (for logging)
* @param sendErrorMessage Whether to send a default error message to the user
*/
fun forceEndDeferred(
callback: ConnectorCallback,
reason: String,
sendErrorMessage: Boolean = true,
)
}
163 changes: 163 additions & 0 deletions bot/connector-iadvize/src/main/kotlin/DeferredMessageCoordinator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright (C) 2017/2025 SNCF Connect & Tech
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.tock.bot.connector.iadvize

import ai.tock.bot.connector.iadvize.IadvizeConnectorCallback.ActionWithDelay
import mu.KotlinLogging
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

/**
* Coordinates deferred message handling for a single iAdvize conversation.
*
* This class solves the race condition problem by:
* 1. Being stored in the callback (per conversation, not shared)
* 2. Using thread-safe collections (ConcurrentLinkedQueue)
* 3. Using atomic operations for state management
*
* Lifecycle:
* 1. Created in handleRequest() - sends HTTP 200 immediately via start()
* 2. Messages collected via collect() during handler execution
* 3. Flushed via end() when lastAnswer=true is detected
* 4. Or force-flushed via forceEnd() on timeout/error
*/
class DeferredMessageCoordinator(
private val callback: IadvizeConnectorCallback,
private val parameters: Map<String, String>,
) {
private val logger = KotlinLogging.logger {}

// Thread-safe queue for collected messages
private val messages = ConcurrentLinkedQueue<ActionWithDelay>()

// Atomic flags for state management
private val started = AtomicBoolean(false)
private val ended = AtomicBoolean(false)

/**
* Start the deferred session.
* Sends the HTTP response immediately to acknowledge the request.
* Can only be called once (idempotent).
*/
fun start() {
if (started.compareAndSet(false, true)) {
callback.answerWithResponse()
logger.info { "Deferred session started" }
} else {
logger.warn { "Deferred session already started - ignoring" }
}
}

/**
* Collect a message for later sending.
* Thread-safe - can be called from multiple threads.
*
* @param action The action with delay to collect
*/
fun collect(action: ActionWithDelay) {
if (!ended.get()) {
messages.add(action)
logger.debug { "Deferred message collected. Queue size: ${messages.size}" }
} else {
logger.warn { "Cannot collect message - deferred session already ended" }
}
}

/**
* End the deferred session and flush all collected messages.
* Thread-safe - only the first caller will execute the flush.
*
* @param sendAction Function to send each collected action
* @return true if this call performed the end, false if already ended
*/
fun end(sendAction: (ActionWithDelay) -> Unit): Boolean {
if (!ended.compareAndSet(false, true)) {
logger.debug { "Deferred session already ended - ignoring duplicate end()" }
return false
}

logger.info { "Ending deferred session. Flushing ${messages.size} message(s)" }

// Atomically drain and send all messages
var action = messages.poll()
while (action != null) {
sendAction(action)
action = messages.poll()
}

logger.debug { "Deferred session ended" }
return true
}

/**
* Force end the deferred session (for timeout/error scenarios).
* Flushes collected messages and optionally sends an error message.
*
* @param sendAction Function to send each collected action
* @param errorAction Optional error action to send after collected messages
* @param logMessage Optional message to log for debugging
* @return true if this call performed the end, false if already ended
*/
fun forceEnd(
sendAction: (ActionWithDelay) -> Unit,
errorAction: ActionWithDelay? = null,
logMessage: String? = null,
): Boolean {
if (!ended.compareAndSet(false, true)) {
logger.debug { "Deferred session already ended - ignoring forceEnd()" }
return false
}

logger.warn { "Force ending deferred session (timeout/error): ${logMessage ?: "no details"}. Flushing ${messages.size} collected message(s)" }

// Drain and send all collected messages
var action = messages.poll()
while (action != null) {
sendAction(action)
action = messages.poll()
}

// Send error message if provided
errorAction?.let {
logger.info { "Sending error message to user" }
sendAction(it)
}

logger.debug { "Deferred session force-ended" }
return true
}

/**
* Check if the session has started.
*/
fun hasStarted(): Boolean = started.get()

/**
* Check if the session has ended.
*/
fun hasEnded(): Boolean = ended.get()

/**
* Get the number of messages currently in the queue.
*/
fun messageCount(): Int = messages.size

/**
* Get the parameters associated with this coordinator.
*/
fun getParameters(): Map<String, String> = parameters
}
Loading