diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 6c2729548..a3f9fe2d9 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -65,11 +65,10 @@ private import Synchronization /// ) /// /// // Finally create a transport and instantiate the client, adding an interceptor. -/// let inProcessServerTransport = InProcessServerTransport() -/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport) +/// let inProcessTransport = InProcessTransport() /// /// let client = GRPCClient( -/// transport: inProcessClientTransport, +/// transport: inProcessTransport.client, /// interceptors: [StatsRecordingClientInterceptor()], /// configuration: configuration /// ) diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index b3d99b7de..feb218b19 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -35,7 +35,7 @@ private import Synchronization /// /// ```swift /// // Create and an in-process transport. -/// let inProcessTransport = InProcessServerTransport() +/// let inProcessTransport = InProcessTransport() /// /// // Create the 'Greeter' and 'Echo' services. /// let greeter = GreeterService() @@ -46,7 +46,7 @@ private import Synchronization /// /// // Finally create the server. /// let server = GRPCServer( -/// transport: inProcessTransport, +/// transport: inProcessTransport.server, /// services: [greeter, echo], /// interceptors: [statsRecorder] /// ) diff --git a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift deleted file mode 100644 index 822138f33..000000000 --- a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift +++ /dev/null @@ -1,353 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -public import GRPCCore -private import Synchronization - -/// An in-process implementation of a ``ClientTransport``. -/// -/// This is useful when you're interested in testing your application without any actual networking layers -/// involved, as the client and server will communicate directly with each other via in-process streams. -/// -/// To use this client, you'll have to provide an ``InProcessServerTransport`` upon creation, as well -/// as a ``ServiceConfig``. -/// -/// Once you have a client, you must keep a long-running task executing ``connect()``, which -/// will return only once all streams have been finished and ``beginGracefulShutdown()`` has been called on this client; or -/// when the containing task is cancelled. -/// -/// To execute requests using this client, use ``withStream(descriptor:options:_:)``. If this function is -/// called before ``connect()`` is called, then any streams will remain pending and the call will -/// block until ``connect()`` is called or the task is cancelled. -/// -/// - SeeAlso: ``ClientTransport`` -@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -public final class InProcessClientTransport: ClientTransport { - private enum State: Sendable { - struct UnconnectedState { - var serverTransport: InProcessServerTransport - var pendingStreams: [AsyncStream.Continuation] - - init(serverTransport: InProcessServerTransport) { - self.serverTransport = serverTransport - self.pendingStreams = [] - } - } - - struct ConnectedState { - var serverTransport: InProcessServerTransport - var nextStreamID: Int - var openStreams: - [Int: ( - RPCStream, - RPCStream< - RPCAsyncSequence, RPCWriter.Closable - > - )] - var signalEndContinuation: AsyncStream.Continuation - - init( - fromUnconnected state: UnconnectedState, - signalEndContinuation: AsyncStream.Continuation - ) { - self.serverTransport = state.serverTransport - self.nextStreamID = 0 - self.openStreams = [:] - self.signalEndContinuation = signalEndContinuation - } - } - - struct ClosedState { - var openStreams: - [Int: ( - RPCStream, - RPCStream< - RPCAsyncSequence, RPCWriter.Closable - > - )] - var signalEndContinuation: AsyncStream.Continuation? - - init() { - self.openStreams = [:] - self.signalEndContinuation = nil - } - - init(fromConnected state: ConnectedState) { - self.openStreams = state.openStreams - self.signalEndContinuation = state.signalEndContinuation - } - } - - case unconnected(UnconnectedState) - case connected(ConnectedState) - case closed(ClosedState) - } - - public typealias Inbound = RPCAsyncSequence - public typealias Outbound = RPCWriter.Closable - - public let retryThrottle: RetryThrottle? - - private let methodConfig: MethodConfigs - private let state: Mutex - - /// Creates a new in-process client transport. - /// - /// - Parameters: - /// - server: The in-process server transport to connect to. - /// - serviceConfig: Service configuration. - public init( - server: InProcessServerTransport, - serviceConfig: ServiceConfig = ServiceConfig() - ) { - self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) } - self.methodConfig = MethodConfigs(serviceConfig: serviceConfig) - self.state = Mutex(.unconnected(.init(serverTransport: server))) - } - - /// Establish and maintain a connection to the remote destination. - /// - /// Maintains a long-lived connection, or set of connections, to a remote destination. - /// Connections may be added or removed over time as required by the implementation and the - /// demand for streams by the client. - /// - /// Implementations of this function will typically create a long-lived task group which - /// maintains connections. The function exits when all open streams have been closed and new connections - /// are no longer required by the caller who signals this by calling ``beginGracefulShutdown()``, or by cancelling the - /// task this function runs in. - public func connect() async throws { - let (stream, continuation) = AsyncStream.makeStream() - try self.state.withLock { state in - switch state { - case .unconnected(let unconnectedState): - state = .connected( - .init( - fromUnconnected: unconnectedState, - signalEndContinuation: continuation - ) - ) - for pendingStream in unconnectedState.pendingStreams { - pendingStream.finish() - } - case .connected: - throw RPCError( - code: .failedPrecondition, - message: "Already connected to server." - ) - case .closed: - throw RPCError( - code: .failedPrecondition, - message: "Can't connect to server, transport is closed." - ) - } - } - - for await _ in stream { - // This for-await loop will exit (and thus `connect()` will return) - // only when the task is cancelled, or when the stream's continuation is - // finished - whichever happens first. - // The continuation will be finished when `close()` is called and there - // are no more open streams. - } - - // If at this point there are any open streams, it's because Cancellation - // occurred and all open streams must now be closed. - let openStreams = self.state.withLock { state in - switch state { - case .unconnected: - // We have transitioned to connected, and we can't transition back. - fatalError("Invalid state") - case .connected(let connectedState): - state = .closed(.init()) - return connectedState.openStreams.values - case .closed(let closedState): - return closedState.openStreams.values - } - } - - for (clientStream, serverStream) in openStreams { - await clientStream.outbound.finish(throwing: CancellationError()) - await serverStream.outbound.finish(throwing: CancellationError()) - } - } - - /// Signal to the transport that no new streams may be created. - /// - /// Existing streams may run to completion naturally but calling ``withStream(descriptor:options:_:)`` - /// will result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown. - /// - /// If you want to forcefully cancel all active streams then cancel the task running ``connect()``. - public func beginGracefulShutdown() { - let maybeContinuation: AsyncStream.Continuation? = self.state.withLock { state in - switch state { - case .unconnected: - state = .closed(.init()) - return nil - case .connected(let connectedState): - if connectedState.openStreams.count == 0 { - state = .closed(.init()) - return connectedState.signalEndContinuation - } else { - state = .closed(.init(fromConnected: connectedState)) - return nil - } - case .closed: - return nil - } - } - maybeContinuation?.finish() - } - - /// Opens a stream using the transport, and uses it as input into a user-provided closure. - /// - /// - Important: The opened stream is closed after the closure is finished. - /// - /// This transport implementation throws ``RPCError/Code/failedPrecondition`` if the transport - /// is closing or has been closed. - /// - /// This implementation will queue any streams (and thus block this call) if this function is called before - /// ``connect()``, until a connection is established - at which point all streams will be - /// created. - /// - /// - Parameters: - /// - descriptor: A description of the method to open a stream for. - /// - options: Options specific to the stream. - /// - closure: A closure that takes the opened stream as parameter. - /// - Returns: Whatever value was returned from `closure`. - public func withStream( - descriptor: MethodDescriptor, - options: CallOptions, - _ closure: (RPCStream) async throws -> T - ) async throws -> T { - let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) - let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) - - let clientStream = RPCStream( - descriptor: descriptor, - inbound: RPCAsyncSequence(wrapping: response.stream), - outbound: RPCWriter.Closable(wrapping: request.continuation) - ) - - let serverStream = RPCStream( - descriptor: descriptor, - inbound: RPCAsyncSequence(wrapping: request.stream), - outbound: RPCWriter.Closable(wrapping: response.continuation) - ) - - let waitForConnectionStream: AsyncStream? = self.state.withLock { state in - if case .unconnected(var unconnectedState) = state { - let (stream, continuation) = AsyncStream.makeStream() - unconnectedState.pendingStreams.append(continuation) - state = .unconnected(unconnectedState) - return stream - } - return nil - } - - if let waitForConnectionStream { - for await _ in waitForConnectionStream { - // This loop will exit either when the task is cancelled or when the - // client connects and this stream can be opened. - } - try Task.checkCancellation() - } - - let acceptStream: Result = self.state.withLock { state in - switch state { - case .unconnected: - // The state cannot be unconnected because if it was, then the above - // for-await loop on `pendingStream` would have not returned. - // The only other option is for the task to have been cancelled, - // and that's why we check for cancellation right after the loop. - fatalError("Invalid state.") - - case .connected(var connectedState): - let streamID = connectedState.nextStreamID - do { - try connectedState.serverTransport.acceptStream(serverStream) - connectedState.openStreams[streamID] = (clientStream, serverStream) - connectedState.nextStreamID += 1 - state = .connected(connectedState) - return .success(streamID) - } catch let acceptStreamError as RPCError { - return .failure(acceptStreamError) - } catch { - return .failure(RPCError(code: .unknown, message: "Unknown error: \(error).")) - } - - case .closed: - let error = RPCError(code: .failedPrecondition, message: "The client transport is closed.") - return .failure(error) - } - } - - switch acceptStream { - case .success(let streamID): - let streamHandlingResult: Result - do { - let result = try await closure(clientStream) - streamHandlingResult = .success(result) - } catch { - streamHandlingResult = .failure(error) - } - - await clientStream.outbound.finish() - self.removeStream(id: streamID) - - return try streamHandlingResult.get() - - case .failure(let error): - await serverStream.outbound.finish(throwing: error) - await clientStream.outbound.finish(throwing: error) - throw error - } - } - - private func removeStream(id streamID: Int) { - let maybeEndContinuation = self.state.withLock { state in - switch state { - case .unconnected: - // The state cannot be unconnected at this point, because if we made - // it this far, it's because the transport was connected. - // Once connected, it's impossible to transition back to unconnected, - // so this is an invalid state. - fatalError("Invalid state") - case .connected(var connectedState): - connectedState.openStreams.removeValue(forKey: streamID) - state = .connected(connectedState) - case .closed(var closedState): - closedState.openStreams.removeValue(forKey: streamID) - state = .closed(closedState) - if closedState.openStreams.isEmpty { - // This was the last open stream: signal the closure of the client. - return closedState.signalEndContinuation - } - } - return nil - } - maybeEndContinuation?.finish() - } - - /// Returns the execution configuration for a given method. - /// - /// - Parameter descriptor: The method to lookup configuration for. - /// - Returns: Execution configuration for the method, if it exists. - public func config( - forMethod descriptor: MethodDescriptor - ) -> MethodConfig? { - self.methodConfig[descriptor] - } -} diff --git a/Sources/GRPCInProcessTransport/InProcessServerTransport.swift b/Sources/GRPCInProcessTransport/InProcessServerTransport.swift deleted file mode 100644 index 2bb2ed57d..000000000 --- a/Sources/GRPCInProcessTransport/InProcessServerTransport.swift +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -public import GRPCCore - -/// An in-process implementation of a ``ServerTransport``. -/// -/// This is useful when you're interested in testing your application without any actual networking layers -/// involved, as the client and server will communicate directly with each other via in-process streams. -/// -/// To use this server, you call ``listen(_:)`` and iterate over the returned `AsyncSequence` to get all -/// RPC requests made from clients (as ``RPCStream``s). -/// To stop listening to new requests, call ``beginGracefulShutdown()``. -/// -/// - SeeAlso: ``ClientTransport`` -@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -public struct InProcessServerTransport: ServerTransport, Sendable { - public typealias Inbound = RPCAsyncSequence - public typealias Outbound = RPCWriter.Closable - - private let newStreams: AsyncStream> - private let newStreamsContinuation: AsyncStream>.Continuation - - /// Creates a new instance of ``InProcessServerTransport``. - public init() { - (self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream() - } - - /// Publish a new ``RPCStream``, which will be returned by the transport's ``events`` - /// successful case. - /// - /// - Parameter stream: The new ``RPCStream`` to publish. - /// - Throws: ``RPCError`` with code ``RPCError/Code-swift.struct/failedPrecondition`` - /// if the server transport stopped listening to new streams (i.e., if ``beginGracefulShutdown()`` has been called). - internal func acceptStream(_ stream: RPCStream) throws { - let yieldResult = self.newStreamsContinuation.yield(stream) - if case .terminated = yieldResult { - throw RPCError( - code: .failedPrecondition, - message: "The server transport is closed." - ) - } - } - - public func listen( - streamHandler: @escaping @Sendable ( - _ stream: RPCStream, - _ context: ServerContext - ) async -> Void - ) async throws { - await withDiscardingTaskGroup { group in - for await stream in self.newStreams { - group.addTask { - let context = ServerContext(descriptor: stream.descriptor) - await streamHandler(stream, context) - } - } - } - } - - /// Stop listening to any new ``RPCStream`` publications. - /// - /// - SeeAlso: ``ServerTransport`` - public func beginGracefulShutdown() { - self.newStreamsContinuation.finish() - } -} diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift new file mode 100644 index 000000000..08c6b3476 --- /dev/null +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Client.swift @@ -0,0 +1,359 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public import GRPCCore +private import Synchronization + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension InProcessTransport { + /// An in-process implementation of a ``ClientTransport``. + /// + /// This is useful when you're interested in testing your application without any actual networking layers + /// involved, as the client and server will communicate directly with each other via in-process streams. + /// + /// To use this client, you'll have to provide a ``ServerTransport`` upon creation, as well + /// as a ``ServiceConfig``. + /// + /// Once you have a client, you must keep a long-running task executing ``connect()``, which + /// will return only once all streams have been finished and ``beginGracefulShutdown()`` has been called on this client; or + /// when the containing task is cancelled. + /// + /// To execute requests using this client, use ``withStream(descriptor:options:_:)``. If this function is + /// called before ``connect()`` is called, then any streams will remain pending and the call will + /// block until ``connect()`` is called or the task is cancelled. + /// + /// - SeeAlso: ``ClientTransport`` + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + public final class Client: ClientTransport { + private enum State: Sendable { + struct UnconnectedState { + var serverTransport: InProcessTransport.Server + var pendingStreams: [AsyncStream.Continuation] + + init(serverTransport: InProcessTransport.Server) { + self.serverTransport = serverTransport + self.pendingStreams = [] + } + } + + struct ConnectedState { + var serverTransport: InProcessTransport.Server + var nextStreamID: Int + var openStreams: + [Int: ( + RPCStream, + RPCStream< + RPCAsyncSequence, RPCWriter.Closable + > + )] + var signalEndContinuation: AsyncStream.Continuation + + init( + fromUnconnected state: UnconnectedState, + signalEndContinuation: AsyncStream.Continuation + ) { + self.serverTransport = state.serverTransport + self.nextStreamID = 0 + self.openStreams = [:] + self.signalEndContinuation = signalEndContinuation + } + } + + struct ClosedState { + var openStreams: + [Int: ( + RPCStream, + RPCStream< + RPCAsyncSequence, RPCWriter.Closable + > + )] + var signalEndContinuation: AsyncStream.Continuation? + + init() { + self.openStreams = [:] + self.signalEndContinuation = nil + } + + init(fromConnected state: ConnectedState) { + self.openStreams = state.openStreams + self.signalEndContinuation = state.signalEndContinuation + } + } + + case unconnected(UnconnectedState) + case connected(ConnectedState) + case closed(ClosedState) + } + + public typealias Inbound = RPCAsyncSequence + public typealias Outbound = RPCWriter.Closable + + public let retryThrottle: RetryThrottle? + + private let methodConfig: MethodConfigs + private let state: Mutex + + /// Creates a new in-process client transport. + /// + /// - Parameters: + /// - server: The in-process server transport to connect to. + /// - serviceConfig: Service configuration. + public init( + server: InProcessTransport.Server, + serviceConfig: ServiceConfig = ServiceConfig() + ) { + self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) } + self.methodConfig = MethodConfigs(serviceConfig: serviceConfig) + self.state = Mutex(.unconnected(.init(serverTransport: server))) + } + + /// Establish and maintain a connection to the remote destination. + /// + /// Maintains a long-lived connection, or set of connections, to a remote destination. + /// Connections may be added or removed over time as required by the implementation and the + /// demand for streams by the client. + /// + /// Implementations of this function will typically create a long-lived task group which + /// maintains connections. The function exits when all open streams have been closed and new connections + /// are no longer required by the caller who signals this by calling ``beginGracefulShutdown()``, or by cancelling the + /// task this function runs in. + public func connect() async throws { + let (stream, continuation) = AsyncStream.makeStream() + try self.state.withLock { state in + switch state { + case .unconnected(let unconnectedState): + state = .connected( + .init( + fromUnconnected: unconnectedState, + signalEndContinuation: continuation + ) + ) + for pendingStream in unconnectedState.pendingStreams { + pendingStream.finish() + } + case .connected: + throw RPCError( + code: .failedPrecondition, + message: "Already connected to server." + ) + case .closed: + throw RPCError( + code: .failedPrecondition, + message: "Can't connect to server, transport is closed." + ) + } + } + + for await _ in stream { + // This for-await loop will exit (and thus `connect()` will return) + // only when the task is cancelled, or when the stream's continuation is + // finished - whichever happens first. + // The continuation will be finished when `close()` is called and there + // are no more open streams. + } + + // If at this point there are any open streams, it's because Cancellation + // occurred and all open streams must now be closed. + let openStreams = self.state.withLock { state in + switch state { + case .unconnected: + // We have transitioned to connected, and we can't transition back. + fatalError("Invalid state") + case .connected(let connectedState): + state = .closed(.init()) + return connectedState.openStreams.values + case .closed(let closedState): + return closedState.openStreams.values + } + } + + for (clientStream, serverStream) in openStreams { + await clientStream.outbound.finish(throwing: CancellationError()) + await serverStream.outbound.finish(throwing: CancellationError()) + } + } + + /// Signal to the transport that no new streams may be created. + /// + /// Existing streams may run to completion naturally but calling ``withStream(descriptor:options:_:)`` + /// will result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown. + /// + /// If you want to forcefully cancel all active streams then cancel the task running ``connect()``. + public func beginGracefulShutdown() { + let maybeContinuation: AsyncStream.Continuation? = self.state.withLock { state in + switch state { + case .unconnected: + state = .closed(.init()) + return nil + case .connected(let connectedState): + if connectedState.openStreams.count == 0 { + state = .closed(.init()) + return connectedState.signalEndContinuation + } else { + state = .closed(.init(fromConnected: connectedState)) + return nil + } + case .closed: + return nil + } + } + maybeContinuation?.finish() + } + + /// Opens a stream using the transport, and uses it as input into a user-provided closure. + /// + /// - Important: The opened stream is closed after the closure is finished. + /// + /// This transport implementation throws ``RPCError/Code/failedPrecondition`` if the transport + /// is closing or has been closed. + /// + /// This implementation will queue any streams (and thus block this call) if this function is called before + /// ``connect()``, until a connection is established - at which point all streams will be + /// created. + /// + /// - Parameters: + /// - descriptor: A description of the method to open a stream for. + /// - options: Options specific to the stream. + /// - closure: A closure that takes the opened stream as parameter. + /// - Returns: Whatever value was returned from `closure`. + public func withStream( + descriptor: MethodDescriptor, + options: CallOptions, + _ closure: (RPCStream) async throws -> T + ) async throws -> T { + let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self) + let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) + + let clientStream = RPCStream( + descriptor: descriptor, + inbound: RPCAsyncSequence(wrapping: response.stream), + outbound: RPCWriter.Closable(wrapping: request.continuation) + ) + + let serverStream = RPCStream( + descriptor: descriptor, + inbound: RPCAsyncSequence(wrapping: request.stream), + outbound: RPCWriter.Closable(wrapping: response.continuation) + ) + + let waitForConnectionStream: AsyncStream? = self.state.withLock { state in + if case .unconnected(var unconnectedState) = state { + let (stream, continuation) = AsyncStream.makeStream() + unconnectedState.pendingStreams.append(continuation) + state = .unconnected(unconnectedState) + return stream + } + return nil + } + + if let waitForConnectionStream { + for await _ in waitForConnectionStream { + // This loop will exit either when the task is cancelled or when the + // client connects and this stream can be opened. + } + try Task.checkCancellation() + } + + let acceptStream: Result = self.state.withLock { state in + switch state { + case .unconnected: + // The state cannot be unconnected because if it was, then the above + // for-await loop on `pendingStream` would have not returned. + // The only other option is for the task to have been cancelled, + // and that's why we check for cancellation right after the loop. + fatalError("Invalid state.") + + case .connected(var connectedState): + let streamID = connectedState.nextStreamID + do { + try connectedState.serverTransport.acceptStream(serverStream) + connectedState.openStreams[streamID] = (clientStream, serverStream) + connectedState.nextStreamID += 1 + state = .connected(connectedState) + return .success(streamID) + } catch let acceptStreamError as RPCError { + return .failure(acceptStreamError) + } catch { + return .failure(RPCError(code: .unknown, message: "Unknown error: \(error).")) + } + + case .closed: + let error = RPCError( + code: .failedPrecondition, + message: "The client transport is closed." + ) + return .failure(error) + } + } + + switch acceptStream { + case .success(let streamID): + let streamHandlingResult: Result + do { + let result = try await closure(clientStream) + streamHandlingResult = .success(result) + } catch { + streamHandlingResult = .failure(error) + } + + await clientStream.outbound.finish() + self.removeStream(id: streamID) + + return try streamHandlingResult.get() + + case .failure(let error): + await serverStream.outbound.finish(throwing: error) + await clientStream.outbound.finish(throwing: error) + throw error + } + } + + private func removeStream(id streamID: Int) { + let maybeEndContinuation = self.state.withLock { state in + switch state { + case .unconnected: + // The state cannot be unconnected at this point, because if we made + // it this far, it's because the transport was connected. + // Once connected, it's impossible to transition back to unconnected, + // so this is an invalid state. + fatalError("Invalid state") + case .connected(var connectedState): + connectedState.openStreams.removeValue(forKey: streamID) + state = .connected(connectedState) + case .closed(var closedState): + closedState.openStreams.removeValue(forKey: streamID) + state = .closed(closedState) + if closedState.openStreams.isEmpty { + // This was the last open stream: signal the closure of the client. + return closedState.signalEndContinuation + } + } + return nil + } + maybeEndContinuation?.finish() + } + + /// Returns the execution configuration for a given method. + /// + /// - Parameter descriptor: The method to lookup configuration for. + /// - Returns: Execution configuration for the method, if it exists. + public func config( + forMethod descriptor: MethodDescriptor + ) -> MethodConfig? { + self.methodConfig[descriptor] + } + } +} diff --git a/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift new file mode 100644 index 000000000..0637255af --- /dev/null +++ b/Sources/GRPCInProcessTransport/InProcessTransport+Server.swift @@ -0,0 +1,83 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public import GRPCCore + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension InProcessTransport { + /// An in-process implementation of a ``ServerTransport``. + /// + /// This is useful when you're interested in testing your application without any actual networking layers + /// involved, as the client and server will communicate directly with each other via in-process streams. + /// + /// To use this server, you call ``listen(_:)`` and iterate over the returned `AsyncSequence` to get all + /// RPC requests made from clients (as ``RPCStream``s). + /// To stop listening to new requests, call ``beginGracefulShutdown()``. + /// + /// - SeeAlso: ``ClientTransport`` + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + public struct Server: ServerTransport, Sendable { + public typealias Inbound = RPCAsyncSequence + public typealias Outbound = RPCWriter.Closable + + private let newStreams: AsyncStream> + private let newStreamsContinuation: AsyncStream>.Continuation + + /// Creates a new instance of ``Server``. + public init() { + (self.newStreams, self.newStreamsContinuation) = AsyncStream.makeStream() + } + + /// Publish a new ``RPCStream``, which will be returned by the transport's ``events`` + /// successful case. + /// + /// - Parameter stream: The new ``RPCStream`` to publish. + /// - Throws: ``RPCError`` with code ``RPCError/Code-swift.struct/failedPrecondition`` + /// if the server transport stopped listening to new streams (i.e., if ``beginGracefulShutdown()`` has been called). + internal func acceptStream(_ stream: RPCStream) throws { + let yieldResult = self.newStreamsContinuation.yield(stream) + if case .terminated = yieldResult { + throw RPCError( + code: .failedPrecondition, + message: "The server transport is closed." + ) + } + } + + public func listen( + streamHandler: @escaping @Sendable ( + _ stream: RPCStream, + _ context: ServerContext + ) async -> Void + ) async throws { + await withDiscardingTaskGroup { group in + for await stream in self.newStreams { + group.addTask { + let context = ServerContext(descriptor: stream.descriptor) + await streamHandler(stream, context) + } + } + } + } + + /// Stop listening to any new ``RPCStream`` publications. + /// + /// - SeeAlso: ``ServerTransport`` + public func beginGracefulShutdown() { + self.newStreamsContinuation.finish() + } + } +} diff --git a/Sources/GRPCInProcessTransport/InProcessTransport.swift b/Sources/GRPCInProcessTransport/InProcessTransport.swift index 32a2002e9..e1bee0ae5 100644 --- a/Sources/GRPCInProcessTransport/InProcessTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessTransport.swift @@ -16,24 +16,17 @@ public import GRPCCore -public enum InProcessTransport { - /// Returns a pair containing an ``InProcessServerTransport`` and an ``InProcessClientTransport``. - /// - /// This function is purely for convenience and does no more than constructing a server transport - /// and a client using that server transport. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +public struct InProcessTransport: Sendable { + public let server: Self.Server + public let client: Self.Client + + /// Initializes a new ``InProcessTransport`` pairing a ``Client`` and a ``Server``. /// /// - Parameters: /// - serviceConfig: Configuration describing how methods should be executed. - /// - Returns: A tuple containing the connected server and client in-process transports. - @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) - public static func makePair( - serviceConfig: ServiceConfig = ServiceConfig() - ) -> (server: InProcessServerTransport, client: InProcessClientTransport) { - let server = InProcessServerTransport() - let client = InProcessClientTransport( - server: server, - serviceConfig: serviceConfig - ) - return (server, client) + public init(serviceConfig: ServiceConfig = ServiceConfig()) { + self.server = Self.Server() + self.client = Self.Client(server: self.server, serviceConfig: serviceConfig) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift index 0500b23ec..4198c9b31 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift @@ -18,10 +18,10 @@ import GRPCCore import GRPCInProcessTransport @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -extension InProcessServerTransport { +extension InProcessTransport.Server { func spawnClientTransport( throttle: RetryThrottle = RetryThrottle(maxTokens: 10, tokenRatio: 0.1) - ) -> InProcessClientTransport { - return InProcessClientTransport(server: self) + ) -> InProcessTransport.Client { + return InProcessTransport.Client(server: self) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 06f200ae6..7bfef6912 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -48,13 +48,13 @@ struct ClientRPCExecutorTestHarness { switch transport { case .inProcess: - let server = InProcessServerTransport() + let server = InProcessTransport.Server() let client = server.spawnClientTransport() self.serverTransport = StreamCountingServerTransport(wrapping: server) self.clientTransport = StreamCountingClientTransport(wrapping: client) case .throwsOnStreamCreation(let code): - let server = InProcessServerTransport() // Will never be called. + let server = InProcessTransport.Server() // Will never be called. let client = ThrowOnStreamCreationTransport(code: code) self.serverTransport = StreamCountingServerTransport(wrapping: server) self.clientTransport = StreamCountingClientTransport(wrapping: client) diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift index af566279d..b5558ae48 100644 --- a/Tests/GRPCCoreTests/GRPCClientTests.swift +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -25,7 +25,7 @@ final class GRPCClientTests: XCTestCase { interceptors: [any ClientInterceptor] = [], _ body: (GRPCClient, GRPCServer) async throws -> Void ) async throws { - let inProcess = InProcessTransport.makePair() + let inProcess = InProcessTransport() let client = GRPCClient(transport: inProcess.client, interceptors: interceptors) let server = GRPCServer(transport: inProcess.server, services: services) @@ -329,7 +329,7 @@ final class GRPCClientTests: XCTestCase { } func testCancelRunningClient() async throws { - let inProcess = InProcessTransport.makePair() + let inProcess = InProcessTransport() let client = GRPCClient(transport: inProcess.client) try await withThrowingTaskGroup(of: Void.self) { group in @@ -377,8 +377,8 @@ final class GRPCClientTests: XCTestCase { } func testRunStoppedClient() async throws { - let (_, clientTransport) = InProcessTransport.makePair() - let client = GRPCClient(transport: clientTransport) + let inProcess = InProcessTransport() + let client = GRPCClient(transport: inProcess.client) // Run the client. let task = Task { try await client.run() } task.cancel() @@ -393,8 +393,8 @@ final class GRPCClientTests: XCTestCase { } func testRunAlreadyRunningClient() async throws { - let (_, clientTransport) = InProcessTransport.makePair() - let client = GRPCClient(transport: clientTransport) + let inProcess = InProcessTransport() + let client = GRPCClient(transport: inProcess.client) // Run the client. let task = Task { try await client.run() } // Make sure the client is run for the first time here. diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index d8771d81e..a513f5d15 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -23,9 +23,9 @@ final class GRPCServerTests: XCTestCase { func withInProcessClientConnectedToServer( services: [any RegistrableRPCService], interceptors: [any ServerInterceptor] = [], - _ body: (InProcessClientTransport, GRPCServer) async throws -> Void + _ body: (InProcessTransport.Client, GRPCServer) async throws -> Void ) async throws { - let inProcess = InProcessTransport.makePair() + let inProcess = InProcessTransport() let server = GRPCServer( transport: inProcess.server, services: services, @@ -317,7 +317,7 @@ final class GRPCServerTests: XCTestCase { } func testCancelRunningServer() async throws { - let inProcess = InProcessTransport.makePair() + let inProcess = InProcessTransport() let task = Task { let server = GRPCServer(transport: inProcess.server, services: [BinaryEcho()]) try await server.serve() @@ -338,7 +338,7 @@ final class GRPCServerTests: XCTestCase { } func testTestRunStoppedServer() async throws { - let server = GRPCServer(transport: InProcessServerTransport(), services: []) + let server = GRPCServer(transport: InProcessTransport.Server(), services: []) // Run the server. let task = Task { try await server.serve() } task.cancel() diff --git a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift index d33b2774d..5730ccf46 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift @@ -149,7 +149,7 @@ final class InProcessClientTransportTests: XCTestCase { } func testOpenStreamSuccessfullyAndThenClose() async throws { - let server = InProcessServerTransport() + let server = InProcessTransport.Server() let client = makeClient(server: server) try await withThrowingTaskGroup(of: Void.self) { group in @@ -208,8 +208,8 @@ final class InProcessClientTransportTests: XCTestCase { ] ) - var client = InProcessClientTransport( - server: InProcessServerTransport(), + var client = InProcessTransport.Client( + server: InProcessTransport.Server(), serviceConfig: serviceConfig ) @@ -232,8 +232,8 @@ final class InProcessClientTransportTests: XCTestCase { executionPolicy: .retry(retryPolicy) ) serviceConfig.methodConfig.append(overrideConfiguration) - client = InProcessClientTransport( - server: InProcessServerTransport(), + client = InProcessTransport.Client( + server: InProcessTransport.Server(), serviceConfig: serviceConfig ) @@ -249,7 +249,7 @@ final class InProcessClientTransportTests: XCTestCase { } func testOpenMultipleStreamsThenClose() async throws { - let server = InProcessServerTransport() + let server = InProcessTransport.Server() let client = makeClient(server: server) try await withThrowingTaskGroup(of: Void.self) { group in @@ -285,8 +285,8 @@ final class InProcessClientTransportTests: XCTestCase { } func makeClient( - server: InProcessServerTransport = InProcessServerTransport() - ) -> InProcessClientTransport { + server: InProcessTransport.Server = InProcessTransport.Server() + ) -> InProcessTransport.Client { let defaultPolicy = RetryPolicy( maxAttempts: 10, initialBackoff: .seconds(1), @@ -304,7 +304,7 @@ final class InProcessClientTransportTests: XCTestCase { ] ) - return InProcessClientTransport( + return InProcessTransport.Client( server: server, serviceConfig: serviceConfig ) diff --git a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift index 7cd8fed20..5d451fb9d 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift @@ -22,7 +22,7 @@ import XCTest @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) final class InProcessServerTransportTests: XCTestCase { func testStartListening() async throws { - let transport = InProcessServerTransport() + let transport = InProcessTransport.Server() let outbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let stream = RPCStream< @@ -54,7 +54,7 @@ final class InProcessServerTransportTests: XCTestCase { } func testStopListening() async throws { - let transport = InProcessServerTransport() + let transport = InProcessTransport.Server() let firstStreamOutbound = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self) let firstStream = RPCStream<