Skip to content

Commit 653bf08

Browse files
committed
Optimized code in connectToEventStream to use .lines computed value of AsyncSequence to produce a AsyncLineSequence instead of building a lines manually.
1 parent 25dee3b commit 653bf08

File tree

1 file changed

+66
-76
lines changed

1 file changed

+66
-76
lines changed

Sources/MCP/Base/Transports/HTTPClientTransport.swift

Lines changed: 66 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -179,117 +179,107 @@ public actor HTTPClientTransport: Actor, Transport {
179179
/// Establishes an SSE connection to the server
180180
private func connectToEventStream() async throws {
181181
guard isConnected else { return }
182-
182+
183183
var request = URLRequest(url: endpoint)
184184
request.httpMethod = "GET"
185185
request.addValue("text/event-stream", forHTTPHeaderField: "Accept")
186-
186+
187187
// Add session ID if available
188188
if let sessionID = sessionID {
189189
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
190190
}
191-
191+
192192
// Add Last-Event-ID header for resumability if available
193193
if let lastEventID = lastEventID {
194194
request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID")
195195
}
196-
196+
197197
logger.debug("Starting SSE connection")
198-
198+
199199
// Create URLSession task for SSE
200200
let (stream, response) = try await session.bytes(for: request)
201-
201+
202202
guard let httpResponse = response as? HTTPURLResponse else {
203203
throw MCPError.internalError("Invalid HTTP response")
204204
}
205-
205+
206206
// Check response status
207207
guard httpResponse.statusCode == 200 else {
208208
throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
209209
}
210-
210+
211211
// Extract session ID if present
212212
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
213213
self.sessionID = newSessionID
214214
}
215-
215+
216216
// Process the SSE stream
217-
var buffer = ""
218217
var eventType = ""
219218
var eventID: String?
220219
var eventData = ""
221-
222-
for try await byte in stream {
220+
221+
for try await line in stream.lines {
223222
if Task.isCancelled { break }
224-
225-
guard let char = String(bytes: [byte], encoding: .utf8) else { continue }
226-
buffer.append(char)
227-
228-
// Process complete lines
229-
while let newlineIndex = buffer.firstIndex(of: "\n") {
230-
let line = buffer[..<newlineIndex]
231-
buffer = String(buffer[buffer.index(after: newlineIndex)...])
232-
233-
// Empty line marks the end of an event
234-
if line.isEmpty || line == "\r" || line == "\n" || line == "\r\n" {
235-
if !eventData.isEmpty {
236-
// Process the event
237-
if eventType == "id" {
238-
lastEventID = eventID
239-
} else {
240-
// Default event type is "message" if not specified
241-
if let data = eventData.data(using: .utf8) {
242-
logger.debug(
243-
"SSE event received",
244-
metadata: [
245-
"type": "\(eventType.isEmpty ? "message" : eventType)",
246-
"id": "\(eventID ?? "none")",
247-
])
248-
messageContinuation.yield(data)
249-
}
223+
224+
// Empty line marks the end of an event
225+
if line.isEmpty {
226+
if !eventData.isEmpty {
227+
// Process the event
228+
if eventType == "id" {
229+
lastEventID = eventID
230+
} else {
231+
// Default event type is "message" if not specified
232+
if let data = eventData.data(using: .utf8) {
233+
logger.debug(
234+
"SSE event received",
235+
metadata: [
236+
"type": "\(eventType.isEmpty ? "message" : eventType)",
237+
"id": "\(eventID ?? "none")",
238+
])
239+
messageContinuation.yield(data)
250240
}
251-
252-
// Reset for next event
253-
eventType = ""
254-
eventData = ""
255241
}
256-
continue
242+
243+
// Reset for next event
244+
eventType = ""
245+
eventData = ""
257246
}
258-
259-
// Lines starting with ":" are comments
260-
if line.hasPrefix(":") { continue }
261-
262-
// Parse field: value format
263-
if let colonIndex = line.firstIndex(of: ":") {
264-
let field = String(line[..<colonIndex])
265-
var value = String(line[line.index(after: colonIndex)...])
266-
267-
// Trim leading space
268-
if value.hasPrefix(" ") {
269-
value = String(value.dropFirst())
247+
continue
248+
}
249+
250+
// Lines starting with ":" are comments
251+
if line.hasPrefix(":") { continue }
252+
253+
// Parse field: value format
254+
if let colonIndex = line.firstIndex(of: ":") {
255+
let field = String(line[..<colonIndex])
256+
var value = String(line[line.index(after: colonIndex)...])
257+
258+
// Trim leading space
259+
if value.hasPrefix(" ") {
260+
value = String(value.dropFirst())
261+
}
262+
263+
// Process based on field
264+
switch field {
265+
case "event":
266+
eventType = value
267+
case "data":
268+
if !eventData.isEmpty {
269+
eventData.append("\n")
270270
}
271-
272-
// Process based on field
273-
switch field {
274-
case "event":
275-
eventType = value
276-
case "data":
277-
if !eventData.isEmpty {
278-
eventData.append("\n")
279-
}
280-
eventData.append(value)
281-
case "id":
282-
if !value.contains("\0") { // ID must not contain NULL
283-
eventID = value
284-
lastEventID = value
285-
}
286-
case "retry":
287-
// Retry timing not implemented
288-
break
289-
default:
290-
// Unknown fields are ignored per SSE spec
291-
break
271+
eventData.append(value)
272+
case "id":
273+
if !value.contains("\0") { // ID must not contain NULL
274+
eventID = value
275+
lastEventID = value
292276
}
277+
case "retry":
278+
// Retry timing not implemented
279+
break
280+
default:
281+
// Unknown fields are ignored per SSE spec
282+
break
293283
}
294284
}
295285
}

0 commit comments

Comments
 (0)