Skip to content

Commit b1a619f

Browse files
committed
- Reverter use of lines (it swallows newlines)
- Fiexed original issue I experienced with line 229 `while let newlineIndex = buffer.firstIndex(of: "\n") {` not handling "\r\n" properly - Added unit test to test for "\r\n" scenario - Simplified `end of event` test - `line` creation trims newline, added code trims carriage return. Only thing that should be left is an empty line
1 parent 653bf08 commit b1a619f

File tree

2 files changed

+116
-64
lines changed

2 files changed

+116
-64
lines changed

Sources/MCP/Base/Transports/HTTPClientTransport.swift

Lines changed: 82 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -179,107 +179,125 @@ public actor HTTPClientTransport: Actor, Transport {
179179
/// Establishes an SSE connection to the server
180180
private func connectToEventStream() async throws {
181181
guard isConnected else { return }
182-
182+
183183
var request = URLRequest(url: endpoint)
184184
request.httpMethod = "GET"
185185
request.addValue("text/event-stream", forHTTPHeaderField: "Accept")
186-
186+
187187
// Add session ID if available
188188
if let sessionID = sessionID {
189189
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
190190
}
191-
191+
192192
// Add Last-Event-ID header for resumability if available
193193
if let lastEventID = lastEventID {
194194
request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID")
195195
}
196-
196+
197197
logger.debug("Starting SSE connection")
198-
198+
199199
// Create URLSession task for SSE
200200
let (stream, response) = try await session.bytes(for: request)
201-
201+
202202
guard let httpResponse = response as? HTTPURLResponse else {
203203
throw MCPError.internalError("Invalid HTTP response")
204204
}
205-
205+
206206
// Check response status
207207
guard httpResponse.statusCode == 200 else {
208208
throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
209209
}
210-
210+
211211
// Extract session ID if present
212212
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
213213
self.sessionID = newSessionID
214214
}
215-
215+
216216
// Process the SSE stream
217+
var buffer = ""
217218
var eventType = ""
218219
var eventID: String?
219220
var eventData = ""
220-
221-
for try await line in stream.lines {
221+
222+
for try await byte in stream {
222223
if Task.isCancelled { break }
223-
224-
// Empty line marks the end of an event
225-
if line.isEmpty {
226-
if !eventData.isEmpty {
227-
// Process the event
228-
if eventType == "id" {
229-
lastEventID = eventID
230-
} else {
231-
// Default event type is "message" if not specified
232-
if let data = eventData.data(using: .utf8) {
233-
logger.debug(
234-
"SSE event received",
235-
metadata: [
236-
"type": "\(eventType.isEmpty ? "message" : eventType)",
237-
"id": "\(eventID ?? "none")",
238-
])
239-
messageContinuation.yield(data)
224+
225+
guard let char = String(bytes: [byte], encoding: .utf8) else { continue }
226+
buffer.append(char)
227+
228+
// Process complete lines
229+
while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) {
230+
var line = buffer[..<newlineIndex]
231+
if line.hasSuffix("\r") {
232+
line = line.dropLast()
233+
}
234+
235+
buffer = String(buffer[buffer.index(after: newlineIndex)...])
236+
237+
// Empty line marks the end of an event
238+
if line.isEmpty {
239+
if !eventData.isEmpty {
240+
// Process the event
241+
if eventType == "id" {
242+
lastEventID = eventID
243+
} else {
244+
// Default event type is "message" if not specified
245+
if let data = eventData.data(using: .utf8) {
246+
logger.debug(
247+
"SSE event received",
248+
metadata: [
249+
"type": "\(eventType.isEmpty ? "message" : eventType)",
250+
"id": "\(eventID ?? "none")",
251+
])
252+
messageContinuation.yield(data)
253+
}
240254
}
255+
256+
// Reset for next event
257+
eventType = ""
258+
eventData = ""
241259
}
242-
243-
// Reset for next event
244-
eventType = ""
245-
eventData = ""
260+
continue
246261
}
247-
continue
248-
}
249-
250-
// Lines starting with ":" are comments
251-
if line.hasPrefix(":") { continue }
252-
253-
// Parse field: value format
254-
if let colonIndex = line.firstIndex(of: ":") {
255-
let field = String(line[..<colonIndex])
256-
var value = String(line[line.index(after: colonIndex)...])
257-
258-
// Trim leading space
259-
if value.hasPrefix(" ") {
260-
value = String(value.dropFirst())
262+
263+
if line.hasSuffix("\r") {
264+
line = line.dropLast()
261265
}
262266

263-
// Process based on field
264-
switch field {
265-
case "event":
266-
eventType = value
267-
case "data":
268-
if !eventData.isEmpty {
269-
eventData.append("\n")
267+
// Lines starting with ":" are comments
268+
if line.hasPrefix(":") { continue }
269+
270+
// Parse field: value format
271+
if let colonIndex = line.firstIndex(of: ":") {
272+
let field = String(line[..<colonIndex])
273+
var value = String(line[line.index(after: colonIndex)...])
274+
275+
// Trim leading space
276+
if value.hasPrefix(" ") {
277+
value = String(value.dropFirst())
270278
}
271-
eventData.append(value)
272-
case "id":
273-
if !value.contains("\0") { // ID must not contain NULL
274-
eventID = value
275-
lastEventID = value
279+
280+
// Process based on field
281+
switch field {
282+
case "event":
283+
eventType = value
284+
case "data":
285+
if !eventData.isEmpty {
286+
eventData.append("\n")
287+
}
288+
eventData.append(value)
289+
case "id":
290+
if !value.contains("\0") { // ID must not contain NULL
291+
eventID = value
292+
lastEventID = value
293+
}
294+
case "retry":
295+
// Retry timing not implemented
296+
break
297+
default:
298+
// Unknown fields are ignored per SSE spec
299+
break
276300
}
277-
case "retry":
278-
// Retry timing not implemented
279-
break
280-
default:
281-
// Unknown fields are ignored per SSE spec
282-
break
283301
}
284302
}
285303
}

Tests/MCPTests/HTTPClientTransportTests.swift

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,40 @@ import Testing
411411
let expectedData = #"{"key":"value"}"#.data(using: .utf8)!
412412
let receivedData = try await iterator.next()
413413

414+
#expect(receivedData == expectedData)
415+
}
416+
@Test("Receive Server-Sent Event (SSE) (CR-NL)", .httpClientTransportSetup)
417+
func testReceiveSSE_CRNL() async throws {
418+
let configuration = URLSessionConfiguration.ephemeral
419+
configuration.protocolClasses = [MockURLProtocol.self]
420+
421+
let transport = HTTPClientTransport(
422+
endpoint: testEndpoint, configuration: configuration, streaming: true,
423+
logger: nil)
424+
425+
let eventString = "id: event1\r\ndata: {\"key\":\"value\"}\r\n\n"
426+
let sseEventData = eventString.data(using: .utf8)!
427+
428+
await MockURLProtocol.requestHandlerStorage.setHandler {
429+
[testEndpoint] (request: URLRequest) in
430+
#expect(request.url == testEndpoint)
431+
#expect(request.httpMethod == "GET")
432+
#expect(request.value(forHTTPHeaderField: "Accept") == "text/event-stream")
433+
let response = HTTPURLResponse(
434+
url: testEndpoint, statusCode: 200, httpVersion: "HTTP/1.1",
435+
headerFields: ["Content-Type": "text/event-stream"])!
436+
return (response, sseEventData)
437+
}
438+
439+
try await transport.connect()
440+
try await Task.sleep(for: .milliseconds(100))
441+
442+
let stream = await transport.receive()
443+
var iterator = stream.makeAsyncIterator()
444+
445+
let expectedData = #"{"key":"value"}"#.data(using: .utf8)!
446+
let receivedData = try await iterator.next()
447+
414448
#expect(receivedData == expectedData)
415449
}
416450
#endif // !canImport(FoundationNetworking)

0 commit comments

Comments
 (0)