Skip to content

Commit cfc188e

Browse files
committed
Add Streamable Http Transport
1 parent c5a254b commit cfc188e

File tree

5 files changed

+746
-6
lines changed

5 files changed

+746
-6
lines changed

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
257257
JSONRPCResponse(
258258
id = request.id,
259259
error = JSONRPCError(
260-
ErrorCode.Defined.MethodNotFound,
260+
code = ErrorCode.Defined.MethodNotFound,
261261
message = "Server does not support ${request.method}",
262262
),
263263
),

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,14 +249,14 @@ public data class JSONRPCNotification(
249249
*/
250250
@Serializable
251251
public class JSONRPCResponse(
252-
public val id: RequestId,
252+
public val id: RequestId?,
253253
public val jsonrpc: String = JSONRPC_VERSION,
254254
public val result: RequestResult? = null,
255255
public val error: JSONRPCError? = null,
256256
) : JSONRPCMessage {
257257

258258
public fun copy(
259-
id: RequestId = this.id,
259+
id: RequestId? = this.id,
260260
jsonrpc: String = this.jsonrpc,
261261
result: RequestResult? = this.result,
262262
error: JSONRPCError? = this.error,
@@ -292,8 +292,12 @@ public sealed interface ErrorCode {
292292
* A response to a request that indicates an error occurred.
293293
*/
294294
@Serializable
295-
public data class JSONRPCError(val code: ErrorCode, val message: String, val data: JsonObject = EmptyJsonObject) :
296-
JSONRPCMessage
295+
public data class JSONRPCError(
296+
val id: RequestId? = null,
297+
val code: ErrorCode,
298+
val message: String,
299+
val data: JsonObject = EmptyJsonObject,
300+
) : JSONRPCMessage
297301

298302
/**
299303
* Base interface for notification parameters with optional metadata.

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1+
public final class io/modelcontextprotocol/kotlin/sdk/LibVersionKt {
2+
public static final field LIB_VERSION Ljava/lang/String;
3+
}
4+
5+
public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore {
6+
public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7+
public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
}
9+
110
public final class io/modelcontextprotocol/kotlin/sdk/server/KtorServerKt {
211
public static final fun MCP (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V
312
public static final fun mcp (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V
413
public static final fun mcp (Lio/ktor/server/routing/Routing;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V
514
public static final fun mcp (Lio/ktor/server/routing/Routing;Lkotlin/jvm/functions/Function1;)V
15+
public static final fun mcpStatelessStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
16+
public static synthetic fun mcpStatelessStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
17+
public static final fun mcpStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
18+
public static synthetic fun mcpStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
619
}
720

821
public final class io/modelcontextprotocol/kotlin/sdk/server/RegisteredPrompt {
@@ -127,6 +140,24 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
127140
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
128141
}
129142

143+
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
144+
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
145+
public fun <init> ()V
146+
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;)V
147+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
148+
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
149+
public final fun getSessionId ()Ljava/lang/String;
150+
public final fun handleDeleteRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
151+
public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
152+
public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
153+
public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
154+
public fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
155+
public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V
156+
public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V
157+
public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V
158+
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
159+
}
160+
130161
public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt {
131162
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
132163
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function0;)V

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
44
import io.ktor.http.HttpStatusCode
55
import io.ktor.server.application.Application
66
import io.ktor.server.application.install
7+
import io.ktor.server.request.header
78
import io.ktor.server.response.respond
89
import io.ktor.server.routing.Routing
910
import io.ktor.server.routing.RoutingContext
@@ -13,12 +14,14 @@ import io.ktor.server.routing.routing
1314
import io.ktor.server.sse.SSE
1415
import io.ktor.server.sse.ServerSSESession
1516
import io.ktor.server.sse.sse
17+
import io.ktor.util.collections.ConcurrentMap
1618
import io.ktor.utils.io.KtorDsl
1719
import kotlinx.atomicfu.AtomicRef
1820
import kotlinx.atomicfu.atomic
1921
import kotlinx.atomicfu.update
2022
import kotlinx.collections.immutable.PersistentMap
2123
import kotlinx.collections.immutable.toPersistentMap
24+
import io.modelcontextprotocol.kotlin.sdk.ErrorCode
2225

2326
private val logger = KotlinLogging.logger {}
2427

@@ -74,7 +77,52 @@ public fun Application.mcp(block: ServerSSESession.() -> Server) {
7477
}
7578
}
7679

77-
internal suspend fun ServerSSESession.mcpSseEndpoint(
80+
@KtorDsl
81+
public fun Application.mcpStreamableHttp(
82+
enableDnsRebindingProtection: Boolean = false,
83+
allowedHosts: List<String>? = null,
84+
allowedOrigins: List<String>? = null,
85+
eventStore: EventStore? = null,
86+
block: RoutingContext.() -> Server,
87+
) {
88+
val transports = ConcurrentMap<String, StreamableHttpServerTransport>()
89+
90+
routing {
91+
post("/mcp") {
92+
mcpStreamableHttpEndpoint(
93+
transports,
94+
enableDnsRebindingProtection,
95+
allowedHosts,
96+
allowedOrigins,
97+
eventStore,
98+
block,
99+
)
100+
}
101+
}
102+
}
103+
104+
@KtorDsl
105+
public fun Application.mcpStatelessStreamableHttp(
106+
enableDnsRebindingProtection: Boolean = false,
107+
allowedHosts: List<String>? = null,
108+
allowedOrigins: List<String>? = null,
109+
eventStore: EventStore? = null,
110+
block: RoutingContext.() -> Server,
111+
) {
112+
routing {
113+
post("/mcp") {
114+
mcpStatelessStreamableHttpEndpoint(
115+
enableDnsRebindingProtection,
116+
allowedHosts,
117+
allowedOrigins,
118+
eventStore,
119+
block,
120+
)
121+
}
122+
}
123+
}
124+
125+
private suspend fun ServerSSESession.mcpSseEndpoint(
78126
postEndpoint: String,
79127
sseTransportManager: SseTransportManager,
80128
block: ServerSSESession.() -> Server,
@@ -104,6 +152,88 @@ internal fun ServerSSESession.mcpSseTransport(
104152
return transport
105153
}
106154

155+
internal suspend fun RoutingContext.mcpStreamableHttpEndpoint(
156+
transports: ConcurrentMap<String, StreamableHttpServerTransport>,
157+
enableDnsRebindingProtection: Boolean = false,
158+
allowedHosts: List<String>? = null,
159+
allowedOrigins: List<String>? = null,
160+
eventStore: EventStore? = null,
161+
block: RoutingContext.() -> Server,
162+
) {
163+
val sessionId = this.call.request.header(MCP_SESSION_ID_HEADER)
164+
val transport = if (sessionId != null && transports.containsKey(sessionId)) {
165+
transports[sessionId]!!
166+
} else if (sessionId == null) {
167+
val transport = StreamableHttpServerTransport(
168+
enableDnsRebindingProtection = enableDnsRebindingProtection,
169+
allowedHosts = allowedHosts,
170+
allowedOrigins = allowedOrigins,
171+
eventStore = eventStore,
172+
enableJsonResponse = true,
173+
)
174+
175+
transport.setOnSessionInitialized { sessionId ->
176+
transports[sessionId] = transport
177+
178+
logger.info { "New StreamableHttp connection established and stored with sessionId: $sessionId" }
179+
}
180+
181+
val server = block()
182+
server.onClose {
183+
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
184+
}
185+
186+
server.connect(transport)
187+
188+
transport
189+
} else {
190+
null
191+
}
192+
193+
if (transport == null) {
194+
this.call.reject(
195+
HttpStatusCode.BadRequest,
196+
ErrorCode.Unknown(-32000),
197+
"Bad Request: No valid session ID provided",
198+
)
199+
return
200+
}
201+
202+
transport.handleRequest(null, this.call)
203+
logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
204+
}
205+
206+
internal suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint(
207+
enableDnsRebindingProtection: Boolean = false,
208+
allowedHosts: List<String>? = null,
209+
allowedOrigins: List<String>? = null,
210+
eventStore: EventStore? = null,
211+
block: RoutingContext.() -> Server,
212+
) {
213+
val transport = StreamableHttpServerTransport(
214+
enableDnsRebindingProtection = enableDnsRebindingProtection,
215+
allowedHosts = allowedHosts,
216+
allowedOrigins = allowedOrigins,
217+
eventStore = eventStore,
218+
enableJsonResponse = true,
219+
)
220+
transport.setSessionIdGenerator(null)
221+
222+
logger.info { "New stateless StreamableHttp connection established without sessionId" }
223+
224+
val server = block()
225+
226+
server.onClose {
227+
logger.info { "Server connection closed without sessionId" }
228+
}
229+
230+
server.connect(transport)
231+
232+
transport.handleRequest(null, this.call)
233+
234+
logger.debug { "Server connected to transport without sessionId" }
235+
}
236+
107237
internal suspend fun RoutingContext.mcpPostEndpoint(sseTransportManager: SseTransportManager) {
108238
val sessionId: String = call.request.queryParameters["sessionId"] ?: run {
109239
call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided")

0 commit comments

Comments
 (0)