Skip to content

Commit 98cb754

Browse files
committed
Fix streamable /mcp SSE client request/response flow
1 parent c395237 commit 98cb754

File tree

1 file changed

+97
-65
lines changed

1 file changed

+97
-65
lines changed

Sources/SwiftMCP/Client/MCPServerProxy.swift

Lines changed: 97 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ public final actor MCPServerProxy: Sendable {
9292
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require URLSession.bytes support on Linux.")
9393
#else
9494
if #available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, macCatalyst 15.0, *) {
95+
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
96+
if isStreamableMCP {
97+
// Streamable HTTP posts to the same /mcp endpoint.
98+
endpointURL = sseConfig.url
99+
}
100+
95101
let sessionConfig = URLSessionConfiguration.default
96102
sessionConfig.timeoutIntervalForRequest = .infinity
97103
sessionConfig.timeoutIntervalForResource = .infinity
@@ -102,29 +108,13 @@ public final actor MCPServerProxy: Sendable {
102108
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
103109
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
104110

105-
let (asyncBytes, response) = try await session.bytes(for: request)
106-
107-
if let response = response as? HTTPURLResponse, response.statusCode != 200 {
108-
let data = try await asyncBytes.reduce(into: Data()) { partialResult, byte in
109-
partialResult.append(byte)
110-
}
111-
throw MCPServerProxyError.communicationError("HTTP error \(response.statusCode): \(String(data: data, encoding: .utf8) ?? "Unknown error")")
112-
}
113-
114-
if let response = response as? HTTPURLResponse {
115-
sessionID = response.value(forHTTPHeaderField: "Mcp-Session-Id")
116-
if isStreamableMCPURL(sseConfig.url) {
117-
endpointURL = sseConfig.url
118-
} else if let sessionID,
119-
let endpoint = messageEndpointURL(baseURL: sseConfig.url, sessionId: sessionID) {
120-
endpointURL = endpoint
121-
}
122-
}
123-
124111
streamTask = Task {
125112
do {
113+
let (asyncBytes, response) = try await session.bytes(for: request)
114+
try await self.handleSSEConnectionResponse(response: response, asyncBytes: asyncBytes, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)
115+
126116
for try await message in asyncBytes.lines.sseMessages() {
127-
processIncomingMessage(event: message.event, data: message.data)
117+
await self.processIncomingMessage(event: message.event, data: message.data)
128118
}
129119
self.handleStreamTermination(
130120
MCPServerProxyError.communicationError(
@@ -134,12 +124,12 @@ public final actor MCPServerProxy: Sendable {
134124
} catch is CancellationError {
135125
// Pending requests are cancelled in disconnect().
136126
} catch {
137-
logger.error("[MCP DEBUG] SSE stream error: \(error)")
127+
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
138128
self.handleStreamTermination(error)
139129
}
140130
}
141131

142-
if endpointURL == nil {
132+
if !isStreamableMCP && endpointURL == nil {
143133
let _ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<URL, Error>) in
144134
self.endpointContinuation = continuation
145135

@@ -379,7 +369,24 @@ public final actor MCPServerProxy: Sendable {
379369
}
380370

381371
switch httpResponse.statusCode {
382-
case 200:
372+
case 200, 202:
373+
if let updatedSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
374+
sessionID = updatedSessionID
375+
}
376+
377+
// Some servers reply immediately in the HTTP body even for 202.
378+
if let responseMessage = try responseMessage(for: messageId, from: responseData) {
379+
if responseTasks[messageId] != nil {
380+
responseTasks.removeValue(forKey: messageId)
381+
continuation.resume(returning: responseMessage)
382+
}
383+
return
384+
}
385+
386+
if httpResponse.statusCode == 202 {
387+
return // response will arrive over SSE
388+
}
389+
383390
guard let responseMessage = try responseMessage(for: messageId, from: responseData) else {
384391
let responseBody = String(data: responseData, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
385392
let details = responseBody.isEmpty ? "" : ": \(responseBody)"
@@ -390,8 +397,6 @@ public final actor MCPServerProxy: Sendable {
390397
responseTasks.removeValue(forKey: messageId)
391398
continuation.resume(returning: responseMessage)
392399
}
393-
case 202:
394-
break // response will arrive over SSE
395400
default:
396401
let responseBody = String(data: responseData, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
397402
let details = responseBody.isEmpty ? "" : ": \(responseBody)"
@@ -425,7 +430,7 @@ public final actor MCPServerProxy: Sendable {
425430
do {
426431
let lines = await lineConnection.lines()
427432
for try await data in lines {
428-
processIncomingMessage(data: data)
433+
await processIncomingMessage(data: data)
429434
}
430435
self.handleStreamTermination(
431436
MCPServerProxyError.communicationError(
@@ -529,50 +534,52 @@ public final actor MCPServerProxy: Sendable {
529534
)
530535
}
531536

532-
private func processIncomingMessage(event: String = "", data: String) {
533-
Task {
534-
if event == "endpoint" {
535-
if let url = URL(string: data) {
536-
endpointURL = url
537-
if let continuation = endpointContinuation {
538-
endpointContinuation = nil
539-
continuation.resume(returning: url)
540-
}
541-
return
537+
private func processIncomingMessage(event: String = "", data: String) async {
538+
if event == "endpoint" {
539+
if case .sse(let sseConfig) = config, isStreamableMCPURL(sseConfig.url) {
540+
// Ignore legacy endpoint events for streamable /mcp mode.
541+
return
542+
}
543+
if let url = URL(string: data) {
544+
endpointURL = url
545+
if let continuation = endpointContinuation {
546+
endpointContinuation = nil
547+
continuation.resume(returning: url)
542548
}
549+
return
543550
}
551+
}
544552

545-
guard let jsonData = data.data(using: .utf8) else { return }
546-
logger.trace("[MCP DEBUG] Received JSON-RPC message: \(data)")
547-
let decoder = JSONDecoder()
548-
if let message = try? decoder.decode(JSONRPCMessage.self, from: jsonData) {
549-
switch message {
550-
case .request(let requestData):
551-
if requestData.method == "ping" {
552-
logger.info("[MCP] Ping request received; sending response.")
553-
await handlePingRequest(requestData)
554-
} else {
555-
logger.debug("[MCP DEBUG] Ignoring client request: \(requestData.method)")
556-
}
557-
case .notification(let notification):
558-
if notification.method == "notifications/progress" {
559-
logProgressNotification(notification)
560-
} else if notification.method == "notifications/message" {
561-
await handleLogNotification(notification)
562-
} else {
563-
logger.trace("[MCP DEBUG] Received notification: \(notification.method)")
564-
}
565-
case .response, .errorResponse:
566-
if let id = message.id, let waitingContinuation = responseTasks[id] {
567-
responseTasks.removeValue(forKey: id)
568-
waitingContinuation.resume(returning: message)
569-
} else {
570-
logger.error("[MCP DEBUG] No waiting continuation found for ID \(message.id?.stringValue ?? "nil")")
571-
}
553+
guard let jsonData = data.data(using: .utf8) else { return }
554+
logger.trace("[MCP DEBUG] Received JSON-RPC message: \(data)")
555+
let decoder = JSONDecoder()
556+
if let message = try? decoder.decode(JSONRPCMessage.self, from: jsonData) {
557+
switch message {
558+
case .request(let requestData):
559+
if requestData.method == "ping" {
560+
logger.info("[MCP] Ping request received; sending response.")
561+
await handlePingRequest(requestData)
562+
} else {
563+
logger.debug("[MCP DEBUG] Ignoring client request: \(requestData.method)")
564+
}
565+
case .notification(let notification):
566+
if notification.method == "notifications/progress" {
567+
logProgressNotification(notification)
568+
} else if notification.method == "notifications/message" {
569+
await handleLogNotification(notification)
570+
} else {
571+
logger.trace("[MCP DEBUG] Received notification: \(notification.method)")
572+
}
573+
case .response, .errorResponse:
574+
if let id = message.id, let waitingContinuation = responseTasks[id] {
575+
responseTasks.removeValue(forKey: id)
576+
waitingContinuation.resume(returning: message)
577+
} else {
578+
logger.error("[MCP DEBUG] No waiting continuation found for ID \(message.id?.stringValue ?? "nil")")
572579
}
573-
} else {
574-
logger.error("[MCP DEBUG] Failed to decode JSON-RPC message")
575580
}
581+
} else {
582+
logger.error("[MCP DEBUG] Failed to decode JSON-RPC message")
576583
}
577584
}
578585

@@ -833,6 +840,31 @@ public final actor MCPServerProxy: Sendable {
833840
}
834841
}
835842

843+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, macCatalyst 15.0, *)
844+
private func handleSSEConnectionResponse(
845+
response: URLResponse,
846+
asyncBytes: URLSession.AsyncBytes,
847+
sseConfig: MCPServerSseConfig,
848+
isStreamableMCP: Bool
849+
) async throws {
850+
if let response = response as? HTTPURLResponse, response.statusCode != 200 {
851+
let data = try await asyncBytes.reduce(into: Data()) { partialResult, byte in
852+
partialResult.append(byte)
853+
}
854+
throw MCPServerProxyError.communicationError("HTTP error \(response.statusCode): \(String(data: data, encoding: .utf8) ?? "Unknown error")")
855+
}
856+
857+
if let response = response as? HTTPURLResponse {
858+
sessionID = response.value(forHTTPHeaderField: "Mcp-Session-Id")
859+
if isStreamableMCP {
860+
endpointURL = sseConfig.url
861+
} else if let sessionID,
862+
let endpoint = messageEndpointURL(baseURL: sseConfig.url, sessionId: sessionID) {
863+
endpointURL = endpoint
864+
}
865+
}
866+
}
867+
836868
private func messageEndpointURL(baseURL: URL, sessionId: String) -> URL? {
837869
guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false) else {
838870
return nil

0 commit comments

Comments
 (0)