Skip to content

Commit 17812b6

Browse files
authored
Merge pull request #63 from Cocoanetics/feature/linux-sse-client
SSE client: add Linux support via URLSession delegate streaming
2 parents fd0bc2f + d33bde2 commit 17812b6

File tree

2 files changed

+189
-67
lines changed

2 files changed

+189
-67
lines changed

Sources/SwiftMCP/Client/MCPServerProxy.swift

Lines changed: 116 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -89,63 +89,12 @@ public final actor MCPServerProxy: Sendable {
8989

9090
case .sse(let sseConfig):
9191
#if os(Linux)
92-
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require URLSession.bytes support on Linux.")
92+
try await connectSSELinux(sseConfig: sseConfig)
9393
#else
9494
if #available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, macCatalyst 15.0, *) {
95-
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
96-
if isStreamableMCP {
97-
// Streamable HTTP posts to the same /mcp endpoint.
98-
endpointURL = sseConfig.url
99-
}
100-
101-
let sessionConfig = URLSessionConfiguration.default
102-
sessionConfig.timeoutIntervalForRequest = .infinity
103-
sessionConfig.timeoutIntervalForResource = .infinity
104-
105-
let session = URLSession(configuration: sessionConfig)
106-
var request = URLRequest(url: sseConfig.url)
107-
request.httpMethod = "GET"
108-
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
109-
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
110-
111-
streamTask = Task {
112-
do {
113-
let (asyncBytes, response) = try await session.bytes(for: request)
114-
try await self.handleSSEConnectionResponse(response: response, asyncBytes: asyncBytes, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)
115-
116-
for try await message in asyncBytes.lines.sseMessages() {
117-
await self.processIncomingMessage(event: message.event, data: message.data)
118-
}
119-
self.handleStreamTermination(
120-
MCPServerProxyError.communicationError(
121-
"SSE stream closed by server before response was received"
122-
)
123-
)
124-
} catch is CancellationError {
125-
// Pending requests are cancelled in disconnect().
126-
} catch {
127-
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
128-
self.handleStreamTermination(error)
129-
}
130-
}
131-
132-
if !isStreamableMCP && endpointURL == nil {
133-
let _ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<URL, Error>) in
134-
self.endpointContinuation = continuation
135-
136-
Task {
137-
try await Task.sleep(nanoseconds: 10_000_000_000)
138-
if let cont = self.endpointContinuation {
139-
self.endpointContinuation = nil
140-
cont.resume(throwing: MCPServerProxyError.communicationError("Timeout waiting for endpoint URL"))
141-
}
142-
}
143-
}
144-
}
145-
146-
try await initialize()
95+
try await connectSSEApple(sseConfig: sseConfig)
14796
} else {
148-
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require newer OS availability.")
97+
throw MCPServerProxyError.unsupportedPlatform("SSE client connections require macOS 12.0 or newer.")
14998
}
15099
#endif
151100
}
@@ -840,23 +789,108 @@ public final actor MCPServerProxy: Sendable {
840789
}
841790
}
842791

792+
// MARK: - SSE Connection (Apple platforms)
793+
843794
#if !os(Linux)
844795
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, macCatalyst 15.0, *)
845-
private func handleSSEConnectionResponse(
846-
response: URLResponse,
847-
asyncBytes: URLSession.AsyncBytes,
848-
sseConfig: MCPServerSseConfig,
849-
isStreamableMCP: Bool
850-
) async throws {
851-
if let response = response as? HTTPURLResponse, response.statusCode != 200 {
852-
let data = try await asyncBytes.reduce(into: Data()) { partialResult, byte in
853-
partialResult.append(byte)
796+
private func connectSSEApple(sseConfig: MCPServerSseConfig) async throws {
797+
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
798+
if isStreamableMCP {
799+
endpointURL = sseConfig.url
800+
}
801+
802+
let sessionConfig = URLSessionConfiguration.default
803+
sessionConfig.timeoutIntervalForRequest = .infinity
804+
sessionConfig.timeoutIntervalForResource = .infinity
805+
806+
let session = URLSession(configuration: sessionConfig)
807+
var request = URLRequest(url: sseConfig.url)
808+
request.httpMethod = "GET"
809+
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
810+
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
811+
812+
streamTask = Task {
813+
do {
814+
let (asyncBytes, response) = try await session.bytes(for: request)
815+
try self.handleSSEResponse(response, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)
816+
817+
for try await message in asyncBytes.lines.sseMessages() {
818+
await self.processIncomingMessage(event: message.event, data: message.data)
819+
}
820+
self.handleStreamTermination(
821+
MCPServerProxyError.communicationError(
822+
"SSE stream closed by server before response was received"
823+
)
824+
)
825+
} catch is CancellationError {
826+
// Pending requests are cancelled in disconnect().
827+
} catch {
828+
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
829+
self.handleStreamTermination(error)
854830
}
855-
throw MCPServerProxyError.communicationError("HTTP error \(response.statusCode): \(String(data: data, encoding: .utf8) ?? "Unknown error")")
856831
}
857832

858-
if let response = response as? HTTPURLResponse {
859-
sessionID = response.value(forHTTPHeaderField: "Mcp-Session-Id")
833+
try await waitForEndpointIfNeeded(isStreamableMCP: isStreamableMCP)
834+
try await initialize()
835+
}
836+
#endif
837+
838+
// MARK: - SSE Connection (Linux)
839+
840+
#if os(Linux)
841+
private func connectSSELinux(sseConfig: MCPServerSseConfig) async throws {
842+
let isStreamableMCP = isStreamableMCPURL(sseConfig.url)
843+
if isStreamableMCP {
844+
endpointURL = sseConfig.url
845+
}
846+
847+
let sessionConfig = URLSessionConfiguration.default
848+
849+
var request = URLRequest(url: sseConfig.url)
850+
request.httpMethod = "GET"
851+
applyConfiguredSSEHeaders(&request, sseConfig: sseConfig)
852+
request.setValue("text/event-stream", forHTTPHeaderField: "Accept")
853+
854+
// Use a streaming delegate since URLSession.bytes is unavailable on Linux
855+
let proxy = self
856+
let delegate = SSEStreamingDelegate { response in
857+
Task {
858+
await proxy.handleSSEResponse(response, sseConfig: sseConfig, isStreamableMCP: isStreamableMCP)
859+
}
860+
}
861+
862+
let session = URLSession(configuration: sessionConfig, delegate: delegate, delegateQueue: nil)
863+
let task = session.dataTask(with: request)
864+
task.resume()
865+
866+
streamTask = Task {
867+
do {
868+
for try await message in delegate.lines.sseMessages() {
869+
await self.processIncomingMessage(event: message.event, data: message.data)
870+
}
871+
self.handleStreamTermination(
872+
MCPServerProxyError.communicationError(
873+
"SSE stream closed by server before response was received"
874+
)
875+
)
876+
} catch is CancellationError {
877+
task.cancel()
878+
} catch {
879+
self.logger.error("[MCP DEBUG] SSE stream error: \(error)")
880+
self.handleStreamTermination(error)
881+
}
882+
}
883+
884+
try await waitForEndpointIfNeeded(isStreamableMCP: isStreamableMCP)
885+
try await initialize()
886+
}
887+
#endif
888+
889+
// MARK: - Shared SSE Helpers
890+
891+
private func handleSSEResponse(_ response: URLResponse, sseConfig: MCPServerSseConfig, isStreamableMCP: Bool) {
892+
if let httpResponse = response as? HTTPURLResponse {
893+
sessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id")
860894
if isStreamableMCP {
861895
endpointURL = sseConfig.url
862896
} else if let sessionID,
@@ -865,7 +899,22 @@ public final actor MCPServerProxy: Sendable {
865899
}
866900
}
867901
}
868-
#endif
902+
903+
private func waitForEndpointIfNeeded(isStreamableMCP: Bool) async throws {
904+
if !isStreamableMCP && endpointURL == nil {
905+
let _ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<URL, Error>) in
906+
self.endpointContinuation = continuation
907+
908+
Task {
909+
try await Task.sleep(nanoseconds: 10_000_000_000)
910+
if let cont = self.endpointContinuation {
911+
self.endpointContinuation = nil
912+
cont.resume(throwing: MCPServerProxyError.communicationError("Timeout waiting for endpoint URL"))
913+
}
914+
}
915+
}
916+
}
917+
}
869918

870919
private func messageEndpointURL(baseURL: URL, sessionId: String) -> URL? {
871920
guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false) else {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import Foundation
2+
#if canImport(FoundationNetworking)
3+
import FoundationNetworking
4+
#endif
5+
6+
/// A `URLSession` delegate that feeds received bytes into an `AsyncStream`.
7+
/// Used on Linux where `URLSession.bytes(for:)` is unavailable.
8+
final class SSEStreamingDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {
9+
private let lineContinuation: AsyncStream<String>.Continuation
10+
private var buffer = Data()
11+
private var responseHandler: ((URLResponse) -> Void)?
12+
13+
/// The async stream of lines received from the SSE connection.
14+
let lines: AsyncStream<String>
15+
16+
/// The HTTP response, available after the first data callback.
17+
private(set) var response: URLResponse?
18+
private let responseContinuation: CheckedContinuation<URLResponse, Never>?
19+
20+
init(onResponse: @escaping (URLResponse) -> Void) {
21+
var cont: AsyncStream<String>.Continuation!
22+
self.lines = AsyncStream<String> { cont = $0 }
23+
self.lineContinuation = cont
24+
self.responseHandler = onResponse
25+
self.responseContinuation = nil
26+
super.init()
27+
}
28+
29+
func urlSession(
30+
_ session: URLSession,
31+
dataTask: URLSessionDataTask,
32+
didReceive response: URLResponse,
33+
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
34+
) {
35+
self.response = response
36+
responseHandler?(response)
37+
responseHandler = nil
38+
completionHandler(.allow)
39+
}
40+
41+
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
42+
buffer.append(data)
43+
44+
// Split buffer on newlines and emit complete lines
45+
while let newlineRange = buffer.range(of: Data([0x0A])) {
46+
let lineData = buffer.subdata(in: buffer.startIndex..<newlineRange.lowerBound)
47+
buffer.removeSubrange(buffer.startIndex...newlineRange.lowerBound)
48+
49+
// Strip trailing \r if present
50+
let cleanData: Data
51+
if lineData.last == 0x0D {
52+
cleanData = lineData.dropLast()
53+
} else {
54+
cleanData = lineData
55+
}
56+
57+
let line = String(data: cleanData, encoding: .utf8) ?? ""
58+
lineContinuation.yield(line)
59+
}
60+
}
61+
62+
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
63+
// Flush any remaining data in buffer as a final line
64+
if !buffer.isEmpty {
65+
let line = String(data: buffer, encoding: .utf8) ?? ""
66+
if !line.isEmpty {
67+
lineContinuation.yield(line)
68+
}
69+
buffer.removeAll()
70+
}
71+
lineContinuation.finish()
72+
}
73+
}

0 commit comments

Comments
 (0)