Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.collections.immutable.PersistentMap
import kotlinx.collections.immutable.toPersistentMap
import kotlinx.coroutines.awaitCancellation

private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -91,6 +92,8 @@ internal suspend fun ServerSSESession.mcpSseEndpoint(
server.createSession(transport)

logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }

awaitCancellation()
}

internal fun ServerSSESession.mcpSseTransport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import io.ktor.server.sse.ServerSSESession
import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
import io.modelcontextprotocol.kotlin.sdk.shared.McpJson
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.job
import kotlin.concurrent.atomics.AtomicBoolean
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.coroutines.cancellation.CancellationException
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid

Expand Down Expand Up @@ -53,10 +55,13 @@ public class SseServerTransport(private val endpoint: String, private val sessio
data = "${endpoint.encodeURLPath()}?$SESSION_ID_PARAM=$sessionId",
)

try {
session.coroutineContext.job.join()
} finally {
_onClose.invoke()
@OptIn(InternalCoroutinesApi::class)
session.coroutineContext.job.invokeOnCompletion {
if (it != null && it !is CancellationException) {
_onError.invoke(it)
} else {
_onClose.invoke()
}
}
}

Expand Down
Loading