diff --git a/Package.resolved b/Package.resolved index eaee8ae6..5e9023c5 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,6 +1,15 @@ { - "originHash" : "cce36cb33302c2f1c28458e19b8439f736fc28106e4c6ea95d7992c74594c242", + "originHash" : "08de61941b7919a65e36c0e34f8c1c41995469b86a39122158b75b4a68c4527d", "pins" : [ + { + "identity" : "eventsource", + "kind" : "remoteSourceControl", + "location" : "https://github.com/loopwork-ai/eventsource.git", + "state" : { + "revision" : "e83f076811f32757305b8bf69ac92d05626ffdd7", + "version" : "1.1.0" + } + }, { "identity" : "swift-log", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index 270b0458..9f1cb351 100644 --- a/Package.swift +++ b/Package.swift @@ -3,6 +3,25 @@ import PackageDescription +// Base dependencies needed on all platforms +var dependencies: [Package.Dependency] = [ + .package(url: "https://github.com/apple/swift-system.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.5.0"), +] + +// Target dependencies needed on all platforms +var targetDependencies: [Target.Dependency] = [ + .product(name: "SystemPackage", package: "swift-system"), + .product(name: "Logging", package: "swift-log"), +] + +// Add EventSource only on Apple platforms (non-Linux) +#if !os(Linux) + dependencies.append( + .package(url: "https://github.com/loopwork-ai/eventsource.git", from: "1.1.0")) + targetDependencies.append(.product(name: "EventSource", package: "eventsource")) +#endif + let package = Package( name: "mcp-swift-sdk", platforms: [ @@ -19,25 +38,15 @@ let package = Package( name: "MCP", targets: ["MCP"]) ], - dependencies: [ - .package(url: "https://github.com/apple/swift-system.git", from: "1.0.0"), - .package(url: "https://github.com/apple/swift-log.git", from: "1.5.0"), - ], + dependencies: dependencies, targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. // Targets can depend on other targets in this package and products from dependencies. .target( name: "MCP", - dependencies: [ - .product(name: "SystemPackage", package: "swift-system"), - .product(name: "Logging", package: "swift-log"), - ]), + dependencies: targetDependencies), .testTarget( name: "MCPTests", - dependencies: [ - "MCP", - .product(name: "SystemPackage", package: "swift-system"), - .product(name: "Logging", package: "swift-log"), - ]), + dependencies: ["MCP"] + targetDependencies), ] ) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 9cf93a0c..f9f7e40d 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -1,6 +1,10 @@ import Foundation import Logging +#if !os(Linux) + import EventSource +#endif + #if canImport(FoundationNetworking) import FoundationNetworking #endif @@ -11,7 +15,6 @@ public actor HTTPClientTransport: Actor, Transport { public private(set) var sessionID: String? private let streaming: Bool private var streamingTask: Task? - private var lastEventID: String? public nonisolated let logger: Logger private var isConnected = false @@ -103,36 +106,103 @@ public actor HTTPClientTransport: Actor, Transport { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - let (responseData, response) = try await session.data(for: request) + #if os(Linux) + // Linux implementation using data(for:) instead of bytes(for:) + let (responseData, response) = try await session.data(for: request) + try await processResponse(response: response, data: responseData) + #else + // macOS and other platforms with bytes(for:) support + let (responseStream, response) = try await session.bytes(for: request) + try await processResponse(response: response, stream: responseStream) + #endif + } - guard let httpResponse = response as? HTTPURLResponse else { - throw MCPError.internalError("Invalid HTTP response") - } + #if os(Linux) + // Process response with data payload (Linux) + private func processResponse(response: URLResponse, data: Data) async throws { + guard let httpResponse = response as? HTTPURLResponse else { + throw MCPError.internalError("Invalid HTTP response") + } + + // Process the response based on content type and status code + let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" - // Process the response based on content type and status code - let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" + // Extract session ID if present + if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { + self.sessionID = newSessionID + logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) + } - // Extract session ID if present - if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { - self.sessionID = newSessionID - logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) + try processHTTPResponse(httpResponse, contentType: contentType) + guard case 200..<300 = httpResponse.statusCode else { return } + + // For JSON responses, yield the data + if contentType.contains("text/event-stream") { + logger.warning("SSE responses aren't fully supported on Linux") + messageContinuation.yield(data) + } else if contentType.contains("application/json") { + logger.debug("Received JSON response", metadata: ["size": "\(data.count)"]) + messageContinuation.yield(data) + } else { + logger.warning("Unexpected content type: \(contentType)") + } } + #else + // Process response with byte stream (macOS, iOS, etc.) + private func processResponse(response: URLResponse, stream: URLSession.AsyncBytes) + async throws + { + guard let httpResponse = response as? HTTPURLResponse else { + throw MCPError.internalError("Invalid HTTP response") + } + + // Process the response based on content type and status code + let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" + + // Extract session ID if present + if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { + self.sessionID = newSessionID + logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) + } + + try processHTTPResponse(httpResponse, contentType: contentType) + guard case 200..<300 = httpResponse.statusCode else { return } - // Handle different response types - switch httpResponse.statusCode { - case 200, 201, 202: - // For SSE, the processing happens in the streaming task if contentType.contains("text/event-stream") { + // For SSE, processing happens via the stream logger.debug("Received SSE response, processing in streaming task") - // The streaming is handled by the SSE task if active - return + try await self.processSSE(stream) + } else if contentType.contains("application/json") { + // For JSON responses, collect and deliver the data + var buffer = Data() + for try await byte in stream { + buffer.append(byte) + } + logger.debug("Received JSON response", metadata: ["size": "\(buffer.count)"]) + messageContinuation.yield(buffer) + } else { + logger.warning("Unexpected content type: \(contentType)") } + } + #endif + + // Common HTTP response handling for all platforms + private func processHTTPResponse(_ response: HTTPURLResponse, contentType: String) throws { + // Handle status codes according to HTTP semantics + switch response.statusCode { + case 200..<300: + // Success range - these are handled by the platform-specific code + return + + case 400: + throw MCPError.internalError("Bad request") + + case 401: + throw MCPError.internalError("Authentication required") + + case 403: + throw MCPError.internalError("Access forbidden") - // For JSON responses, deliver the data directly - if contentType.contains("application/json") && !responseData.isEmpty { - logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) - messageContinuation.yield(responseData) - } case 404: // If we get a 404 with a session ID, it means our session is invalid if sessionID != nil { @@ -141,8 +211,29 @@ public actor HTTPClientTransport: Actor, Transport { throw MCPError.internalError("Session expired") } throw MCPError.internalError("Endpoint not found") + + case 405: + // If we get a 405, it means the server does not support the requested method + // If streaming was requested, we should cancel the streaming task + if streaming { + self.streamingTask?.cancel() + throw MCPError.internalError("Server does not support streaming") + } + throw MCPError.internalError("Method not allowed") + + case 408: + throw MCPError.internalError("Request timeout") + + case 429: + throw MCPError.internalError("Too many requests") + + case 500..<600: + // Server error range + throw MCPError.internalError("Server error: \(response.statusCode)") + default: - throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") + throw MCPError.internalError( + "Unexpected HTTP response: \(response.statusCode) (\(contentType))") } } @@ -155,27 +246,33 @@ public actor HTTPClientTransport: Actor, Transport { /// Starts listening for server events using SSE private func startListeningForServerEvents() async { - guard isConnected else { return } + #if os(Linux) + // SSE is not fully supported on Linux + if streaming { + logger.warning( + "SSE streaming was requested but is not fully supported on Linux. SSE connection will not be attempted." + ) + } + #else + // This is the original code for platforms that support SSE + guard isConnected else { return } - // Retry loop for connection drops - while isConnected && !Task.isCancelled { - do { - try await connectToEventStream() - } catch { - if !Task.isCancelled { - logger.error("SSE connection error: \(error)") - // Wait before retrying - try? await Task.sleep(nanoseconds: 1_000_000_000) // 1 second + // Retry loop for connection drops + while isConnected && !Task.isCancelled { + do { + try await connectToEventStream() + } catch { + if !Task.isCancelled { + logger.error("SSE connection error: \(error)") + // Wait before retrying + try? await Task.sleep(for: .seconds(1)) + } } } - } + #endif } - #if canImport(FoundationNetworking) - private func connectToEventStream() async throws { - logger.warning("SSE is not supported on this platform") - } - #else + #if !os(Linux) /// Establishes an SSE connection to the server private func connectToEventStream() async throws { guard isConnected else { return } @@ -183,17 +280,13 @@ public actor HTTPClientTransport: Actor, Transport { var request = URLRequest(url: endpoint) request.httpMethod = "GET" request.addValue("text/event-stream", forHTTPHeaderField: "Accept") + request.addValue("no-cache", forHTTPHeaderField: "Cache-Control") // Add session ID if available if let sessionID = sessionID { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - // Add Last-Event-ID header for resumability if available - if let lastEventID = lastEventID { - request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") - } - logger.debug("Starting SSE connection") // Create URLSession task for SSE @@ -217,95 +310,34 @@ public actor HTTPClientTransport: Actor, Transport { // Extract session ID if present if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { self.sessionID = newSessionID + logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - // Process the SSE stream - var buffer = "" - var eventType = "" - var eventID: String? - var eventData = "" - - for try await byte in stream { - if Task.isCancelled { break } - - guard let char = String(bytes: [byte], encoding: .utf8) else { continue } - buffer.append(char) - - // Process complete lines - while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) { - var line = buffer[..