forked from modelcontextprotocol/swift-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNetworkTransport.swift
More file actions
804 lines (685 loc) · 33.4 KB
/
NetworkTransport.swift
File metadata and controls
804 lines (685 loc) · 33.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
import Foundation
import Logging
#if canImport(Network)
import Network
/// Protocol that abstracts the Network.NWConnection functionality needed for NetworkTransport
///
/// The requirements mirror the `NWConnection` members used by this file, so `NWConnection`
/// can conform through an empty extension and the transport's internal initializer can
/// accept any conforming value in place of a real connection.
@preconcurrency protocol NetworkConnectionProtocol {
/// The connection's current state.
var state: NWConnection.State { get }
/// Callback invoked with each new connection state; assign `nil` to stop observing.
var stateUpdateHandler: ((@Sendable (NWConnection.State) -> Void))? { get set }
/// Starts the connection using the given dispatch queue.
func start(queue: DispatchQueue)
/// Cancels the connection.
func cancel()
/// Sends `content` and reports the outcome through `completion`.
func send(
content: Data?, contentContext: NWConnection.ContentContext, isComplete: Bool,
completion: NWConnection.SendCompletion)
/// Requests between `minimumIncompleteLength` and `maximumLength` bytes and hands the
/// result (content, context, completion flag, error) to `completion`.
func receive(
minimumIncompleteLength: Int, maximumLength: Int,
completion: @escaping @Sendable (
Data?, NWConnection.ContentContext?, Bool, NWError?
) -> Void)
}
/// Extension to conform NWConnection to internal NetworkConnectionProtocol
///
/// The body is intentionally empty: every protocol requirement is declared with the
/// signature `NWConnection` is expected to provide already.
extension NWConnection: NetworkConnectionProtocol {}
/// An implementation of a custom MCP transport using Apple's Network framework.
///
/// This transport allows MCP clients and servers to communicate over TCP/UDP connections
/// using Apple's Network framework.
///
/// - Important: This transport is available exclusively on Apple platforms
/// (macOS, iOS, watchOS, tvOS, visionOS) as it depends on the Network framework.
///
/// ## Example Usage
///
/// ```swift
/// import MCP
/// import Network
///
/// // Create a TCP connection to a server
/// let connection = NWConnection(
/// host: NWEndpoint.Host("localhost"),
/// port: NWEndpoint.Port(8080)!,
/// using: .tcp
/// )
///
/// // Initialize the transport with the connection
/// let transport = NetworkTransport(connection: connection)
///
/// // For large messages (e.g., images), configure unlimited buffer size
/// let largeBufferTransport = NetworkTransport(
/// connection: connection,
/// bufferConfig: .unlimited
/// )
///
/// // Use the transport with an MCP client
/// let client = Client(name: "MyApp", version: "1.0.0")
/// try await client.connect(transport: transport)
/// ```
public actor NetworkTransport: Transport {
/// Represents a heartbeat message for connection health monitoring.
///
/// Wire format (12 bytes): the 4 magic bytes followed by the timestamp encoded as a
/// little-endian `UInt64` holding whole milliseconds since the reference date.
public struct Heartbeat: RawRepresentable, Hashable, Sendable {
    /// Magic bytes used to identify a heartbeat message.
    private static let magicBytes: [UInt8] = [0xF0, 0x9F, 0x92, 0x93]

    /// Number of bytes used by the encoded timestamp.
    private static let timestampLength = MemoryLayout<UInt64>.size

    /// Total number of bytes in an encoded heartbeat (magic bytes + timestamp).
    private static let encodedLength = magicBytes.count + timestampLength

    /// The timestamp of when the heartbeat was created.
    public let timestamp: Date

    /// Creates a new heartbeat with the current timestamp.
    public init() {
        self.timestamp = Date()
    }

    /// Creates a heartbeat with a specific timestamp.
    ///
    /// - Parameter timestamp: The timestamp for the heartbeat.
    public init(timestamp: Date) {
        self.timestamp = timestamp
    }

    // MARK: - RawRepresentable

    public typealias RawValue = [UInt8]

    /// Creates a heartbeat from its raw representation.
    ///
    /// Bytes beyond the first 12 are ignored.
    ///
    /// - Parameter rawValue: The raw bytes of the heartbeat message.
    /// - Returns: A heartbeat if the raw value is valid, nil otherwise.
    public init?(rawValue: [UInt8]) {
        // Check if the data has the correct format (magic bytes + timestamp)
        guard rawValue.count >= Self.encodedLength,
            rawValue.starts(with: Self.magicBytes)
        else {
            return nil
        }

        // Assemble the little-endian value byte by byte so that decoding depends on
        // neither the alignment of the underlying storage nor the host byte order.
        let milliseconds = rawValue[Self.magicBytes.count..<Self.encodedLength]
            .reversed()
            .reduce(UInt64(0)) { ($0 << 8) | UInt64($1) }

        self.timestamp = Date(
            timeIntervalSinceReferenceDate: TimeInterval(milliseconds) / 1000.0)
    }

    /// Converts the heartbeat to its raw representation.
    ///
    /// Timestamps that cannot be represented as an unsigned millisecond count
    /// (before the reference date, NaN, or too large) are clamped instead of trapping.
    public var rawValue: [UInt8] {
        let interval = timestamp.timeIntervalSinceReferenceDate * 1000

        // `UInt64(_:)` traps on negative, NaN, and out-of-range doubles, so clamp first.
        let milliseconds: UInt64
        if interval.isNaN || interval <= 0 {
            milliseconds = 0
        } else if interval >= Double(UInt64.max) {
            milliseconds = UInt64.max
        } else {
            milliseconds = UInt64(interval)
        }

        let timestampBytes = withUnsafeBytes(of: milliseconds.littleEndian) { Array($0) }
        return Self.magicBytes + timestampBytes
    }

    /// Converts the heartbeat to Data.
    public var data: Data {
        Data(rawValue)
    }

    /// Checks if the given data represents a heartbeat message.
    ///
    /// Only the leading magic bytes are inspected; the timestamp is not validated.
    ///
    /// - Parameter data: The data to check.
    /// - Returns: true if the data is a heartbeat message, false otherwise.
    public static func isHeartbeat(_ data: Data) -> Bool {
        data.starts(with: magicBytes)
    }

    /// Attempts to parse a heartbeat from the given data.
    ///
    /// - Parameter data: The data to parse.
    /// - Returns: A heartbeat if the data is valid, nil otherwise.
    public static func from(data: Data) -> Heartbeat? {
        guard data.count >= encodedLength else {
            return nil
        }
        return Heartbeat(rawValue: Array(data))
    }
}
/// Settings that control whether and how often heartbeats are sent.
public struct HeartbeatConfiguration: Hashable, Sendable {
    /// `true` when the transport should send heartbeats.
    public let enabled: Bool

    /// Number of seconds to wait between two heartbeats.
    public let interval: TimeInterval

    /// Builds a heartbeat configuration.
    ///
    /// - Parameters:
    ///   - enabled: Pass `false` to turn heartbeats off (default: true)
    ///   - interval: Seconds between heartbeats (default: 15.0)
    public init(enabled: Bool = true, interval: TimeInterval = 15.0) {
        (self.enabled, self.interval) = (enabled, interval)
    }

    /// Heartbeats enabled with the default interval.
    public static let `default` = HeartbeatConfiguration(enabled: true, interval: 15.0)

    /// Heartbeats turned off; the interval keeps its default value.
    public static let disabled = HeartbeatConfiguration(enabled: false, interval: 15.0)
}
/// Settings that control how the transport retries after a connection failure.
public struct ReconnectionConfiguration: Hashable, Sendable {
    /// `true` when a failed connection should be retried.
    public let enabled: Bool

    /// Upper bound on the number of retries.
    public let maxAttempts: Int

    /// Factor by which the delay grows from one retry to the next.
    public let backoffMultiplier: Double

    /// Builds a reconnection configuration.
    ///
    /// - Parameters:
    ///   - enabled: Pass `false` to disable retries (default: true)
    ///   - maxAttempts: Upper bound on the number of retries (default: 5)
    ///   - backoffMultiplier: Growth factor of the retry delay (default: 1.5)
    public init(
        enabled: Bool = true,
        maxAttempts: Int = 5,
        backoffMultiplier: Double = 1.5
    ) {
        self.backoffMultiplier = backoffMultiplier
        self.maxAttempts = maxAttempts
        self.enabled = enabled
    }

    /// Retries enabled with the default limits.
    public static let `default` = ReconnectionConfiguration(
        enabled: true, maxAttempts: 5, backoffMultiplier: 1.5)

    /// Retries turned off; the other values keep their defaults.
    public static let disabled = ReconnectionConfiguration(
        enabled: false, maxAttempts: 5, backoffMultiplier: 1.5)

    /// Computes how long to wait before a retry.
    ///
    /// The first attempt waits 0.5 seconds; each later attempt multiplies the previous
    /// delay by `backoffMultiplier`.
    ///
    /// - Parameter attempt: The current attempt number (1-based)
    /// - Returns: The delay in seconds before the next attempt
    public func backoffDelay(for attempt: Int) -> TimeInterval {
        let exponent = Double(attempt - 1)
        let growth = pow(backoffMultiplier, exponent)
        return 0.5 * growth  // 0.5 s base delay
    }
}
/// Settings that control how much data a single receive call may return.
public struct BufferConfiguration: Hashable, Sendable {
    /// Largest chunk, in bytes, requested from the connection per receive call.
    /// `nil` means no limit is imposed by this configuration.
    public let maxReceiveBufferSize: Int?

    /// Builds a buffer configuration.
    ///
    /// - Parameter maxReceiveBufferSize: Maximum buffer size in bytes (default: 10MB, nil for unlimited)
    public init(maxReceiveBufferSize: Int? = 10 * 1024 * 1024) {
        self.maxReceiveBufferSize = maxReceiveBufferSize
    }

    /// The standard configuration: a 10MB limit.
    public static let `default` = BufferConfiguration(maxReceiveBufferSize: 10 * 1024 * 1024)

    /// A configuration without any size limit.
    public static let unlimited = BufferConfiguration(maxReceiveBufferSize: .none)
}
// State tracking
/// `true` once the connection has reported `.ready`; reset by `disconnect()` and whenever
/// a reconnection is scheduled.
private var isConnected = false
/// Set by `disconnect()` and cleared at the start of `connect()`; background loops and
/// reconnection tasks check it before doing further work.
private var isStopping = false
/// Number of reconnection attempts made so far; reset to 0 when a connection succeeds
/// and at the start of `connect()`.
private var reconnectAttempt = 0
/// Task running the periodic heartbeat loop, if one has been started.
private var heartbeatTask: Task<Void, Never>?
/// Time at which the most recent heartbeat send completed without error.
/// Written by `setLastHeartbeatTime(_:)`; not read anywhere in this file.
private var lastHeartbeatTime: Date?
/// Stream of complete incoming messages, handed out by `receive()`.
private let messageStream: AsyncThrowingStream<Data, Swift.Error>
/// Producer side of `messageStream`; fed by the receive loop and finished on disconnect.
private let messageContinuation: AsyncThrowingStream<Data, Swift.Error>.Continuation
// Connection is marked nonisolated(unsafe) to allow access from closures
// NOTE(review): nonisolated(unsafe) disables isolation checking for this property, so
// nothing here synchronizes accesses made from callbacks — confirm that is acceptable.
private nonisolated(unsafe) var connection: NetworkConnectionProtocol
/// Logger instance for transport-related events
public nonisolated let logger: Logger
// Configuration
/// Heartbeat settings supplied at initialization.
private let heartbeatConfig: HeartbeatConfiguration
/// Reconnection settings supplied at initialization.
private let reconnectionConfig: ReconnectionConfiguration
/// Receive-buffer settings supplied at initialization.
private let bufferConfig: BufferConfiguration
/// Builds a transport on top of an existing `NWConnection`.
///
/// This is the public entry point; it simply forwards every argument to the internal
/// initializer that works with `NetworkConnectionProtocol`.
///
/// - Parameters:
///   - connection: The NWConnection to use for communication
///   - logger: Optional logger instance for transport events
///   - heartbeatConfig: Configuration for heartbeat behavior (default: .default)
///   - reconnectionConfig: Configuration for reconnection behavior (default: .default)
///   - bufferConfig: Configuration for buffer behavior (default: .default)
public init(
    connection: NWConnection,
    logger: Logger? = nil,
    heartbeatConfig: HeartbeatConfiguration = .default,
    reconnectionConfig: ReconnectionConfiguration = .default,
    bufferConfig: BufferConfiguration = .default
) {
    // Widen to the protocol type expected by the internal initializer.
    let abstractConnection: NetworkConnectionProtocol = connection
    self.init(
        abstractConnection,
        logger: logger,
        heartbeatConfig: heartbeatConfig,
        reconnectionConfig: reconnectionConfig,
        bufferConfig: bufferConfig
    )
}
/// Internal initializer that accepts any `NetworkConnectionProtocol` value.
///
/// - Parameters:
///   - connection: The connection (or stand-in) used for all I/O
///   - logger: Optional logger; when nil, a no-op logger labelled
///     "mcp.transport.network" is used
///   - heartbeatConfig: Configuration for heartbeat behavior (default: .default)
///   - reconnectionConfig: Configuration for reconnection behavior (default: .default)
///   - bufferConfig: Configuration for buffer behavior (default: .default)
init(
    _ connection: NetworkConnectionProtocol,
    logger: Logger? = nil,
    heartbeatConfig: HeartbeatConfiguration = .default,
    reconnectionConfig: ReconnectionConfiguration = .default,
    bufferConfig: BufferConfiguration = .default
) {
    // Stream and continuation are created together, before any other state.
    let (stream, continuation) = AsyncThrowingStream<Data, Swift.Error>.makeStream()
    self.messageStream = stream
    self.messageContinuation = continuation

    self.connection = connection

    if let logger {
        self.logger = logger
    } else {
        self.logger = Logger(
            label: "mcp.transport.network",
            factory: { _ in SwiftLogNoOpLogHandler() }
        )
    }

    self.heartbeatConfig = heartbeatConfig
    self.reconnectionConfig = reconnectionConfig
    self.bufferConfig = bufferConfig
}
/// Stops the heartbeat loop and closes the message stream when the transport goes away.
isolated deinit {
    if let task = heartbeatTask {
        task.cancel()
    }
    messageContinuation.finish()
}
/// Establishes connection with the transport
///
/// This initiates the NWConnection and waits for it to become ready.
/// Once the connection is established, it starts the message receiving loop.
/// Failed attempts are retried with exponential backoff while reconnection is enabled
/// and the attempt limit has not been reached.
///
/// Calling this while already connected is a no-op. Because the actor can be re-entered
/// while this method is suspended, another caller may finish connecting first; that is
/// treated as success rather than as a failure.
///
/// - Throws: Error if the connection fails to establish
public func connect() async throws {
    guard !isConnected else { return }

    // Reset state for fresh connection
    isStopping = false
    reconnectAttempt = 0

    // Retry loop with exponential backoff
    while !isConnected && reconnectAttempt <= reconnectionConfig.maxAttempts {
        do {
            try await attemptConnection()
            return  // Success
        } catch {
            guard !isStopping,
                reconnectionConfig.enabled,
                reconnectAttempt < reconnectionConfig.maxAttempts
            else {
                throw error
            }

            // Schedule retry with backoff
            reconnectAttempt += 1
            let delay = reconnectionConfig.backoffDelay(for: reconnectAttempt)
            logger.debug(
                "Attempting reconnection (\(reconnectAttempt)/\(reconnectionConfig.maxAttempts))..."
            )
            try await Task.sleep(for: .seconds(delay))

            // Another caller may have connected while this one was sleeping; cancelling
            // now would tear down the connection that just became ready.
            if isConnected { return }

            // NOTE(review): the same connection object is cancelled and then started
            // again by the next attempt — confirm the connection supports being
            // restarted after cancellation.
            connection.cancel()
        }
    }

    // The loop also ends when the transport became connected through another caller.
    if isConnected { return }
    throw MCPError.internalError("Failed to connect after \(reconnectAttempt) attempts")
}
/// Attempts a single connection
///
/// Creates a stream for connection state changes and waits for a terminal state
/// (`.ready`, `.failed` or `.cancelled`). Non-terminal states are only logged.
///
/// - Throws: The connection's error on `.failed`, `MCPError.internalError` on
///   `.cancelled`, `CancellationError` when the calling task is cancelled while waiting,
///   and `MCPError.internalError` if the state stream ends for any other reason before
///   a terminal state is seen.
private func attemptConnection() async throws {
    // Create stream for state changes with proper cleanup
    let stateStream = AsyncStream<NWConnection.State> { continuation in
        // Detach the handler as soon as nobody is listening any more.
        continuation.onTermination = { [weak self] _ in
            self?.connection.stateUpdateHandler = nil
        }
        connection.stateUpdateHandler = { state in
            continuation.yield(state)
            switch state {
            case .ready, .failed, .cancelled:
                continuation.finish()
            default:
                break
            }
        }
    }

    connection.start(queue: .main)

    // Process states until terminal state
    for await state in stateStream {
        switch state {
        case .ready:
            await handleConnectionReady()
            return  // Success - exits loop and stream
        case .failed(let error):
            logger.error("Connection failed: \(error)")
            throw error  // Exits loop and stream
        case .cancelled:
            logger.warning("Connection cancelled")
            throw MCPError.internalError("Connection cancelled")
        case .waiting(let error):
            logger.debug("Connection waiting: \(error)")
        case .preparing:
            logger.debug("Connection preparing...")
        case .setup:
            logger.debug("Connection setup...")
        @unknown default:
            logger.warning("Unknown connection state")
        }
    }

    // Reaching this point means the stream ended without a terminal state, which
    // happens when the surrounding task is cancelled. Returning normally here would make
    // the caller report a successful connection that never happened.
    try Task.checkCancellation()
    throw MCPError.internalError(
        "Connection state stream ended before the connection became ready")
}
/// Records a successful connection and launches the work that depends on it.
///
/// Marks the transport as connected, clears the retry counter, spawns the receive loop,
/// and starts heartbeats when they are enabled.
private func handleConnectionReady() async {
    reconnectAttempt = 0
    isConnected = true
    logger.debug("Network transport connected successfully")

    // The receive loop runs in its own task for as long as the connection is usable.
    Task(name: "MCP Network receive loop") {
        await self.receiveLoop()
    }

    // Heartbeats are optional.
    guard heartbeatConfig.enabled else { return }
    startHeartbeat()
}
/// Starts a task to periodically send heartbeats to check connection health
///
/// Any previously running heartbeat task is cancelled first. The task references the
/// transport weakly and only holds it strongly while a single heartbeat is being sent,
/// so a running heartbeat loop does not keep the transport alive.
private func startHeartbeat() {
    // Cancel any existing heartbeat task
    heartbeatTask?.cancel()

    // Start a new heartbeat task
    heartbeatTask = Task(name: "MCP Network heartbeat") { [weak self] in
        // Initial delay before starting heartbeats
        try? await Task.sleep(for: .seconds(1))

        while !Task.isCancelled {
            // nil means the transport is gone, stopping, or disconnected.
            guard let pause = await self?.performHeartbeat() else { return }

            do {
                try await Task.sleep(for: .seconds(pause))
            } catch {
                // The sleep only throws on cancellation: stop quietly instead of
                // reporting it as a failed heartbeat.
                return
            }
        }
    }
}

/// Sends one heartbeat and decides how long the heartbeat loop should wait next.
///
/// - Returns: The configured interval after a successful send, 2 seconds after a failed
///   send (so the retry happens sooner), or nil when the transport is stopping or not
///   connected and the loop should end.
private func performHeartbeat() async -> TimeInterval? {
    guard !isStopping && isConnected else { return nil }

    do {
        try await sendHeartbeat()
        return heartbeatConfig.interval
    } catch {
        // If heartbeat fails, log and retry after a shorter interval
        logger.warning("Heartbeat failed: \(error)")
        return 2
    }
}
/// Writes a single heartbeat to the connection.
///
/// Does nothing when the transport is disconnected or stopping. On success the time of
/// the send is recorded through `setLastHeartbeatTime(_:)`.
///
/// - Throws: The error reported by the connection when the send fails
private func sendHeartbeat() async throws {
    if !isConnected || isStopping { return }

    // Heartbeats are sent as-is, without the newline delimiter used for normal messages
    try await withCheckedThrowingContinuation {
        (continuation: CheckedContinuation<Void, Swift.Error>) in
        let completion = NWConnection.SendCompletion.contentProcessed { [weak self] error in
            guard let error else {
                // Record the send time back on the actor, then report success.
                Task { [weak self] in
                    await self?.setLastHeartbeatTime(Date())
                }
                continuation.resume()
                return
            }
            continuation.resume(throwing: error)
        }

        connection.send(
            content: Heartbeat().data,
            contentContext: .defaultMessage,
            isComplete: true,
            completion: completion)
    }

    logger.trace("Heartbeat sent")
}
/// Shuts the transport down.
///
/// Cancels the heartbeat task and the underlying connection and finishes the message
/// stream. Calling this while not connected has no effect.
public func disconnect() async {
    if !isConnected { return }

    // Flag the shutdown first so that background work does not try to reconnect
    isStopping = true
    isConnected = false

    // Stop heartbeats, if any are running
    if let task = heartbeatTask {
        task.cancel()
        heartbeatTask = nil
    }

    connection.cancel()
    messageContinuation.finish()
    logger.debug("Network transport disconnected")
}
/// Sends data through the network connection
///
/// This sends a JSON-RPC message through the NWConnection, adding a newline
/// delimiter to mark the end of the message.
///
/// When the send fails with a connection-lost error while reconnection is enabled, the
/// transport is marked as disconnected and a reconnection is scheduled in the
/// background; the send error is still thrown to the caller.
///
/// - Parameter message: The JSON-RPC message to send
/// - Throws: MCPError for transport failures or connection issues
public func send(_ message: Data) async throws {
    guard isConnected else {
        throw MCPError.internalError("Transport not connected")
    }

    // Add newline as delimiter
    var messageWithNewline = message
    messageWithNewline.append(UInt8(ascii: "\n"))

    // The completion handler only hands the outcome back; it always resumes the
    // continuation, so this call cannot be left suspended. The flag guards against a
    // connection that invokes the completion more than once.
    var sendContinuationResumed = false
    let sendError: NWError? = await withCheckedContinuation {
        (continuation: CheckedContinuation<NWError?, Never>) in
        connection.send(
            content: messageWithNewline,
            contentContext: .defaultMessage,
            isComplete: true,
            completion: .contentProcessed { error in
                Task { @MainActor in
                    if !sendContinuationResumed {
                        sendContinuationResumed = true
                        continuation.resume(returning: error)
                    }
                }
            })
    }

    guard let sendError else { return }

    // From here on the code runs on the actor, so state can be read and updated directly.
    logger.error("Send error: \(sendError)")

    // Check if we should attempt to reconnect on send failure
    if !isStopping && reconnectionConfig.enabled && isConnected && sendError.isConnectionLost {
        logger.warning("Connection appears broken, will attempt to reconnect...")

        // Clear the flag right away so that further failing sends do not each schedule
        // their own reconnection.
        isConnected = false

        // Schedule connection restart
        Task { [weak self] in
            try? await Task.sleep(for: .milliseconds(500))
            guard let self else { return }
            let stopping = await self.isStopping
            guard !stopping else { return }
            // Cancel the connection, then attempt to reconnect fully.
            self.connection.cancel()
            try? await self.connect()
        }
    }

    throw MCPError.internalError("Send error: \(sendError)")
}
/// Provides the stream of incoming messages.
///
/// Every call returns the same stream, created when the transport was initialized; each
/// element is the payload of one newline-delimited message read from the connection.
///
/// - Returns: An AsyncThrowingStream of Data objects
public func receive() -> AsyncThrowingStream<Data, Swift.Error> {
    let incoming = messageStream
    return incoming
}
/// Continuous loop to receive and process incoming messages
///
/// This method runs continuously while the connection is active,
/// receiving data and yielding complete messages to the message stream.
/// Messages are delimited by newline characters.
///
/// Heartbeats are recognised only at a message boundary (the start of the pending
/// buffer) and exactly one encoded heartbeat is removed at a time. Data that follows a
/// heartbeat in the same chunk is therefore kept, and message bytes that merely happen
/// to match the heartbeat magic in the middle of a message are left untouched.
private func receiveLoop() async {
    // Size of an encoded heartbeat: 4 magic bytes + 8 timestamp bytes.
    let heartbeatLength = 12

    var buffer = Data()
    var consecutiveEmptyReads = 0
    let maxConsecutiveEmptyReads = 5

    receiving: while isConnected && !Task.isCancelled && !isStopping {
        do {
            let newData = try await receiveData()

            // Check for EOF or empty data
            if newData.isEmpty {
                consecutiveEmptyReads += 1
                if consecutiveEmptyReads >= maxConsecutiveEmptyReads {
                    logger.warning(
                        "Multiple consecutive empty reads (\(consecutiveEmptyReads)), possible connection issue"
                    )
                    // Check connection state
                    if connection.state != .ready {
                        logger.warning("Connection no longer ready, exiting receive loop")
                        break receiving
                    }
                }
                // Brief pause before retry
                try await Task.sleep(for: .milliseconds(100))
                continue receiving
            }

            // Reset counter on successful data read
            consecutiveEmptyReads = 0
            buffer.append(newData)

            // The buffer always starts at a message boundary: the start of the stream, or
            // just after a newline or a heartbeat.
            parsing: while true {
                if Heartbeat.isHeartbeat(buffer) {
                    // Wait for the remaining bytes of a heartbeat split across chunks.
                    guard buffer.count >= heartbeatLength else { break parsing }

                    logger.trace("Received heartbeat from peer")
                    if let heartbeat = Heartbeat.from(data: buffer.prefix(heartbeatLength)) {
                        logger.trace("Heartbeat timestamp: \(heartbeat.timestamp)")
                    }
                    buffer = buffer.dropFirst(heartbeatLength)
                    continue parsing
                }

                // Process complete messages
                guard let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) else {
                    break parsing
                }
                let messageData = buffer[..<newlineIndex]
                buffer = buffer[(newlineIndex + 1)...]

                if !messageData.isEmpty {
                    logger.debug(
                        "Message received", metadata: ["size": "\(messageData.count)"])
                    messageContinuation.yield(Data(messageData))
                }
            }
        } catch let error as NWError {
            // NOTE(review): receiveData() wraps connection errors in
            // MCPError.transportError, so errors coming from it are handled by the
            // generic catch below; this branch only runs for a bare NWError.
            if !Task.isCancelled && !isStopping {
                logger.error("Network error occurred", metadata: ["error": "\(error)"])

                // Check for specific connection-related errors
                if error.isConnectionLost {
                    // If we should reconnect, don't finish the message stream yet
                    if !scheduleReconnect(reason: "Network connection lost") {
                        // We're not reconnecting, finish the message stream with error
                        messageContinuation.finish(
                            throwing: MCPError.transportError(error))
                    }
                    // Either way this loop ends; a new one starts after a reconnect.
                    break receiving
                } else {
                    // For other network errors, log but continue trying
                    do {
                        try await Task.sleep(for: .milliseconds(100))  // 100ms pause
                        continue receiving
                    } catch {
                        logger.error("Failed to sleep after network error: \(error)")
                        break receiving
                    }
                }
            }
            break receiving
        } catch {
            if !Task.isCancelled && !isStopping {
                logger.error("Receive error: \(error)")
                if !scheduleReconnect(reason: "Error during receive") {
                    messageContinuation.finish(throwing: error)
                }
            }
            break receiving
        }
    }

    // If stopping normally, finish the stream without error
    if isStopping {
        logger.debug("Receive loop stopping normally")
        messageContinuation.finish()
    }
}

/// Schedules a background reconnection if the configuration still allows one.
///
/// On success the attempt counter is incremented, the transport is marked as not
/// connected, and a task is started that waits for the backoff delay, cancels the
/// connection and calls `connect()` unless the transport is stopping by then.
///
/// - Parameter reason: Text placed at the start of the warning that is logged
/// - Returns: true when a reconnection was scheduled, false when reconnection is
///   disabled or the attempt limit has been reached
private func scheduleReconnect(reason: String) -> Bool {
    guard reconnectionConfig.enabled,
        reconnectAttempt < reconnectionConfig.maxAttempts
    else {
        return false
    }

    reconnectAttempt += 1
    logger.warning(
        "\(reason), attempting reconnection (\(reconnectAttempt)/\(reconnectionConfig.maxAttempts))..."
    )

    // Mark as not connected while attempting reconnection
    isConnected = false

    // Schedule reconnection attempt
    Task(name: "MCP Network reconnect") {
        let delay = reconnectionConfig.backoffDelay(for: reconnectAttempt)
        try? await Task.sleep(for: .seconds(delay))
        if !isStopping {
            // Cancel the connection, then attempt to reconnect fully.
            self.connection.cancel()
            try? await self.connect()
            // If connect succeeded, a new receive loop will be started
        }
    }
    return true
}
/// Receives a chunk of data from the network connection
///
/// The chunk size is capped by `bufferConfig.maxReceiveBufferSize` (no cap when nil).
///
/// - Returns: The received data chunk; empty when the connection delivered no content
///   but has not been completed
/// - Throws: `MCPError.transportError` wrapping the connection's error, or
///   `MCPError.connectionClosed` when the connection reports completion without any
///   remaining content
private func receiveData() async throws -> Data {
    let maxLength = bufferConfig.maxReceiveBufferSize ?? Int.max
    let logger = self.logger

    // Guards against a connection that invokes the completion more than once.
    var receiveContinuationResumed = false

    return try await withCheckedThrowingContinuation {
        (continuation: CheckedContinuation<Data, Swift.Error>) in
        connection.receive(minimumIncompleteLength: 1, maximumLength: maxLength) {
            content, _, isComplete, error in
            Task { @MainActor in
                if !receiveContinuationResumed {
                    receiveContinuationResumed = true
                    if let error = error {
                        continuation.resume(throwing: MCPError.transportError(error))
                    } else if let content = content, !content.isEmpty {
                        continuation.resume(returning: content)
                    } else if isComplete {
                        // Completion without remaining content means the peer is done.
                        // Reporting it as an empty read would leave the caller polling a
                        // closed connection.
                        logger.trace("Connection completed by peer")
                        continuation.resume(throwing: MCPError.connectionClosed)
                    } else {
                        // Nothing to read yet: resume with empty data instead of throwing
                        continuation.resume(returning: Data())
                    }
                }
            }
        }
    }
}
/// Stores the time of the most recent successful heartbeat.
///
/// Exists so that code running outside the actor can update the property with `await`.
///
/// - Parameter time: The time to record
private func setLastHeartbeatTime(_ time: Date) {
    lastHeartbeatTime = time
}
/// Updates the connected flag.
///
/// Exists so that code running outside the actor can update the property with `await`.
///
/// - Parameter connected: The new value of the flag
private func setIsConnected(_ connected: Bool) {
    isConnected = connected
}
}
extension NWError {
    /// Whether this error indicates a connection has been lost or reset.
    ///
    /// Only POSIX errors qualify: `ENOTCONN` (socket is not connected) and `ECONNRESET`
    /// (connection reset by peer). The POSIX case is matched directly so that a DNS or
    /// TLS error whose numeric code happens to equal one of those values is not
    /// mistaken for a lost connection.
    fileprivate var isConnectionLost: Bool {
        guard case .posix(let code) = self else { return false }
        return code == .ENOTCONN || code == .ECONNRESET
    }
}
#endif