diff --git a/Sources/MCP/Base/Transports.swift b/Sources/MCP/Base/Transports.swift index 704c0f5e..71c5a694 100644 --- a/Sources/MCP/Base/Transports.swift +++ b/Sources/MCP/Base/Transports.swift @@ -180,8 +180,8 @@ public actor StdioTransport: Transport { public nonisolated let logger: Logger private var isConnected = false - private let messageStream: AsyncStream - private let messageContinuation: AsyncStream.Continuation + private let messageStream: AsyncThrowingStream + private let messageContinuation: AsyncThrowingStream.Continuation // Track connection state for continuations private var connectionContinuationResumed = false @@ -196,8 +196,8 @@ public actor StdioTransport: Transport { ) // Create message stream - var continuation: AsyncStream.Continuation! - self.messageStream = AsyncStream { continuation = $0 } + var continuation: AsyncThrowingStream.Continuation! + self.messageStream = AsyncThrowingStream { continuation = $0 } self.messageContinuation = continuation } @@ -330,10 +330,14 @@ public actor StdioTransport: Transport { public func receive() -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { - for await message in messageStream { - continuation.yield(message) + do { + for try await message in messageStream { + continuation.yield(message) + } + continuation.finish() + } catch { + continuation.finish(throwing: error) } - continuation.finish() } } } @@ -361,6 +365,7 @@ public actor StdioTransport: Transport { } catch { if !Task.isCancelled { logger.error("Receive error: \(error)") + messageContinuation.finish(throwing: error) } break }