Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 116 additions & 67 deletions Sources/SwiftMCP/Client/MCPServerProxy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,63 +89,12 @@ public final actor MCPServerProxy: Sendable {

case .sse(let sseConfig):
#if os(Linux)
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require URLSession.bytes support on Linux.")
try await connectSSELinux(sseConfig: sseConfig)
#else
if #available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, macCatalyst 15.0, *) {
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
if isStreamableMCP {
// Streamable HTTP posts to the same /mcp endpoint.
endpointURL = sseConfig.url
}

let sessionConfig = URLSessionConfiguration.default
sessionConfig.timeoutIntervalForRequest = .infinity
sessionConfig.timeoutIntervalForResource = .infinity

let session = URLSession(configuration: sessionConfig)
var request = URLRequest(url: sseConfig.url)
request.httpMethod = "GET"
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")

streamTask = Task {
do {
let (asyncBytes, response) = try await session.bytes(for: request)
try await self.handleSSEConnectionResponse(response: response, asyncBytes: asyncBytes, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)

for try await message in asyncBytes.lines.sseMessages() {
await self.processIncomingMessage(event: message.event, data: message.data)
}
self.handleStreamTermination(
MCPServerProxyError.communicationError(
"SSE stream closed by server before response was received"
)
)
} catch is CancellationError {
// Pending requests are cancelled in disconnect().
} catch {
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
self.handleStreamTermination(error)
}
}

if !isStreamableMCP && endpointURL == nil {
let _ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<URL, Error>) in
self.endpointContinuation = continuation

Task {
try await Task.sleep(nanoseconds: 10_000_000_000)
if let cont = self.endpointContinuation {
self.endpointContinuation = nil
cont.resume(throwing: MCPServerProxyError.communicationError("Timeout waiting for endpoint URL"))
}
}
}
}

try await initialize()
try await connectSSEApple(sseConfig: sseConfig)
} else {
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require newer OS availability.")
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require macOS 12.0 or newer.")
}
#endif
}
Expand Down Expand Up @@ -840,23 +789,108 @@ public final actor MCPServerProxy: Sendable {
}
}

// MARK: - SSE Connection (Apple platforms)

#if !os(Linux)
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, macCatalyst 15.0, *)
private func handleSSEConnectionResponse(
response: URLResponse,
asyncBytes: URLSession.AsyncBytes,
sseConfig: MCPServerSseConfig,
isStreamableMCP: Bool
) async throws {
if let response = response as? HTTPURLResponse, response.statusCode != 200 {
let data = try await asyncBytes.reduce(into: Data()) { partialResult, byte in
partialResult.append(byte)
private func connectSSEApple(sseConfig: MCPServerSseConfig) async throws {
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
if isStreamableMCP {
endpointURL = sseConfig.url
}

let sessionConfig = URLSessionConfiguration.default
sessionConfig.timeoutIntervalForRequest = .infinity
sessionConfig.timeoutIntervalForResource = .infinity

let session = URLSession(configuration: sessionConfig)
var request = URLRequest(url: sseConfig.url)
request.httpMethod = "GET"
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")

streamTask = Task {
do {
let (asyncBytes, response) = try await session.bytes(for: request)
try self.handleSSEResponse(response, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)

for try await message in asyncBytes.lines.sseMessages() {
await self.processIncomingMessage(event: message.event, data: message.data)
}
self.handleStreamTermination(
MCPServerProxyError.communicationError(
"SSE stream closed by server before response was received"
)
)
} catch is CancellationError {
// Pending requests are cancelled in disconnect().
} catch {
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
self.handleStreamTermination(error)
}
throw MCPServerProxyError.communicationError("HTTP error \(response.statusCode): \(String(data: data, encoding: .utf8) ?? "Unknown error")")
}

if let response = response as? HTTPURLResponse {
sessionID = response.value(forHTTPHeaderField: "Mcp-Session-Id")
try await waitForEndpointIfNeeded(isStreamableMCP: isStreamableMCP)
try await initialize()
}
#endif

// MARK: - SSE Connection (Linux)

#if os(Linux)
private func connectSSELinux(sseConfig: MCPServerSseConfig) async throws {
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
if isStreamableMCP {
endpointURL = sseConfig.url
}

let sessionConfig = URLSessionConfiguration.default

var request = URLRequest(url: sseConfig.url)
request.httpMethod = "GET"
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")

// Use a streaming delegate since URLSession.bytes is unavailable on Linux
let proxy = self
let delegate = SSEStreamingDelegate { response in
Task {
await proxy.handleSSEResponse(response, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)
}
}

let session = URLSession(configuration: sessionConfig, delegate: delegate, delegateQueue: nil)
let task = session.dataTask(with: request)
task.resume()

streamTask = Task {
do {
for try await message in delegate.lines.sseMessages() {
await self.processIncomingMessage(event: message.event, data: message.data)
}
self.handleStreamTermination(
MCPServerProxyError.communicationError(
"SSE stream closed by server before response was received"
)
)
} catch is CancellationError {
task.cancel()
} catch {
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
self.handleStreamTermination(error)
}
}

try await waitForEndpointIfNeeded(isStreamableMCP: isStreamableMCP)
try await initialize()
}
#endif

// MARK: - Shared SSE Helpers

private func handleSSEResponse(_ response: URLResponse, sseConfig: MCPServerSseConfig, isStreamableMCP: Bool) {
if let httpResponse = response as? HTTPURLResponse {
sessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id")
if isStreamableMCP {
endpointURL = sseConfig.url
} else if let sessionID,
Expand All @@ -865,7 +899,22 @@ public final actor MCPServerProxy: Sendable {
}
}
}
#endif

private func waitForEndpointIfNeeded(isStreamableMCP: Bool) async throws {
if !isStreamableMCP && endpointURL == nil {
let _ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<URL, Error>) in
self.endpointContinuation = continuation

Task {
try await Task.sleep(nanoseconds: 10_000_000_000)
if let cont = self.endpointContinuation {
self.endpointContinuation = nil
cont.resume(throwing: MCPServerProxyError.communicationError("Timeout waiting for endpoint URL"))
}
}
}
}
}

private func messageEndpointURL(baseURL: URL, sessionId: String) -> URL? {
guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false) else {
Expand Down
73 changes: 73 additions & 0 deletions Sources/SwiftMCP/Client/SSEStreamingDelegate.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

/// A `URLSession` delegate that feeds received bytes into an `AsyncStream`.
/// Used on Linux where `URLSession.bytes(for:)` is unavailable.
final class SSEStreamingDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {
private let lineContinuation: AsyncStream<String>.Continuation
private var buffer = Data()
private var responseHandler: ((URLResponse) -> Void)?

/// The async stream of lines received from the SSE connection.
let lines: AsyncStream<String>

/// The HTTP response, available after the first data callback.
private(set) var response: URLResponse?
private let responseContinuation: CheckedContinuation<URLResponse, Never>?

init(onResponse: @escaping (URLResponse) -> Void) {
var cont: AsyncStream<String>.Continuation!
self.lines = AsyncStream<String> { cont = $0 }
self.lineContinuation = cont
self.responseHandler = onResponse
self.responseContinuation = nil
super.init()
}

func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive response: URLResponse,
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
) {
self.response = response
responseHandler?(response)
responseHandler = nil
completionHandler(.allow)
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
buffer.append(data)

// Split buffer on newlines and emit complete lines
while let newlineRange = buffer.range(of: Data([0x0A])) {
let lineData = buffer.subdata(in: buffer.startIndex..<newlineRange.lowerBound)
buffer.removeSubrange(buffer.startIndex...newlineRange.lowerBound)

// Strip trailing \r if present
let cleanData: Data
if lineData.last == 0x0D {
cleanData = lineData.dropLast()
} else {
cleanData = lineData
}

let line = String(data: cleanData, encoding: .utf8) ?? ""
lineContinuation.yield(line)
}
}

func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
// Flush any remaining data in buffer as a final line
if !buffer.isEmpty {
let line = String(data: buffer, encoding: .utf8) ?? ""
if !line.isEmpty {
lineContinuation.yield(line)
}
buffer.removeAll()
}
lineContinuation.finish()
}
}