From 68823067a63746c90adbe9818cadbf89b4b254ef Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Thu, 6 Nov 2025 20:26:31 +0200 Subject: [PATCH] #355 Handle coroutine cancellation and exceptions in server transports --- .../kotlin/sdk/server/KtorServer.kt | 3 +++ .../kotlin/sdk/server/SSEServerTransport.kt | 13 +++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt index 06e65284..8ad4c2f4 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt @@ -19,6 +19,7 @@ import kotlinx.atomicfu.atomic import kotlinx.atomicfu.update import kotlinx.collections.immutable.PersistentMap import kotlinx.collections.immutable.toPersistentMap +import kotlinx.coroutines.awaitCancellation private val logger = KotlinLogging.logger {} @@ -91,6 +92,8 @@ internal suspend fun ServerSSESession.mcpSseEndpoint( server.createSession(transport) logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" } + + awaitCancellation() } internal fun ServerSSESession.mcpSseTransport( diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt index 8b5c6be2..64e32fed 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/SSEServerTransport.kt @@ -11,9 +11,11 @@ import io.ktor.server.sse.ServerSSESession import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport import io.modelcontextprotocol.kotlin.sdk.shared.McpJson +import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.job import kotlin.concurrent.atomics.AtomicBoolean import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.coroutines.cancellation.CancellationException import kotlin.uuid.ExperimentalUuidApi import kotlin.uuid.Uuid @@ -53,10 +55,13 @@ public class SseServerTransport(private val endpoint: String, private val sessio data = "${endpoint.encodeURLPath()}?$SESSION_ID_PARAM=$sessionId", ) - try { - session.coroutineContext.job.join() - } finally { - _onClose.invoke() + @OptIn(InternalCoroutinesApi::class) + session.coroutineContext.job.invokeOnCompletion { + if (it != null && it !is CancellationException) { + _onError.invoke(it) + } else { + _onClose.invoke() + } } }