Skip to content

Commit de5ef44

Browse files
authored
Modify SSE config for server (#23)
* Fixed compilation * Added `mcp` Ktor server route alongside Application extension * Fix AbstractTransport to not skip messages
1 parent d9ab85f commit de5ef44

21 files changed

+296
-355
lines changed

README.md

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,13 @@ server.connect(transport)
114114

115115
### Using SSE Transport
116116

117+
Directly in Ktor's `Application`:
117118
```kotlin
118119
import io.ktor.server.application.*
119-
import io.modelcontextprotocol.kotlin.sdk.server.MCP
120+
import io.modelcontextprotocol.kotlin.sdk.server.mcp
120121

121122
fun Application.module() {
122-
MCP {
123+
mcp {
123124
Server(
124125
serverInfo = Implementation(
125126
name = "example-sse-server",
@@ -136,6 +137,35 @@ fun Application.module() {
136137
}
137138
```
138139

140+
Inside a custom Ktor's `Route`:
141+
```kotlin
142+
import io.ktor.server.application.*
143+
import io.ktor.server.sse.SSE
144+
import io.modelcontextprotocol.kotlin.sdk.server.mcp
145+
146+
fun Application.module() {
147+
install(SSE)
148+
149+
routing {
150+
route("myRoute") {
151+
mcp {
152+
Server(
153+
serverInfo = Implementation(
154+
name = "example-sse-server",
155+
version = "1.0.0"
156+
),
157+
options = ServerOptions(
158+
capabilities = ServerCapabilities(
159+
prompts = ServerCapabilities.Prompts(listChanged = null),
160+
resources = ServerCapabilities.Resources(subscribe = null, listChanged = null)
161+
)
162+
)
163+
)
164+
}
165+
}
166+
}
167+
}
168+
```
139169
## Contributing
140170

141171
Please see the [contribution guide](CONTRIBUTING.md) and the [Code of conduct](CODE_OF_CONDUCT.md) before contributing.

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ kotlin {
225225
jvmTest {
226226
dependencies {
227227
implementation(libs.mockk)
228+
implementation(libs.slf4j.simple)
228229
}
229230
}
230231
}

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mockk = "1.13.13"
1212
logging = "7.0.0"
1313
jreleaser = "1.15.0"
1414
binaryCompatibilityValidatorPlugin = "0.17.0"
15+
slf4j = "2.0.16"
1516

1617
[libraries]
1718
# Kotlinx libraries
@@ -30,6 +31,7 @@ kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-cor
3031
kotlinx-coroutines-debug = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-debug", version.ref = "coroutines" }
3132
ktor-server-test-host = { group = "io.ktor", name = "ktor-server-test-host", version.ref = "ktor" }
3233
mockk = { group = "io.mockk", name = "mockk", version.ref = "mockk" }
34+
slf4j-simple = { group = "org.slf4j", name = "slf4j-simple", version.ref = "slf4j" }
3335

3436

3537

src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/sse.ktor.kt renamed to src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/KtorClient.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public fun HttpClient.mcpSseTransport(
1919
urlString: String? = null,
2020
reconnectionTime: Duration? = null,
2121
requestBuilder: HttpRequestBuilder.() -> Unit = {},
22-
): SSEClientTransport = SSEClientTransport(this, urlString, reconnectionTime, requestBuilder)
22+
): SseClientTransport = SseClientTransport(this, urlString, reconnectionTime, requestBuilder)
2323

2424
/**
2525
* Creates and connects an MCP client over SSE using the provided HttpClient.

src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/SSEClientTransport.kt

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,28 @@ import io.ktor.client.request.*
66
import io.ktor.client.statement.*
77
import io.ktor.http.*
88
import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage
9+
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
910
import io.modelcontextprotocol.kotlin.sdk.shared.McpJson
10-
import io.modelcontextprotocol.kotlin.sdk.shared.Transport
1111
import kotlinx.atomicfu.AtomicBoolean
1212
import kotlinx.atomicfu.atomic
1313
import kotlinx.coroutines.*
1414
import kotlinx.serialization.encodeToString
1515
import kotlin.properties.Delegates
1616
import kotlin.time.Duration
1717

18+
@Deprecated("Use SseClientTransport instead", ReplaceWith("SseClientTransport"), DeprecationLevel.WARNING)
19+
public typealias SSEClientTransport = SseClientTransport
20+
1821
/**
1922
* Client transport for SSE: this will connect to a server using Server-Sent Events for receiving
2023
* messages and make separate POST requests for sending messages.
2124
*/
22-
public class SSEClientTransport(
25+
public class SseClientTransport(
2326
private val client: HttpClient,
2427
private val urlString: String?,
2528
private val reconnectionTime: Duration? = null,
2629
private val requestBuilder: HttpRequestBuilder.() -> Unit = {},
27-
) : Transport {
30+
) : AbstractTransport() {
2831
private val scope by lazy {
2932
CoroutineScope(session.coroutineContext + SupervisorJob())
3033
}
@@ -33,10 +36,6 @@ public class SSEClientTransport(
3336
private var session: ClientSSESession by Delegates.notNull()
3437
private val endpoint = CompletableDeferred<String>()
3538

36-
private var _onClose: (() -> Unit) = {}
37-
private var _onError: ((Throwable) -> Unit) = {}
38-
private var _onMessage: (suspend ((JSONRPCMessage) -> Unit)) = {}
39-
4039
private var job: Job? = null
4140

4241
private val baseUrl by lazy {
@@ -136,28 +135,4 @@ public class SSEClientTransport(
136135
_onClose()
137136
job?.cancelAndJoin()
138137
}
139-
140-
override fun onClose(block: () -> Unit) {
141-
val old = _onClose
142-
_onClose = {
143-
old()
144-
block()
145-
}
146-
}
147-
148-
override fun onError(block: (Throwable) -> Unit) {
149-
val old = _onError
150-
_onError = { e ->
151-
old(e)
152-
block(e)
153-
}
154-
}
155-
156-
override fun onMessage(block: suspend (JSONRPCMessage) -> Unit) {
157-
val old = _onMessage
158-
_onMessage = { message ->
159-
old(message)
160-
block(message)
161-
}
162-
}
163138
}

src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StdioClientTransport.kt

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package io.modelcontextprotocol.kotlin.sdk.client
22

33
import io.github.oshai.kotlinlogging.KotlinLogging
44
import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage
5+
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
56
import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer
6-
import io.modelcontextprotocol.kotlin.sdk.shared.Transport
77
import io.modelcontextprotocol.kotlin.sdk.shared.serializeMessage
88
import kotlinx.atomicfu.AtomicBoolean
99
import kotlinx.atomicfu.atomic
@@ -30,7 +30,7 @@ import kotlin.coroutines.CoroutineContext
3030
public class StdioClientTransport(
3131
private val input: Source,
3232
private val output: Sink
33-
) : Transport {
33+
) : AbstractTransport() {
3434
private val logger = KotlinLogging.logger {}
3535
private val ioCoroutineContext: CoroutineContext = Dispatchers.IO
3636
private val scope by lazy {
@@ -41,10 +41,6 @@ public class StdioClientTransport(
4141
private val sendChannel = Channel<JSONRPCMessage>(Channel.UNLIMITED)
4242
private val readBuffer = ReadBuffer()
4343

44-
override var onClose: (() -> Unit)? = null
45-
override var onError: ((Throwable) -> Unit)? = null
46-
override var onMessage: (suspend ((JSONRPCMessage) -> Unit))? = null
47-
4844
override suspend fun start() {
4945
if (!initialized.compareAndSet(false, true)) {
5046
error("StdioClientTransport already started!")
@@ -70,7 +66,7 @@ public class StdioClientTransport(
7066
}
7167
}
7268
} catch (e: Exception) {
73-
onError?.invoke(e)
69+
_onError.invoke(e)
7470
logger.error(e) { "Error reading from input stream" }
7571
}
7672
}
@@ -85,7 +81,7 @@ public class StdioClientTransport(
8581
}
8682
} catch (e: Throwable) {
8783
if (isActive) {
88-
onError?.invoke(e)
84+
_onError.invoke(e)
8985
logger.error(e) { "Error writing to output stream" }
9086
}
9187
} finally {
@@ -95,7 +91,7 @@ public class StdioClientTransport(
9591

9692
readJob.join()
9793
writeJob.cancelAndJoin()
98-
onClose?.invoke()
94+
_onClose.invoke()
9995
}
10096
}
10197

@@ -116,16 +112,16 @@ public class StdioClientTransport(
116112
output.close()
117113
readBuffer.clear()
118114
sendChannel.close()
119-
onClose?.invoke()
115+
_onClose.invoke()
120116
}
121117

122118
private suspend fun processReadBuffer() {
123119
while (true) {
124120
val msg = readBuffer.readMessage() ?: break
125121
try {
126-
onMessage?.invoke(msg)
122+
_onMessage.invoke(msg)
127123
} catch (e: Throwable) {
128-
onError?.invoke(e)
124+
_onError.invoke(e)
129125
logger.error(e) { "Error processing message." }
130126
}
131127
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.modelcontextprotocol.kotlin.sdk.server
2+
3+
import io.github.oshai.kotlinlogging.KotlinLogging
4+
import io.ktor.http.*
5+
import io.ktor.server.application.*
6+
import io.ktor.server.response.*
7+
import io.ktor.server.routing.*
8+
import io.ktor.server.sse.*
9+
import io.ktor.util.collections.*
10+
import io.ktor.utils.io.KtorDsl
11+
12+
private val logger = KotlinLogging.logger {}
13+
14+
@KtorDsl
15+
public fun Routing.mcp(path: String, block: () -> Server) {
16+
route(path) {
17+
mcp(block)
18+
}
19+
}
20+
21+
/**
22+
* Configures the Ktor Application to handle Model Context Protocol (MCP) over Server-Sent Events (SSE).
23+
*/
24+
@KtorDsl
25+
public fun Routing.mcp(block: () -> Server) {
26+
val transports = ConcurrentMap<String, SseServerTransport>()
27+
28+
sse {
29+
mcpSseEndpoint("", transports, block)
30+
}
31+
32+
post {
33+
mcpPostEndpoint(transports)
34+
}
35+
}
36+
37+
@Suppress("FunctionName")
38+
@Deprecated("Use mcp() instead", ReplaceWith("mcp(block)"), DeprecationLevel.WARNING)
39+
public fun Application.MCP(block: () -> Server) {
40+
mcp(block)
41+
}
42+
43+
@KtorDsl
44+
public fun Application.mcp(block: () -> Server) {
45+
val transports = ConcurrentMap<String, SseServerTransport>()
46+
47+
install(SSE)
48+
49+
routing {
50+
sse("/sse") {
51+
mcpSseEndpoint("/message", transports, block)
52+
}
53+
54+
post("/message") {
55+
mcpPostEndpoint(transports)
56+
}
57+
}
58+
}
59+
60+
private suspend fun ServerSSESession.mcpSseEndpoint(
61+
postEndpoint: String,
62+
transports: ConcurrentMap<String, SseServerTransport>,
63+
block: () -> Server,
64+
) {
65+
val transport = mcpSseTransport(postEndpoint, transports)
66+
67+
val server = block()
68+
69+
server.onClose {
70+
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
71+
transports.remove(transport.sessionId)
72+
}
73+
74+
server.connect(transport)
75+
logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
76+
}
77+
78+
internal fun ServerSSESession.mcpSseTransport(
79+
postEndpoint: String,
80+
transports: ConcurrentMap<String, SseServerTransport>,
81+
): SseServerTransport {
82+
val transport = SseServerTransport(postEndpoint, this)
83+
transports[transport.sessionId] = transport
84+
85+
logger.info { "New SSE connection established and stored with sessionId: ${transport.sessionId}" }
86+
87+
return transport
88+
}
89+
90+
internal suspend fun RoutingContext.mcpPostEndpoint(
91+
transports: ConcurrentMap<String, SseServerTransport>,
92+
) {
93+
val sessionId: String = call.request.queryParameters["sessionId"]
94+
?: run {
95+
call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided")
96+
return
97+
}
98+
99+
logger.debug { "Received message for sessionId: $sessionId" }
100+
101+
val transport = transports[sessionId]
102+
if (transport == null) {
103+
logger.warn { "Session not found for sessionId: $sessionId" }
104+
call.respond(HttpStatusCode.NotFound, "Session not found")
105+
return
106+
}
107+
108+
transport.handlePostMessage(call)
109+
logger.trace { "Message handled for sessionId: $sessionId" }
110+
}

src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/McpKtorServerPlugin.kt

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)