@@ -103,7 +103,7 @@ public actor HTTPClientTransport: Actor, Transport {
103103 request. addValue ( sessionID, forHTTPHeaderField: " Mcp-Session-Id " )
104104 }
105105
106- let ( responseData , response) = try await session. data ( for: request)
106+ let ( responseStream , response) = try await session. bytes ( for: request)
107107
108108 guard let httpResponse = response as? HTTPURLResponse else {
109109 throw MCPError . internalError ( " Invalid HTTP response " )
@@ -120,19 +120,20 @@ public actor HTTPClientTransport: Actor, Transport {
120120
121121 // Handle different response types
122122 switch httpResponse. statusCode {
123- case 200 , 201 , 202 :
123+ case 200 ..< 300 where contentType . contains ( " text/event-stream " ) :
124124 // For SSE, the processing happens in the streaming task
125- if contentType. contains ( " text/event-stream " ) {
126- logger. debug ( " Received SSE response, processing in streaming task " )
127- // The streaming is handled by the SSE task if active
128- return
129- }
125+ logger. debug ( " Received SSE response, processing in streaming task " )
126+ try await self . processSSE ( responseStream)
130127
128+ case 200 ..< 300 where contentType. contains ( " application/json " ) :
131129 // For JSON responses, deliver the data directly
132- if contentType . contains ( " application/json " ) && !responseData . isEmpty {
133- logger . debug ( " Received JSON response " , metadata : [ " size " : " \( responseData . count ) " ] )
134- messageContinuation . yield ( responseData )
130+ var buffer = Data ( )
131+ for try await byte in responseStream {
132+ buffer . append ( byte )
135133 }
134+ logger. debug ( " Received JSON response " , metadata: [ " size " : " \( buffer. count) " ] )
135+ messageContinuation. yield ( buffer)
136+
136137 case 404 :
137138 // If we get a 404 with a session ID, it means our session is invalid
138139 if sessionID != nil {
@@ -141,8 +142,16 @@ public actor HTTPClientTransport: Actor, Transport {
141142 throw MCPError . internalError ( " Session expired " )
142143 }
143144 throw MCPError . internalError ( " Endpoint not found " )
145+
146+ case 405 :
147+ // If we get a 405, it means the server does not support streaming,
148+ // so we should cancel the streaming task.
149+ self . streamingTask? . cancel ( )
150+ throw MCPError . internalError ( " Server does not support streaming " )
151+
144152 default :
145- throw MCPError . internalError ( " HTTP error: \( httpResponse. statusCode) " )
153+ throw MCPError . internalError (
154+ " Unexpected HTTP response: \( httpResponse. statusCode) \( contentType) " )
146155 }
147156 }
148157
@@ -175,6 +184,10 @@ public actor HTTPClientTransport: Actor, Transport {
175184 private func connectToEventStream( ) async throws {
176185 logger. warning ( " SSE is not supported on this platform " )
177186 }
187+
188+ private func processSSE( _ stream: URLSession . AsyncBytes ) async throws {
189+ logger. warning ( " SSE is not supported on this platform " )
190+ }
178191 #else
179192 /// Establishes an SSE connection to the server
180193 private func connectToEventStream( ) async throws {
@@ -216,7 +229,10 @@ public actor HTTPClientTransport: Actor, Transport {
216229 logger. debug ( " Session ID received " , metadata: [ " sessionID " : " \( newSessionID) " ] )
217230 }
218231
219- // Process the SSE stream
232+ try await self . processSSE ( stream)
233+ }
234+
235+ private func processSSE( _ stream: URLSession . AsyncBytes ) async throws {
220236 do {
221237 for try await event in stream. events {
222238 // Check if task has been cancelled
0 commit comments