diff --git a/Package@swift-6.swift b/Package@swift-6.swift index 1a84f988d..e7ee68e33 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -207,7 +207,7 @@ extension Target { ], path: "Sources/GRPCCore", swiftSettings: [ - .swiftLanguageMode(.v5), + .swiftLanguageMode(.v6), .enableUpcomingFeature("ExistentialAny"), .enableUpcomingFeature("InternalImportsByDefault") ] diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index d21b2d3bf..36364d22c 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -21,14 +21,11 @@ extension ClientRPCExecutor { @usableFromInline struct HedgingExecutor< Transport: ClientTransport, + Input: Sendable, + Output: Sendable, Serializer: MessageSerializer, Deserializer: MessageDeserializer - > { - @usableFromInline - typealias Input = Serializer.Message - @usableFromInline - typealias Output = Deserializer.Message - + >: Sendable where Serializer.Message == Input, Deserializer.Message == Output { @usableFromInline let transport: Transport @usableFromInline @@ -181,14 +178,14 @@ extension ClientRPCExecutor.HedgingExecutor { let state = SharedState(policy: self.policy) // There's always a first attempt, safe to '!'. - let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() })! + let result = state.withState { $0.nextAttemptNumber()! } group.addTask { let result = await self._startAttempt( request: request, method: method, options: options, - attempt: attempt, + attempt: result.nextAttempt, state: state, picker: picker, responseHandler: responseHandler @@ -199,7 +196,7 @@ extension ClientRPCExecutor.HedgingExecutor { // Schedule the second attempt. var nextScheduledAttempt = ScheduledState() - if scheduleNext { + if result.scheduleNext { nextScheduledAttempt.schedule(in: &group, pushback: false, delay: self.policy.hedgingDelay) } @@ -212,13 +209,13 @@ extension ClientRPCExecutor.HedgingExecutor { switch outcome { case .ran: // Start a new attempt and possibly schedule the next. - if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) { + if let result = state.withState({ $0.nextAttemptNumber() }) { group.addTask { let result = await self._startAttempt( request: request, method: method, options: options, - attempt: attempt, + attempt: result.nextAttempt, state: state, picker: picker, responseHandler: responseHandler @@ -227,7 +224,7 @@ extension ClientRPCExecutor.HedgingExecutor { } // Schedule the next attempt. - if scheduleNext { + if result.scheduleNext { nextScheduledAttempt.schedule( in: &group, pushback: false, @@ -265,13 +262,13 @@ extension ClientRPCExecutor.HedgingExecutor { nextScheduledAttempt.cancel() - if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) { + if let result = state.withState({ $0.nextAttemptNumber() }) { group.addTask { let result = await self._startAttempt( request: request, method: method, options: options, - attempt: attempt, + attempt: result.nextAttempt, state: state, picker: picker, responseHandler: responseHandler @@ -280,7 +277,7 @@ extension ClientRPCExecutor.HedgingExecutor { } // Schedule the next retry. - if scheduleNext { + if result.scheduleNext { nextScheduledAttempt.schedule( in: &group, pushback: true, @@ -314,7 +311,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @inlinable - func _startAttempt( + func _startAttempt( request: ClientRequest.Stream, method: MethodDescriptor, options: CallOptions, @@ -431,7 +428,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @usableFromInline - final class SharedState { + final class SharedState: Sendable { @usableFromInline let state: Mutex @@ -441,7 +438,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @inlinable - func withState(_ body: @Sendable (inout State) -> ReturnType) -> ReturnType { + func withState(_ body: (inout State) -> ReturnType) -> ReturnType { self.state.withLock { body(&$0) } @@ -449,7 +446,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @usableFromInline - struct State { + struct State: Sendable { @usableFromInline let _maximumAttempts: Int @usableFromInline @@ -474,14 +471,31 @@ extension ClientRPCExecutor.HedgingExecutor { } } + @usableFromInline + struct NextAttemptResult: Sendable { + @usableFromInline + var nextAttempt: Int + @usableFromInline + var scheduleNext: Bool + + @inlinable + init(nextAttempt: Int, scheduleNext: Bool) { + self.nextAttempt = nextAttempt + self.scheduleNext = scheduleNext + } + } + @inlinable - mutating func nextAttemptNumber() -> (Int, Bool)? { + mutating func nextAttemptNumber() -> NextAttemptResult? { if self.hasUsableResponse || self.attempt > self._maximumAttempts { return nil } else { let attempt = self.attempt self.attempt += 1 - return (attempt, self.attempt <= self._maximumAttempts) + return NextAttemptResult( + nextAttempt: attempt, + scheduleNext: self.attempt <= self._maximumAttempts + ) } } } @@ -533,7 +547,7 @@ extension ClientRPCExecutor.HedgingExecutor { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) @usableFromInline -enum _HedgingTaskResult { +enum _HedgingTaskResult: Sendable { case rpcHandled(Result) case finishedRequest(Result) case timedOut(Result) @@ -541,20 +555,20 @@ enum _HedgingTaskResult { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @usableFromInline -enum _HedgingAttemptTaskResult { +enum _HedgingAttemptTaskResult: Sendable { case attemptPicked(Bool) case attemptCompleted(AttemptResult) case scheduledAttemptFired(ScheduleEvent) @usableFromInline - enum AttemptResult { + enum AttemptResult: Sendable { case unusableResponse(ClientResponse.Stream, Metadata.RetryPushback?) case usableResponse(Result) case noStreamAvailable(any Error) } @usableFromInline - enum ScheduleEvent { + enum ScheduleEvent: Sendable { case ran case cancelled } diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift index 47668a8f9..a1288999c 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift @@ -21,14 +21,11 @@ extension ClientRPCExecutor { @usableFromInline struct OneShotExecutor< Transport: ClientTransport, + Input: Sendable, + Output: Sendable, Serializer: MessageSerializer, Deserializer: MessageDeserializer - > { - @usableFromInline - typealias Input = Serializer.Message - @usableFromInline - typealias Output = Deserializer.Message - + >: Sendable where Serializer.Message == Input, Deserializer.Message == Output { @usableFromInline let transport: Transport @usableFromInline @@ -60,7 +57,7 @@ extension ClientRPCExecutor { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension ClientRPCExecutor.OneShotExecutor { @inlinable - func execute( + func execute( request: ClientRequest.Stream, method: MethodDescriptor, options: CallOptions, @@ -71,9 +68,10 @@ extension ClientRPCExecutor.OneShotExecutor { if let deadline = self.deadline { var request = request request.metadata.timeout = ContinuousClock.now.duration(to: deadline) + let immutableRequest = request result = await withDeadline(deadline) { await self._execute( - request: request, + request: immutableRequest, method: method, options: options, responseHandler: responseHandler @@ -95,7 +93,7 @@ extension ClientRPCExecutor.OneShotExecutor { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension ClientRPCExecutor.OneShotExecutor { @inlinable - func _execute( + func _execute( request: ClientRequest.Stream, method: MethodDescriptor, options: CallOptions, @@ -133,9 +131,9 @@ extension ClientRPCExecutor.OneShotExecutor { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @inlinable -func withDeadline( +func withDeadline( _ deadline: ContinuousClock.Instant, - execute: @escaping () async -> Result + execute: @Sendable @escaping () async -> Result ) async -> Result { return await withTaskGroup(of: _DeadlineChildTaskResult.self) { group in group.addTask { @@ -173,7 +171,7 @@ func withDeadline( } @usableFromInline -enum _DeadlineChildTaskResult { +enum _DeadlineChildTaskResult: Sendable { case deadlinePassed case timeoutCancelled case taskCompleted(Value) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 7c29a6c05..2cce865e1 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -19,14 +19,11 @@ extension ClientRPCExecutor { @usableFromInline struct RetryExecutor< Transport: ClientTransport, + Input: Sendable, + Output: Sendable, Serializer: MessageSerializer, Deserializer: MessageDeserializer - > { - @usableFromInline - typealias Input = Serializer.Message - @usableFromInline - typealias Output = Deserializer.Message - + >: Sendable where Serializer.Message == Input, Deserializer.Message == Output { @usableFromInline let transport: Transport @usableFromInline @@ -198,7 +195,7 @@ extension ClientRPCExecutor.RetryExecutor { } @inlinable - func executeAttempt( + func executeAttempt( stream: RPCStream, metadata: Metadata, retryStream: BroadcastAsyncSequence, @@ -307,7 +304,7 @@ extension ClientRPCExecutor.RetryExecutor { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) @usableFromInline -enum _RetryExecutorTask { +enum _RetryExecutorTask: Sendable { case timedOut(Result) case handledResponse(Result) case retry(Duration?) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift index 472639835..c0f4b166a 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift @@ -55,7 +55,7 @@ internal enum ClientStreamExecutor { } let bodyParts = RawBodyPartToMessageSequence( - base: AsyncIteratorSequence(iterator.wrappedValue), + base: UncheckedAsyncIteratorSequence(iterator.wrappedValue), deserializer: deserializer ) @@ -168,7 +168,7 @@ internal enum ClientStreamExecutor { Message: Sendable, Deserializer: MessageDeserializer, Failure: Error - >: AsyncSequence { + >: AsyncSequence, Sendable where Base: Sendable { @usableFromInline typealias Element = AsyncIterator.Element diff --git a/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift b/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift index 49e3b713e..74f657884 100644 --- a/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift @@ -168,7 +168,7 @@ struct ServerRPCExecutor { ServerRequest.Stream ) async throws -> ServerResponse.Stream ) async { - let messages = AsyncIteratorSequence(inbound.wrappedValue).map { part throws -> Input in + let messages = UncheckedAsyncIteratorSequence(inbound.wrappedValue).map { part in switch part { case .message(let bytes): return try deserializer.deserialize(bytes) @@ -284,7 +284,7 @@ struct ServerRPCExecutor { } @usableFromInline - enum ServerExecutorTask { + enum ServerExecutorTask: Sendable { case timedOut(Result) case executed } diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 8b0027a74..c11d32caf 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -256,7 +256,7 @@ public final class GRPCClient: Sendable { /// - handler: A unary response handler. /// /// - Returns: The return value from the `handler`. - public func unary( + public func unary( request: ClientRequest.Single, descriptor: MethodDescriptor, serializer: some MessageSerializer, @@ -287,7 +287,7 @@ public final class GRPCClient: Sendable { /// - handler: A unary response handler. /// /// - Returns: The return value from the `handler`. - public func clientStreaming( + public func clientStreaming( request: ClientRequest.Stream, descriptor: MethodDescriptor, serializer: some MessageSerializer, @@ -318,7 +318,7 @@ public final class GRPCClient: Sendable { /// - handler: A response stream handler. /// /// - Returns: The return value from the `handler`. - public func serverStreaming( + public func serverStreaming( request: ClientRequest.Single, descriptor: MethodDescriptor, serializer: some MessageSerializer, @@ -350,7 +350,7 @@ public final class GRPCClient: Sendable { /// - handler: A response stream handler. /// /// - Returns: The return value from the `handler`. - public func bidirectionalStreaming( + public func bidirectionalStreaming( request: ClientRequest.Stream, descriptor: MethodDescriptor, serializer: some MessageSerializer, diff --git a/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift b/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift index 1800f32dc..454a85b85 100644 --- a/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift +++ b/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift @@ -58,7 +58,7 @@ extension TaskGroup { } @usableFromInline - enum _ResultOrCancelled { + enum _ResultOrCancelled: Sendable { case result(ChildTaskResult) case cancelled } diff --git a/Sources/GRPCCore/Status.swift b/Sources/GRPCCore/Status.swift index d1c9e087b..ed6636b7f 100644 --- a/Sources/GRPCCore/Status.swift +++ b/Sources/GRPCCore/Status.swift @@ -60,17 +60,19 @@ public struct Status: @unchecked Sendable, Hashable { public init(code: Code, message: String) { if code == .ok, message.isEmpty { // Avoid a heap allocation for the common case. - self.storage = Storage.okWithNoMessage + self = .ok } else { self.storage = Storage(code: code, message: message) } } - /// A status with code ``Code-swift.struct/ok`` and an empty message. - @inlinable - internal static var ok: Self { - Status(code: .ok, message: "") + private init(storage: Storage) { + self.storage = storage } + + /// A status with code ``Code-swift.struct/ok`` and an empty message. + @usableFromInline + internal static let ok = Status(storage: Storage(code: .ok, message: "")) } extension Status: CustomStringConvertible { @@ -81,8 +83,6 @@ extension Status: CustomStringConvertible { extension Status { private final class Storage: Hashable { - static let okWithNoMessage = Storage(code: .ok, message: "") - var code: Status.Code var message: String diff --git a/Sources/GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift b/Sources/GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift new file mode 100644 index 000000000..5b4bcb58b --- /dev/null +++ b/Sources/GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift @@ -0,0 +1,103 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This exists to provide a version of 'AsyncThrowingStream' which is constrained to 'Sendable' +// elements. This is required in order for the continuation to be compatible with +// 'RPCWriterProtocol'. (Adding a constrained conformance to 'RPCWriterProtocol' on +// 'AsyncThrowingStream.Continuation' isn't possible because 'Sendable' is a marker protocol.) + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +package struct GRPCAsyncThrowingStream: AsyncSequence, Sendable { + package typealias Element = Element + package typealias Failure = any Error + + private let base: AsyncThrowingStream + + package static func makeStream( + of: Element.Type = Element.self + ) -> (stream: Self, continuation: Self.Continuation) { + let base = AsyncThrowingStream.makeStream(of: Element.self) + let stream = GRPCAsyncThrowingStream(base: base.stream) + let continuation = GRPCAsyncThrowingStream.Continuation(base: base.continuation) + return (stream, continuation) + } + + fileprivate init(base: AsyncThrowingStream) { + self.base = base + } + + package struct Continuation: Sendable { + private let base: AsyncThrowingStream.Continuation + + fileprivate init(base: AsyncThrowingStream.Continuation) { + self.base = base + } + + func yield(_ value: Element) { + self.base.yield(value) + } + + func finish(throwing error: (any Error)? = nil) { + self.base.finish(throwing: error) + } + } + + package func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(base: self.base.makeAsyncIterator()) + } + + package struct AsyncIterator: AsyncIteratorProtocol { + private var base: AsyncThrowingStream.AsyncIterator + + fileprivate init(base: AsyncThrowingStream.AsyncIterator) { + self.base = base + } + + package mutating func next() async throws(any Error) -> Element? { + try await self.next(isolation: nil) + } + + package mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(any Error) -> Element? { + try await self.base.next(isolation: `actor`) + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension GRPCAsyncThrowingStream.Continuation: RPCWriterProtocol { + package func write(_ element: Element) async throws { + self.yield(element) + } + + package func write(contentsOf elements: some Sequence) async throws { + for element in elements { + self.yield(element) + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension GRPCAsyncThrowingStream.Continuation: ClosableRPCWriterProtocol { + package func finish() async { + self.finish(throwing: nil) + } + + package func finish(throwing error: any Error) async { + self.finish(throwing: .some(error)) + } +} diff --git a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift index 3afe49425..7755b46e1 100644 --- a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift +++ b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift @@ -16,7 +16,11 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -struct MapRPCWriter>: RPCWriterProtocol { +struct MapRPCWriter< + Value: Sendable, + Mapped: Sendable, + Base: RPCWriterProtocol +>: RPCWriterProtocol { @usableFromInline typealias Element = Value diff --git a/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift b/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift index 24ff04c64..c3dc290e9 100644 --- a/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift +++ b/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift @@ -16,7 +16,9 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -struct MessageToRPCResponsePartWriter: RPCWriterProtocol { +struct MessageToRPCResponsePartWriter< + Serializer: MessageSerializer +>: RPCWriterProtocol where Serializer.Message: Sendable { @usableFromInline typealias Element = Serializer.Message diff --git a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift index f72df4ee6..f2a9acd22 100644 --- a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift +++ b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift @@ -19,7 +19,7 @@ struct SerializingRPCWriter< Base: RPCWriterProtocol<[UInt8]>, Serializer: MessageSerializer ->: RPCWriterProtocol { +>: RPCWriterProtocol where Serializer.Message: Sendable { @usableFromInline typealias Element = Serializer.Message diff --git a/Sources/GRPCCore/Streaming/Internal/AsyncIteratorSequence.swift b/Sources/GRPCCore/Streaming/Internal/UncheckedAsyncIteratorSequence.swift similarity index 68% rename from Sources/GRPCCore/Streaming/Internal/AsyncIteratorSequence.swift rename to Sources/GRPCCore/Streaming/Internal/UncheckedAsyncIteratorSequence.swift index f75421dd6..e3bdcafee 100644 --- a/Sources/GRPCCore/Streaming/Internal/AsyncIteratorSequence.swift +++ b/Sources/GRPCCore/Streaming/Internal/UncheckedAsyncIteratorSequence.swift @@ -19,7 +19,19 @@ public import Synchronization // should be @usableFromInline @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @usableFromInline /// An `AsyncSequence` which wraps an existing async iterator. -final class AsyncIteratorSequence: AsyncSequence { +final class UncheckedAsyncIteratorSequence< + Base: AsyncIteratorProtocol +>: AsyncSequence, @unchecked Sendable { + // This is '@unchecked Sendable' because iterators are typically marked as not being Sendable + // to avoid multiple iterators being created. This is done to avoid multiple concurrent consumers + // of a single async sequence. + // + // However, gRPC needs to read the first message in a sequence of inbound request/response parts + // to check how the RPC should be handled. To do this it creates an async iterator and waits for + // the first value and then decides what to do. If it continues processing the RPC it uses this + // wrapper type to turn the iterator back into an async sequence and then drops the iterator on + // the floor so that there is only a single consumer of the original source. + @usableFromInline typealias Element = Base.Element diff --git a/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift b/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift index 5841f5802..63e3ee26d 100644 --- a/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift +++ b/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift @@ -18,7 +18,7 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public protocol RPCWriterProtocol: Sendable { /// The type of value written. - associatedtype Element + associatedtype Element: Sendable /// Writes a single element. /// @@ -65,27 +65,3 @@ public protocol ClosableRPCWriterProtocol: RPCWriterProtocol { /// being thrown. func finish(throwing error: any Error) async } - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension AsyncThrowingStream.Continuation: RPCWriterProtocol { - public func write(_ element: Element) async throws { - self.yield(element) - } - - public func write(contentsOf elements: some Sequence) async throws { - for element in elements { - self.yield(element) - } - } -} - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension AsyncThrowingStream.Continuation: ClosableRPCWriterProtocol where Failure == any Error { - public func finish() { - self.finish(throwing: nil) - } - - public func finish(throwing error: any Error) { - self.finish(throwing: .some(error)) - } -} diff --git a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift index ddbb4a8bd..2e29bdd82 100644 --- a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift @@ -232,8 +232,8 @@ public final class InProcessClientTransport: ClientTransport { options: CallOptions, _ closure: (RPCStream) async throws -> T ) async throws -> T { - let request = AsyncThrowingStream.makeStream(of: RPCRequestPart.self) - let response = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) + let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let clientStream = RPCStream( descriptor: descriptor, diff --git a/Sources/GRPCInterceptors/HookedWriter.swift b/Sources/GRPCInterceptors/HookedWriter.swift index 8a9994033..9d85df044 100644 --- a/Sources/GRPCInterceptors/HookedWriter.swift +++ b/Sources/GRPCInterceptors/HookedWriter.swift @@ -17,7 +17,7 @@ internal import GRPCCore internal import Tracing @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -struct HookedWriter: RPCWriterProtocol { +struct HookedWriter: RPCWriterProtocol { private let writer: any RPCWriterProtocol private let beforeEachWrite: @Sendable () -> Void private let afterEachWrite: @Sendable () -> Void diff --git a/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift index ca23d02a6..274913ab8 100644 --- a/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift @@ -80,8 +80,8 @@ struct ServerRPCExecutorTestHarness { RPCAsyncSequence ) async throws -> Void ) async throws { - let input = AsyncThrowingStream.makeStream(of: RPCRequestPart.self) - let output = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let input = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) + let output = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { diff --git a/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift index b5a7b0771..923ab267d 100644 --- a/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift +++ b/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift @@ -30,7 +30,7 @@ extension RPCWriter { } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -private struct FailOnWrite: RPCWriterProtocol { +private struct FailOnWrite: RPCWriterProtocol { func write(_ element: Element) async throws { XCTFail("Unexpected write") } diff --git a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift index 9890d6c4b..7816bc79a 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift @@ -24,7 +24,7 @@ final class InProcessServerTransportTests: XCTestCase { func testStartListening() async throws { let transport = InProcessServerTransport() - let outbound = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let outbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let stream = RPCStream< RPCAsyncSequence, RPCWriter.Closable @@ -55,7 +55,7 @@ final class InProcessServerTransportTests: XCTestCase { func testStopListening() async throws { let transport = InProcessServerTransport() - let firstStreamOutbound = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let firstStreamOutbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let firstStream = RPCStream< RPCAsyncSequence, RPCWriter.Closable >( @@ -79,7 +79,7 @@ final class InProcessServerTransportTests: XCTestCase { transport.beginGracefulShutdown() - let secondStreamOutbound = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let secondStreamOutbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let secondStream = RPCStream< RPCAsyncSequence, RPCWriter.Closable >(