From 4ecc0fa995d0884663ce9e621f6d78f8fa589960 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 4 Sep 2024 08:31:16 +0100 Subject: [PATCH 1/4] Enable Swift 6 language mode in GRPCCore Motivation: v2 should use Swift 6 language mode to take full advantage of the compilers data race checking. Modifications: - Enable Swift 6 language more for the code module - Add a bunch of missing explicit 'Sendable's Result: - Compiles with Swift 6 lang mode --- Package@swift-6.swift | 2 +- .../ClientRPCExecutor+HedgingExecutor.swift | 64 ++++++----- .../ClientRPCExecutor+OneShotExecutor.swift | 22 ++-- .../ClientRPCExecutor+RetryExecutor.swift | 13 +-- .../Internal/ClientStreamExecutor.swift | 4 +- .../Server/Internal/ServerRPCExecutor.swift | 4 +- Sources/GRPCCore/GRPCClient.swift | 8 +- .../Internal/TaskGroup+CancellableTask.swift | 2 +- Sources/GRPCCore/Status.swift | 14 +-- .../Streaming/Internal/RPCWriter+Map.swift | 6 +- .../RPCWriter+MessageToRPCResponsePart.swift | 4 +- .../Internal/RPCWriter+Serialize.swift | 2 +- ...t => UncheckedAsyncIteratorSequence.swift} | 14 ++- .../Streaming/RPCWriterProtocol.swift | 26 +---- .../InProcessClientTransport.swift | 4 +- .../Internal/InProcessStream.swift | 103 ++++++++++++++++++ Sources/GRPCInterceptors/HookedWriter.swift | 2 +- 17 files changed, 200 insertions(+), 94 deletions(-) rename Sources/GRPCCore/Streaming/Internal/{AsyncIteratorSequence.swift => UncheckedAsyncIteratorSequence.swift} (68%) create mode 100644 Sources/GRPCInProcessTransport/Internal/InProcessStream.swift diff --git a/Package@swift-6.swift b/Package@swift-6.swift index 6b41375d6..12c8c3180 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -196,7 +196,7 @@ extension Target { ], path: "Sources/GRPCCore", swiftSettings: [ - .swiftLanguageMode(.v5), + .swiftLanguageMode(.v6), .enableUpcomingFeature("ExistentialAny"), .enableUpcomingFeature("InternalImportsByDefault") ] diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index d21b2d3bf..36364d22c 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -21,14 +21,11 @@ extension ClientRPCExecutor { @usableFromInline struct HedgingExecutor< Transport: ClientTransport, + Input: Sendable, + Output: Sendable, Serializer: MessageSerializer, Deserializer: MessageDeserializer - > { - @usableFromInline - typealias Input = Serializer.Message - @usableFromInline - typealias Output = Deserializer.Message - + >: Sendable where Serializer.Message == Input, Deserializer.Message == Output { @usableFromInline let transport: Transport @usableFromInline @@ -181,14 +178,14 @@ extension ClientRPCExecutor.HedgingExecutor { let state = SharedState(policy: self.policy) // There's always a first attempt, safe to '!'. - let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() })! + let result = state.withState { $0.nextAttemptNumber()! } group.addTask { let result = await self._startAttempt( request: request, method: method, options: options, - attempt: attempt, + attempt: result.nextAttempt, state: state, picker: picker, responseHandler: responseHandler @@ -199,7 +196,7 @@ extension ClientRPCExecutor.HedgingExecutor { // Schedule the second attempt. var nextScheduledAttempt = ScheduledState() - if scheduleNext { + if result.scheduleNext { nextScheduledAttempt.schedule(in: &group, pushback: false, delay: self.policy.hedgingDelay) } @@ -212,13 +209,13 @@ extension ClientRPCExecutor.HedgingExecutor { switch outcome { case .ran: // Start a new attempt and possibly schedule the next. - if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) { + if let result = state.withState({ $0.nextAttemptNumber() }) { group.addTask { let result = await self._startAttempt( request: request, method: method, options: options, - attempt: attempt, + attempt: result.nextAttempt, state: state, picker: picker, responseHandler: responseHandler @@ -227,7 +224,7 @@ extension ClientRPCExecutor.HedgingExecutor { } // Schedule the next attempt. - if scheduleNext { + if result.scheduleNext { nextScheduledAttempt.schedule( in: &group, pushback: false, @@ -265,13 +262,13 @@ extension ClientRPCExecutor.HedgingExecutor { nextScheduledAttempt.cancel() - if let (attempt, scheduleNext) = state.withState({ $0.nextAttemptNumber() }) { + if let result = state.withState({ $0.nextAttemptNumber() }) { group.addTask { let result = await self._startAttempt( request: request, method: method, options: options, - attempt: attempt, + attempt: result.nextAttempt, state: state, picker: picker, responseHandler: responseHandler @@ -280,7 +277,7 @@ extension ClientRPCExecutor.HedgingExecutor { } // Schedule the next retry. - if scheduleNext { + if result.scheduleNext { nextScheduledAttempt.schedule( in: &group, pushback: true, @@ -314,7 +311,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @inlinable - func _startAttempt( + func _startAttempt( request: ClientRequest.Stream, method: MethodDescriptor, options: CallOptions, @@ -431,7 +428,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @usableFromInline - final class SharedState { + final class SharedState: Sendable { @usableFromInline let state: Mutex @@ -441,7 +438,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @inlinable - func withState(_ body: @Sendable (inout State) -> ReturnType) -> ReturnType { + func withState(_ body: (inout State) -> ReturnType) -> ReturnType { self.state.withLock { body(&$0) } @@ -449,7 +446,7 @@ extension ClientRPCExecutor.HedgingExecutor { } @usableFromInline - struct State { + struct State: Sendable { @usableFromInline let _maximumAttempts: Int @usableFromInline @@ -474,14 +471,31 @@ extension ClientRPCExecutor.HedgingExecutor { } } + @usableFromInline + struct NextAttemptResult: Sendable { + @usableFromInline + var nextAttempt: Int + @usableFromInline + var scheduleNext: Bool + + @inlinable + init(nextAttempt: Int, scheduleNext: Bool) { + self.nextAttempt = nextAttempt + self.scheduleNext = scheduleNext + } + } + @inlinable - mutating func nextAttemptNumber() -> (Int, Bool)? { + mutating func nextAttemptNumber() -> NextAttemptResult? { if self.hasUsableResponse || self.attempt > self._maximumAttempts { return nil } else { let attempt = self.attempt self.attempt += 1 - return (attempt, self.attempt <= self._maximumAttempts) + return NextAttemptResult( + nextAttempt: attempt, + scheduleNext: self.attempt <= self._maximumAttempts + ) } } } @@ -533,7 +547,7 @@ extension ClientRPCExecutor.HedgingExecutor { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) @usableFromInline -enum _HedgingTaskResult { +enum _HedgingTaskResult: Sendable { case rpcHandled(Result) case finishedRequest(Result) case timedOut(Result) @@ -541,20 +555,20 @@ enum _HedgingTaskResult { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @usableFromInline -enum _HedgingAttemptTaskResult { +enum _HedgingAttemptTaskResult: Sendable { case attemptPicked(Bool) case attemptCompleted(AttemptResult) case scheduledAttemptFired(ScheduleEvent) @usableFromInline - enum AttemptResult { + enum AttemptResult: Sendable { case unusableResponse(ClientResponse.Stream, Metadata.RetryPushback?) case usableResponse(Result) case noStreamAvailable(any Error) } @usableFromInline - enum ScheduleEvent { + enum ScheduleEvent: Sendable { case ran case cancelled } diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift index 47668a8f9..a1288999c 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift @@ -21,14 +21,11 @@ extension ClientRPCExecutor { @usableFromInline struct OneShotExecutor< Transport: ClientTransport, + Input: Sendable, + Output: Sendable, Serializer: MessageSerializer, Deserializer: MessageDeserializer - > { - @usableFromInline - typealias Input = Serializer.Message - @usableFromInline - typealias Output = Deserializer.Message - + >: Sendable where Serializer.Message == Input, Deserializer.Message == Output { @usableFromInline let transport: Transport @usableFromInline @@ -60,7 +57,7 @@ extension ClientRPCExecutor { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension ClientRPCExecutor.OneShotExecutor { @inlinable - func execute( + func execute( request: ClientRequest.Stream, method: MethodDescriptor, options: CallOptions, @@ -71,9 +68,10 @@ extension ClientRPCExecutor.OneShotExecutor { if let deadline = self.deadline { var request = request request.metadata.timeout = ContinuousClock.now.duration(to: deadline) + let immutableRequest = request result = await withDeadline(deadline) { await self._execute( - request: request, + request: immutableRequest, method: method, options: options, responseHandler: responseHandler @@ -95,7 +93,7 @@ extension ClientRPCExecutor.OneShotExecutor { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension ClientRPCExecutor.OneShotExecutor { @inlinable - func _execute( + func _execute( request: ClientRequest.Stream, method: MethodDescriptor, options: CallOptions, @@ -133,9 +131,9 @@ extension ClientRPCExecutor.OneShotExecutor { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @inlinable -func withDeadline( +func withDeadline( _ deadline: ContinuousClock.Instant, - execute: @escaping () async -> Result + execute: @Sendable @escaping () async -> Result ) async -> Result { return await withTaskGroup(of: _DeadlineChildTaskResult.self) { group in group.addTask { @@ -173,7 +171,7 @@ func withDeadline( } @usableFromInline -enum _DeadlineChildTaskResult { +enum _DeadlineChildTaskResult: Sendable { case deadlinePassed case timeoutCancelled case taskCompleted(Value) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 7c29a6c05..2cce865e1 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -19,14 +19,11 @@ extension ClientRPCExecutor { @usableFromInline struct RetryExecutor< Transport: ClientTransport, + Input: Sendable, + Output: Sendable, Serializer: MessageSerializer, Deserializer: MessageDeserializer - > { - @usableFromInline - typealias Input = Serializer.Message - @usableFromInline - typealias Output = Deserializer.Message - + >: Sendable where Serializer.Message == Input, Deserializer.Message == Output { @usableFromInline let transport: Transport @usableFromInline @@ -198,7 +195,7 @@ extension ClientRPCExecutor.RetryExecutor { } @inlinable - func executeAttempt( + func executeAttempt( stream: RPCStream, metadata: Metadata, retryStream: BroadcastAsyncSequence, @@ -307,7 +304,7 @@ extension ClientRPCExecutor.RetryExecutor { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) @usableFromInline -enum _RetryExecutorTask { +enum _RetryExecutorTask: Sendable { case timedOut(Result) case handledResponse(Result) case retry(Duration?) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift index 472639835..c0f4b166a 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientStreamExecutor.swift @@ -55,7 +55,7 @@ internal enum ClientStreamExecutor { } let bodyParts = RawBodyPartToMessageSequence( - base: AsyncIteratorSequence(iterator.wrappedValue), + base: UncheckedAsyncIteratorSequence(iterator.wrappedValue), deserializer: deserializer ) @@ -168,7 +168,7 @@ internal enum ClientStreamExecutor { Message: Sendable, Deserializer: MessageDeserializer, Failure: Error - >: AsyncSequence { + >: AsyncSequence, Sendable where Base: Sendable { @usableFromInline typealias Element = AsyncIterator.Element diff --git a/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift b/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift index 49e3b713e..74f657884 100644 --- a/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift @@ -168,7 +168,7 @@ struct ServerRPCExecutor { ServerRequest.Stream ) async throws -> ServerResponse.Stream ) async { - let messages = AsyncIteratorSequence(inbound.wrappedValue).map { part throws -> Input in + let messages = UncheckedAsyncIteratorSequence(inbound.wrappedValue).map { part in switch part { case .message(let bytes): return try deserializer.deserialize(bytes) @@ -284,7 +284,7 @@ struct ServerRPCExecutor { } @usableFromInline - enum ServerExecutorTask { + enum ServerExecutorTask: Sendable { case timedOut(Result) case executed } diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 8b0027a74..c11d32caf 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -256,7 +256,7 @@ public final class GRPCClient: Sendable { /// - handler: A unary response handler. /// /// - Returns: The return value from the `handler`. - public func unary( + public func unary( request: ClientRequest.Single, descriptor: MethodDescriptor, serializer: some MessageSerializer, @@ -287,7 +287,7 @@ public final class GRPCClient: Sendable { /// - handler: A unary response handler. /// /// - Returns: The return value from the `handler`. - public func clientStreaming( + public func clientStreaming( request: ClientRequest.Stream, descriptor: MethodDescriptor, serializer: some MessageSerializer, @@ -318,7 +318,7 @@ public final class GRPCClient: Sendable { /// - handler: A response stream handler. /// /// - Returns: The return value from the `handler`. - public func serverStreaming( + public func serverStreaming( request: ClientRequest.Single, descriptor: MethodDescriptor, serializer: some MessageSerializer, @@ -350,7 +350,7 @@ public final class GRPCClient: Sendable { /// - handler: A response stream handler. /// /// - Returns: The return value from the `handler`. - public func bidirectionalStreaming( + public func bidirectionalStreaming( request: ClientRequest.Stream, descriptor: MethodDescriptor, serializer: some MessageSerializer, diff --git a/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift b/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift index 1800f32dc..454a85b85 100644 --- a/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift +++ b/Sources/GRPCCore/Internal/TaskGroup+CancellableTask.swift @@ -58,7 +58,7 @@ extension TaskGroup { } @usableFromInline - enum _ResultOrCancelled { + enum _ResultOrCancelled: Sendable { case result(ChildTaskResult) case cancelled } diff --git a/Sources/GRPCCore/Status.swift b/Sources/GRPCCore/Status.swift index 738969c0c..bb76d3aee 100644 --- a/Sources/GRPCCore/Status.swift +++ b/Sources/GRPCCore/Status.swift @@ -60,17 +60,19 @@ public struct Status: @unchecked Sendable, Hashable { public init(code: Code, message: String) { if code == .ok, message.isEmpty { // Avoid a heap allocation for the common case. - self.storage = Storage.okWithNoMessage + self = .ok } else { self.storage = Storage(code: code, message: message) } } - /// A status with code ``Code-swift.struct/ok`` and an empty message. - @inlinable - internal static var ok: Self { - Status(code: .ok, message: "") + private init(storage: Storage) { + self.storage = storage } + + /// A status with code ``Code-swift.struct/ok`` and an empty message. + @usableFromInline + internal static let ok = Status(storage: Storage(code: .ok, message: "")) } extension Status: CustomStringConvertible { @@ -81,8 +83,6 @@ extension Status: CustomStringConvertible { extension Status { private final class Storage: Hashable { - static let okWithNoMessage = Storage(code: .ok, message: "") - var code: Status.Code var message: String diff --git a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift index 3afe49425..7755b46e1 100644 --- a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift +++ b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Map.swift @@ -16,7 +16,11 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -struct MapRPCWriter>: RPCWriterProtocol { +struct MapRPCWriter< + Value: Sendable, + Mapped: Sendable, + Base: RPCWriterProtocol +>: RPCWriterProtocol { @usableFromInline typealias Element = Value diff --git a/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift b/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift index 24ff04c64..c3dc290e9 100644 --- a/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift +++ b/Sources/GRPCCore/Streaming/Internal/RPCWriter+MessageToRPCResponsePart.swift @@ -16,7 +16,9 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -struct MessageToRPCResponsePartWriter: RPCWriterProtocol { +struct MessageToRPCResponsePartWriter< + Serializer: MessageSerializer +>: RPCWriterProtocol where Serializer.Message: Sendable { @usableFromInline typealias Element = Serializer.Message diff --git a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift index f72df4ee6..f2a9acd22 100644 --- a/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift +++ b/Sources/GRPCCore/Streaming/Internal/RPCWriter+Serialize.swift @@ -19,7 +19,7 @@ struct SerializingRPCWriter< Base: RPCWriterProtocol<[UInt8]>, Serializer: MessageSerializer ->: RPCWriterProtocol { +>: RPCWriterProtocol where Serializer.Message: Sendable { @usableFromInline typealias Element = Serializer.Message diff --git a/Sources/GRPCCore/Streaming/Internal/AsyncIteratorSequence.swift b/Sources/GRPCCore/Streaming/Internal/UncheckedAsyncIteratorSequence.swift similarity index 68% rename from Sources/GRPCCore/Streaming/Internal/AsyncIteratorSequence.swift rename to Sources/GRPCCore/Streaming/Internal/UncheckedAsyncIteratorSequence.swift index f75421dd6..e3bdcafee 100644 --- a/Sources/GRPCCore/Streaming/Internal/AsyncIteratorSequence.swift +++ b/Sources/GRPCCore/Streaming/Internal/UncheckedAsyncIteratorSequence.swift @@ -19,7 +19,19 @@ public import Synchronization // should be @usableFromInline @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @usableFromInline /// An `AsyncSequence` which wraps an existing async iterator. -final class AsyncIteratorSequence: AsyncSequence { +final class UncheckedAsyncIteratorSequence< + Base: AsyncIteratorProtocol +>: AsyncSequence, @unchecked Sendable { + // This is '@unchecked Sendable' because iterators are typically marked as not being Sendable + // to avoid multiple iterators being created. This is done to avoid multiple concurrent consumers + // of a single async sequence. + // + // However, gRPC needs to read the first message in a sequence of inbound request/response parts + // to check how the RPC should be handled. To do this it creates an async iterator and waits for + // the first value and then decides what to do. If it continues processing the RPC it uses this + // wrapper type to turn the iterator back into an async sequence and then drops the iterator on + // the floor so that there is only a single consumer of the original source. + @usableFromInline typealias Element = Base.Element diff --git a/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift b/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift index 5841f5802..63e3ee26d 100644 --- a/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift +++ b/Sources/GRPCCore/Streaming/RPCWriterProtocol.swift @@ -18,7 +18,7 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public protocol RPCWriterProtocol: Sendable { /// The type of value written. - associatedtype Element + associatedtype Element: Sendable /// Writes a single element. /// @@ -65,27 +65,3 @@ public protocol ClosableRPCWriterProtocol: RPCWriterProtocol { /// being thrown. func finish(throwing error: any Error) async } - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension AsyncThrowingStream.Continuation: RPCWriterProtocol { - public func write(_ element: Element) async throws { - self.yield(element) - } - - public func write(contentsOf elements: some Sequence) async throws { - for element in elements { - self.yield(element) - } - } -} - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension AsyncThrowingStream.Continuation: ClosableRPCWriterProtocol where Failure == any Error { - public func finish() { - self.finish(throwing: nil) - } - - public func finish(throwing error: any Error) { - self.finish(throwing: .some(error)) - } -} diff --git a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift index ddbb4a8bd..9b1862d45 100644 --- a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift @@ -232,8 +232,8 @@ public final class InProcessClientTransport: ClientTransport { options: CallOptions, _ closure: (RPCStream) async throws -> T ) async throws -> T { - let request = AsyncThrowingStream.makeStream(of: RPCRequestPart.self) - let response = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let request = InProcessStream.makeStream(of: RPCRequestPart.self) + let response = InProcessStream.makeStream(of: RPCResponsePart.self) let clientStream = RPCStream( descriptor: descriptor, diff --git a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift b/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift new file mode 100644 index 000000000..d4054c5a3 --- /dev/null +++ b/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift @@ -0,0 +1,103 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import GRPCCore + +// This exists to provide a version of 'AsyncThrowingStream' which is constrained to 'Sendable' +// elements. This is required in order for the continuation to be compatible with +// 'RPCWriterProtocol'. (Adding a constrained conformance to 'RPCWriterProtocol' on +// 'AsyncThrowingStream.Continuation' isn't possible because 'Sendable' is a marker protocol.) + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +struct InProcessStream: AsyncSequence, Sendable { + typealias Element = Element + typealias Failure = any Error + + private let base: AsyncThrowingStream + + static func makeStream( + of: Element.Type = Element.self + ) -> (stream: Self, continuation: Self.Continuation) { + let base = AsyncThrowingStream.makeStream(of: Element.self) + let stream = InProcessStream(base: base.stream) + let continuation = InProcessStream.Continuation(base: base.continuation) + return (stream, continuation) + } + + fileprivate init(base: AsyncThrowingStream) { + self.base = base + } + + struct Continuation: Sendable { + private let base: AsyncThrowingStream.Continuation + + fileprivate init(base: AsyncThrowingStream.Continuation) { + self.base = base + } + + func yield(_ value: Element) { + self.base.yield(value) + } + + func finish(throwing error: (any Error)? = nil) { + self.base.finish(throwing: error) + } + } + + func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(base: self.base.makeAsyncIterator()) + } + + struct AsyncIterator: AsyncIteratorProtocol { + private var base: AsyncThrowingStream.AsyncIterator + + fileprivate init(base: AsyncThrowingStream.AsyncIterator) { + self.base = base + } + + mutating func next() async throws(any Error) -> Element? { + try await self.next(isolation: nil) + } + + mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Element? { + try await self.base.next(isolation: `actor`) + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension InProcessStream.Continuation: RPCWriterProtocol { + func write(_ element: Element) async throws { + self.yield(element) + } + + func write(contentsOf elements: some Sequence) async throws { + for element in elements { + self.yield(element) + } + } +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension InProcessStream.Continuation: ClosableRPCWriterProtocol { + func finish() { + self.finish(throwing: nil) + } + + func finish(throwing error: any Error) { + self.finish(throwing: .some(error)) + } +} diff --git a/Sources/GRPCInterceptors/HookedWriter.swift b/Sources/GRPCInterceptors/HookedWriter.swift index 8a9994033..9d85df044 100644 --- a/Sources/GRPCInterceptors/HookedWriter.swift +++ b/Sources/GRPCInterceptors/HookedWriter.swift @@ -17,7 +17,7 @@ internal import GRPCCore internal import Tracing @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -struct HookedWriter: RPCWriterProtocol { +struct HookedWriter: RPCWriterProtocol { private let writer: any RPCWriterProtocol private let beforeEachWrite: @Sendable () -> Void private let afterEachWrite: @Sendable () -> Void From f57344e5cd6ba0a0196175ddf56aeb582c881631 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 5 Sep 2024 16:06:07 +0100 Subject: [PATCH 2/4] fixup tests --- .../Internal/InProcessStream.swift | 26 +++++++++---------- .../InProcessServerTransportTests.swift | 6 ++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift b/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift index d4054c5a3..b2b1a5f65 100644 --- a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift +++ b/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift @@ -22,13 +22,13 @@ internal import GRPCCore // 'AsyncThrowingStream.Continuation' isn't possible because 'Sendable' is a marker protocol.) @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -struct InProcessStream: AsyncSequence, Sendable { - typealias Element = Element - typealias Failure = any Error +package struct InProcessStream: AsyncSequence, Sendable { + package typealias Element = Element + package typealias Failure = any Error private let base: AsyncThrowingStream - static func makeStream( + package static func makeStream( of: Element.Type = Element.self ) -> (stream: Self, continuation: Self.Continuation) { let base = AsyncThrowingStream.makeStream(of: Element.self) @@ -41,7 +41,7 @@ struct InProcessStream: AsyncSequence, Sendable { self.base = base } - struct Continuation: Sendable { + package struct Continuation: Sendable { private let base: AsyncThrowingStream.Continuation fileprivate init(base: AsyncThrowingStream.Continuation) { @@ -57,22 +57,22 @@ struct InProcessStream: AsyncSequence, Sendable { } } - func makeAsyncIterator() -> AsyncIterator { + package func makeAsyncIterator() -> AsyncIterator { AsyncIterator(base: self.base.makeAsyncIterator()) } - struct AsyncIterator: AsyncIteratorProtocol { + package struct AsyncIterator: AsyncIteratorProtocol { private var base: AsyncThrowingStream.AsyncIterator fileprivate init(base: AsyncThrowingStream.AsyncIterator) { self.base = base } - mutating func next() async throws(any Error) -> Element? { + package mutating func next() async throws(any Error) -> Element? { try await self.next(isolation: nil) } - mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Element? { + package mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Element? { try await self.base.next(isolation: `actor`) } } @@ -80,11 +80,11 @@ struct InProcessStream: AsyncSequence, Sendable { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension InProcessStream.Continuation: RPCWriterProtocol { - func write(_ element: Element) async throws { + package func write(_ element: Element) async throws { self.yield(element) } - func write(contentsOf elements: some Sequence) async throws { + package func write(contentsOf elements: some Sequence) async throws { for element in elements { self.yield(element) } @@ -93,11 +93,11 @@ extension InProcessStream.Continuation: RPCWriterProtocol { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension InProcessStream.Continuation: ClosableRPCWriterProtocol { - func finish() { + package func finish() async { self.finish(throwing: nil) } - func finish(throwing error: any Error) { + package func finish(throwing error: any Error) async { self.finish(throwing: .some(error)) } } diff --git a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift index 9890d6c4b..15d9361ca 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift @@ -24,7 +24,7 @@ final class InProcessServerTransportTests: XCTestCase { func testStartListening() async throws { let transport = InProcessServerTransport() - let outbound = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let outbound = InProcessStream.makeStream(of: RPCResponsePart.self) let stream = RPCStream< RPCAsyncSequence, RPCWriter.Closable @@ -55,7 +55,7 @@ final class InProcessServerTransportTests: XCTestCase { func testStopListening() async throws { let transport = InProcessServerTransport() - let firstStreamOutbound = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let firstStreamOutbound = InProcessStream.makeStream(of: RPCResponsePart.self) let firstStream = RPCStream< RPCAsyncSequence, RPCWriter.Closable >( @@ -79,7 +79,7 @@ final class InProcessServerTransportTests: XCTestCase { transport.beginGracefulShutdown() - let secondStreamOutbound = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let secondStreamOutbound = InProcessStream.makeStream(of: RPCResponsePart.self) let secondStream = RPCStream< RPCAsyncSequence, RPCWriter.Closable >( From dbe070e5e1bbdac62267c41be378d8e0053a4450 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 10 Sep 2024 08:13:11 +0100 Subject: [PATCH 3/4] format --- Sources/GRPCInProcessTransport/Internal/InProcessStream.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift b/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift index b2b1a5f65..5f15de619 100644 --- a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift +++ b/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift @@ -72,7 +72,9 @@ package struct InProcessStream: AsyncSequence, Sendable { try await self.next(isolation: nil) } - package mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Element? { + package mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(any Error) -> Element? { try await self.base.next(isolation: `actor`) } } From e959374b3636795881187c1fd0364ce17d078d23 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 10 Sep 2024 08:43:51 +0100 Subject: [PATCH 4/4] fix stream type --- .../Internal/GRPCAsyncThrowingStream.swift} | 12 +++++------- .../InProcessClientTransport.swift | 4 ++-- .../ServerRPCExecutorTestHarness.swift | 4 ++-- .../Test Utilities/RPCWriter+Utilities.swift | 2 +- .../InProcessServerTransportTests.swift | 6 +++--- 5 files changed, 13 insertions(+), 15 deletions(-) rename Sources/{GRPCInProcessTransport/Internal/InProcessStream.swift => GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift} (89%) diff --git a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift b/Sources/GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift similarity index 89% rename from Sources/GRPCInProcessTransport/Internal/InProcessStream.swift rename to Sources/GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift index 5f15de619..5b4bcb58b 100644 --- a/Sources/GRPCInProcessTransport/Internal/InProcessStream.swift +++ b/Sources/GRPCCore/Streaming/Internal/GRPCAsyncThrowingStream.swift @@ -14,15 +14,13 @@ * limitations under the License. */ -internal import GRPCCore - // This exists to provide a version of 'AsyncThrowingStream' which is constrained to 'Sendable' // elements. This is required in order for the continuation to be compatible with // 'RPCWriterProtocol'. (Adding a constrained conformance to 'RPCWriterProtocol' on // 'AsyncThrowingStream.Continuation' isn't possible because 'Sendable' is a marker protocol.) @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -package struct InProcessStream: AsyncSequence, Sendable { +package struct GRPCAsyncThrowingStream: AsyncSequence, Sendable { package typealias Element = Element package typealias Failure = any Error @@ -32,8 +30,8 @@ package struct InProcessStream: AsyncSequence, Sendable { of: Element.Type = Element.self ) -> (stream: Self, continuation: Self.Continuation) { let base = AsyncThrowingStream.makeStream(of: Element.self) - let stream = InProcessStream(base: base.stream) - let continuation = InProcessStream.Continuation(base: base.continuation) + let stream = GRPCAsyncThrowingStream(base: base.stream) + let continuation = GRPCAsyncThrowingStream.Continuation(base: base.continuation) return (stream, continuation) } @@ -81,7 +79,7 @@ package struct InProcessStream: AsyncSequence, Sendable { } @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -extension InProcessStream.Continuation: RPCWriterProtocol { +extension GRPCAsyncThrowingStream.Continuation: RPCWriterProtocol { package func write(_ element: Element) async throws { self.yield(element) } @@ -94,7 +92,7 @@ extension InProcessStream.Continuation: RPCWriterProtocol { } @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -extension InProcessStream.Continuation: ClosableRPCWriterProtocol { +extension GRPCAsyncThrowingStream.Continuation: ClosableRPCWriterProtocol { package func finish() async { self.finish(throwing: nil) } diff --git a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift index 9b1862d45..2e29bdd82 100644 --- a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift @@ -232,8 +232,8 @@ public final class InProcessClientTransport: ClientTransport { options: CallOptions, _ closure: (RPCStream) async throws -> T ) async throws -> T { - let request = InProcessStream.makeStream(of: RPCRequestPart.self) - let response = InProcessStream.makeStream(of: RPCResponsePart.self) + let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) + let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let clientStream = RPCStream( descriptor: descriptor, diff --git a/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift index ca23d02a6..274913ab8 100644 --- a/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Server/Internal/ServerRPCExecutorTestSupport/ServerRPCExecutorTestHarness.swift @@ -80,8 +80,8 @@ struct ServerRPCExecutorTestHarness { RPCAsyncSequence ) async throws -> Void ) async throws { - let input = AsyncThrowingStream.makeStream(of: RPCRequestPart.self) - let output = AsyncThrowingStream.makeStream(of: RPCResponsePart.self) + let input = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) + let output = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { diff --git a/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift index b5a7b0771..923ab267d 100644 --- a/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift +++ b/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift @@ -30,7 +30,7 @@ extension RPCWriter { } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -private struct FailOnWrite: RPCWriterProtocol { +private struct FailOnWrite: RPCWriterProtocol { func write(_ element: Element) async throws { XCTFail("Unexpected write") } diff --git a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift index 15d9361ca..7816bc79a 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift @@ -24,7 +24,7 @@ final class InProcessServerTransportTests: XCTestCase { func testStartListening() async throws { let transport = InProcessServerTransport() - let outbound = InProcessStream.makeStream(of: RPCResponsePart.self) + let outbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let stream = RPCStream< RPCAsyncSequence, RPCWriter.Closable @@ -55,7 +55,7 @@ final class InProcessServerTransportTests: XCTestCase { func testStopListening() async throws { let transport = InProcessServerTransport() - let firstStreamOutbound = InProcessStream.makeStream(of: RPCResponsePart.self) + let firstStreamOutbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let firstStream = RPCStream< RPCAsyncSequence, RPCWriter.Closable >( @@ -79,7 +79,7 @@ final class InProcessServerTransportTests: XCTestCase { transport.beginGracefulShutdown() - let secondStreamOutbound = InProcessStream.makeStream(of: RPCResponsePart.self) + let secondStreamOutbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let secondStream = RPCStream< RPCAsyncSequence, RPCWriter.Closable >(