From 706f5b4d020b2435c075791c60d01b56ef1b418f Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 18 Nov 2024 11:11:31 +0000 Subject: [PATCH 1/4] Add 'with-' methods for client and server Motivation: In some situations, like examples, testing, and prototyping, it can be useful to have a client and server with scoped lifetimes. This is all achievable using task groups but in a number of situations having helpers is also useful. Modifications: - Add 'with-' methods for client and server - Update docs Result: Easier to use API for some scenarios. --- Sources/GRPCCore/GRPCClient.swift | 132 +++++++++++----------- Sources/GRPCCore/GRPCServer.swift | 92 +++++++++++++-- Tests/GRPCCoreTests/GRPCClientTests.swift | 23 ++-- Tests/GRPCCoreTests/GRPCServerTests.swift | 22 ++-- 4 files changed, 164 insertions(+), 105 deletions(-) diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 79e3deb4a..fc8ab4c19 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -28,79 +28,25 @@ private import Synchronization /// /// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub. /// -/// You can set ``ServiceConfig``s on this client to override whatever configurations have been -/// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting -/// logic which apply to all RPCs. Example uses of interceptors include authentication and logging. +/// ## Creating a client /// -/// ## Creating and configuring a client -/// -/// The following example demonstrates how to create and configure a client. +/// You can create and run a client using ``withGRPCClient(transport:interceptors:handleClient:)`` +/// or ``withGRPCClient(transport:interceptorPipeline:handleClient:)`` which create, configure and +/// run the client providing scoped access to it via the `handleClient` closure. The client will +/// begin gracefully shutting down when the closure returns. /// /// ```swift -/// // Create a configuration object for the client and override the timeout for the 'Get' method on -/// // the 'echo.Echo' service. This configuration takes precedence over any set by the transport. -/// var configuration = GRPCClient.Configuration() -/// configuration.service.override = ServiceConfig( -/// methodConfig: [ -/// MethodConfig( -/// names: [ -/// MethodConfig.Name(service: "echo.Echo", method: "Get") -/// ], -/// timeout: .seconds(5) -/// ) -/// ] -/// ) -/// -/// // Configure a fallback timeout for all RPCs (indicated by an empty service and method name) if -/// // no configuration is provided in the overrides or by the transport. -/// configuration.service.defaults = ServiceConfig( -/// methodConfig: [ -/// MethodConfig( -/// names: [ -/// MethodConfig.Name(service: "", method: "") -/// ], -/// timeout: .seconds(10) -/// ) -/// ] -/// ) -/// -/// // Finally create a transport and instantiate the client, adding an interceptor. -/// let inProcessTransport = InProcessTransport() -/// -/// let client = GRPCClient( -/// transport: inProcessTransport.client, -/// interceptors: [StatsRecordingClientInterceptor()], -/// configuration: configuration -/// ) +/// let transport: any ClientTransport = ... +/// try await withGRPCClient(transport: transport) { client in +/// // ... +/// } /// ``` /// -/// ## Starting and stopping the client +/// ## Creating a client manually /// -/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the -/// transport to start connecting to the server. -/// -/// ```swift -/// // Start running the client. 'run()' must be running while RPCs are execute so it's executed in -/// // a task group. -/// try await withThrowingTaskGroup(of: Void.self) { group in -/// group.addTask { -/// try await client.run() -/// } -/// -/// // Execute a request against the "echo.Echo" service. -/// try await client.unary( -/// request: ClientRequest<[UInt8]>(message: [72, 101, 108, 108, 111, 33]), -/// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"), -/// serializer: IdentitySerializer(), -/// deserializer: IdentityDeserializer(), -/// ) { response in -/// print(response.message) -/// } -/// -/// // The RPC has completed, close the client. -/// client.beginGracefulShutdown() -/// } -/// ``` +/// If the `with`-style methods for creating clients isn't suitable for your application then you +/// can create and run a client manually. This requires you to call the ``run()`` method in a task +/// which instructs the client to start connecting to the server. /// /// The ``run()`` method won't return until the client has finished handling all requests. You can /// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``. @@ -425,3 +371,55 @@ public final class GRPCClient: Sendable { ) } } + +/// Creates and runs a new client with the given transport and interceptors. +/// +/// - Parameters: +/// - transport: The transport used to establish a communication channel with a server. +/// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each +/// accepted RPC. The order in which interceptors are added reflects the order in which they +/// are called. The first interceptor added will be the first interceptor to intercept each +/// request. The last interceptor added will be the final interceptor to intercept each +/// request before calling the appropriate handler. +/// - handleClient: A closure which is called with the client. When the closure returns, the +/// client is shutdown gracefully. +public func withGRPCClient( + transport: some ClientTransport, + interceptors: [any ClientInterceptor] = [], + handleClient: (GRPCClient) async throws -> Result +) async throws -> Result { + try await withGRPCClient( + transport: transport, + interceptorPipeline: interceptors.map { .apply($0, to: .all) }, + handleClient: handleClient + ) +} + +/// Creates and runs a new client with the given transport and interceptors. +/// +/// - Parameters: +/// - transport: The transport used to establish a communication channel with a server. +/// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting +/// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC. +/// The order in which interceptors are added reflects the order in which they are called. +/// The first interceptor added will be the first interceptor to intercept each request. +/// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler. +/// - handleClient: A closure which is called with the client. When the closure returns, the +/// client is shutdown gracefully. +/// - Returns: The result of the `handleClient` closure. +public func withGRPCClient( + transport: some ClientTransport, + interceptorPipeline: [ClientInterceptorPipelineOperation], + handleClient: (GRPCClient) async throws -> Result +) async throws -> Result { + try await withThrowingDiscardingTaskGroup { group in + let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline) + group.addTask { + try await client.run() + } + + let result = try await handleClient(client) + client.beginGracefulShutdown() + return result + } +} diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 6ff82b9dd..4e997df09 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -29,13 +29,13 @@ private import Synchronization /// include request filtering, authentication, and logging. Once requests have been intercepted /// they are passed to a handler which in turn returns a response to send back to the client. /// -/// ## Creating and configuring a server +/// ## Configuring and starting a server /// -/// The following example demonstrates how to create and configure a server. +/// The following example demonstrates how to create and run a server. /// /// ```swift -/// // Create and an in-process transport. -/// let inProcessTransport = InProcessTransport() +/// // Create an transport +/// let transport: any ServerTransport = ... /// /// // Create the 'Greeter' and 'Echo' services. /// let greeter = GreeterService() @@ -44,19 +44,24 @@ private import Synchronization /// // Create an interceptor. /// let statsRecorder = StatsRecordingServerInterceptors() /// -/// // Finally create the server. -/// let server = GRPCServer( -/// transport: inProcessTransport.server, +/// // Run the server. +/// try await withGRPCServer( +/// transport: transport, /// services: [greeter, echo], /// interceptors: [statsRecorder] -/// ) +/// ) { server in +/// // ... +/// // The server begins shutting down when this closure returns +/// // ... +/// } /// ``` /// -/// ## Starting and stopping the server +/// ## Creating a client manually /// -/// Once you have configured the server call ``serve()`` to start it. Calling ``serve()`` starts the server's -/// transport too. A ``RuntimeError`` is thrown if the transport can't be started or encounters some other -/// runtime error. +/// If the `with`-style methods for creating a server isn't suitable for your application then you +/// can create and run it manually. This requires you to call the ``serve()`` method in a task +/// which instructs the server to start its transport and listen for new RPCs. A ``RuntimeError`` is +/// thrown if the transport can't be started or encounters some other runtime error. /// /// ```swift /// // Start running the server. @@ -235,3 +240,66 @@ public final class GRPCServer: Sendable { } } } + +/// Creates and runs a gRPC server. +/// +/// - Parameters: +/// - transport: The transport the server should listen on. +/// - services: Services offered by the server. +/// - interceptors: A collection of interceptors providing cross-cutting functionality to each +/// accepted RPC. The order in which interceptors are added reflects the order in which they +/// are called. The first interceptor added will be the first interceptor to intercept each +/// request. The last interceptor added will be the final interceptor to intercept each +/// request before calling the appropriate handler. +/// - handleServer: A closure which is called with the server. When the closure returns, the +/// server is shutdown gracefully. +/// - Returns: The result of the `handleServer` closure. +public func withGRPCServer( + transport: any ServerTransport, + services: [any RegistrableRPCService], + interceptors: [any ServerInterceptor] = [], + handleServer: (GRPCServer) async throws -> Result +) async throws -> Result { + try await withGRPCServer( + transport: transport, + services: services, + interceptorPipeline: interceptors.map { .apply($0, to: .all) }, + handleServer: handleServer + ) +} + +/// Creates and runs a gRPC server. +/// +/// - Parameters: +/// - transport: The transport the server should listen on. +/// - services: Services offered by the server. +/// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each +/// accepted RPC. The order in which interceptors are added reflects the order in which they +/// are called. The first interceptor added will be the first interceptor to intercept each +/// request. The last interceptor added will be the final interceptor to intercept each +/// request before calling the appropriate handler. +/// - handleServer: A closure which is called with the server. When the closure returns, the +/// server is shutdown gracefully. +/// - Returns: The result of the `handleServer` closure. +public func withGRPCServer( + transport: any ServerTransport, + services: [any RegistrableRPCService], + interceptorPipeline: [ServerInterceptorPipelineOperation], + handleServer: (GRPCServer) async throws -> Result +) async throws -> Result { + return try await withThrowingDiscardingTaskGroup { group in + let server = GRPCServer( + transport: transport, + services: services, + interceptorPipeline: interceptorPipeline + ) + + group.addTask { + try await server.serve() + } + + let result = try await handleServer(server) + server.beginGracefulShutdown() + return result + } +} diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift index ed5396da1..ca8331b61 100644 --- a/Tests/GRPCCoreTests/GRPCClientTests.swift +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -29,20 +29,17 @@ final class GRPCClientTests: XCTestCase { let client = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline) let server = GRPCServer(transport: inProcess.server, services: services) - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await server.serve() - } - - group.addTask { - try await client.run() + try await withGRPCServer( + transport: inProcess.server, + services: services + ) { server in + try await withGRPCClient( + transport: inProcess.client, + interceptorPipeline: interceptorPipeline + ) { client in + try await Task.sleep(for: .milliseconds(100)) + try await body(client, server) } - - // Make sure both server and client are running - try await Task.sleep(for: .milliseconds(100)) - try await body(client, server) - client.beginGracefulShutdown() - server.beginGracefulShutdown() } } diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index 9b20785d5..b61fb2022 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -26,24 +26,20 @@ final class GRPCServerTests: XCTestCase { _ body: (InProcessTransport.Client, GRPCServer) async throws -> Void ) async throws { let inProcess = InProcessTransport() - let server = GRPCServer( + + try await withGRPCServer( transport: inProcess.server, services: services, interceptorPipeline: interceptorPipeline - ) - - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await server.serve() - } + ) { server in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await inProcess.client.connect() + } - group.addTask { - try await inProcess.client.connect() + try await body(inProcess.client, server) + inProcess.client.beginGracefulShutdown() } - - try await body(inProcess.client, server) - inProcess.client.beginGracefulShutdown() - server.beginGracefulShutdown() } } From 94fce5f55cc5b3ce3d34c5e33d3f221b988f89f5 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 20 Nov 2024 17:33:16 +0000 Subject: [PATCH 2/4] Add isolation --- Sources/GRPCCore/GRPCClient.swift | 3 ++ Sources/GRPCCore/GRPCServer.swift | 3 ++ .../ClientServerWithMethods.swift | 44 +++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index fc8ab4c19..b5cb28cf2 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -386,11 +386,13 @@ public final class GRPCClient: Sendable { public func withGRPCClient( transport: some ClientTransport, interceptors: [any ClientInterceptor] = [], + isolation: isolated (any Actor)? = #isolation, handleClient: (GRPCClient) async throws -> Result ) async throws -> Result { try await withGRPCClient( transport: transport, interceptorPipeline: interceptors.map { .apply($0, to: .all) }, + isolation: isolation, handleClient: handleClient ) } @@ -410,6 +412,7 @@ public func withGRPCClient( public func withGRPCClient( transport: some ClientTransport, interceptorPipeline: [ClientInterceptorPipelineOperation], + isolation: isolated (any Actor)? = #isolation, handleClient: (GRPCClient) async throws -> Result ) async throws -> Result { try await withThrowingDiscardingTaskGroup { group in diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 4e997df09..2b7054b40 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -258,12 +258,14 @@ public func withGRPCServer( transport: any ServerTransport, services: [any RegistrableRPCService], interceptors: [any ServerInterceptor] = [], + isolation: isolated (any Actor)? = #isolation, handleServer: (GRPCServer) async throws -> Result ) async throws -> Result { try await withGRPCServer( transport: transport, services: services, interceptorPipeline: interceptors.map { .apply($0, to: .all) }, + isolation: isolation, handleServer: handleServer ) } @@ -285,6 +287,7 @@ public func withGRPCServer( transport: any ServerTransport, services: [any RegistrableRPCService], interceptorPipeline: [ServerInterceptorPipelineOperation], + isolation: isolated (any Actor)? = #isolation, handleServer: (GRPCServer) async throws -> Result ) async throws -> Result { return try await withThrowingDiscardingTaskGroup { group in diff --git a/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift b/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift new file mode 100644 index 000000000..3f801d501 --- /dev/null +++ b/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift @@ -0,0 +1,44 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import GRPCInProcessTransport +import Testing + +@Suite("withGRPCServer / withGRPCClient") +struct WithMethods { + @Test("Actor isolation") + func actorIsolation() async throws { + let testActor = TestActor() + #expect(await !testActor.hasRun) + try await testActor.run() + #expect(await testActor.hasRun) + } +} + +fileprivate actor TestActor { + private(set) var hasRun = false + + func run() async throws { + let inProcess = InProcessTransport() + + try await withGRPCServer(transport: inProcess.server, services: []) { server in + try await withGRPCClient(transport: inProcess.client) { client in + self.hasRun = true + } + } + } +} From f81410f5cd7ad5b0076982c9794cc02ba7cc676b Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 21 Nov 2024 09:22:03 +0000 Subject: [PATCH 3/4] deflake test --- .../ClientServerWithMethods.swift | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift b/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift index 3f801d501..930b18183 100644 --- a/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift +++ b/Tests/GRPCInProcessTransportTests/ClientServerWithMethods.swift @@ -36,8 +36,20 @@ fileprivate actor TestActor { let inProcess = InProcessTransport() try await withGRPCServer(transport: inProcess.server, services: []) { server in - try await withGRPCClient(transport: inProcess.client) { client in - self.hasRun = true + do { + try await withGRPCClient(transport: inProcess.client) { client in + self.hasRun = true + } + } catch { + // Starting the client can race with the closure returning which begins graceful shutdown. + // If that happens the client run method will throw an error as the client is being run + // when it's already been shutdown. That's okay and expected so rather than slowing down + // the closure tolerate that specific error. + if let error = error as? RuntimeError { + #expect(error.code == .clientIsStopped) + } else { + Issue.record(error) + } } } } From a40e45fdef902a061529067b248fee1d6d9ecb1d Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 21 Nov 2024 10:19:10 +0000 Subject: [PATCH 4/4] fix docs --- Sources/GRPCCore/GRPCClient.swift | 8 ++++++-- Sources/GRPCCore/GRPCServer.swift | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index b5cb28cf2..0ac39c8e0 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -30,8 +30,8 @@ private import Synchronization /// /// ## Creating a client /// -/// You can create and run a client using ``withGRPCClient(transport:interceptors:handleClient:)`` -/// or ``withGRPCClient(transport:interceptorPipeline:handleClient:)`` which create, configure and +/// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)`` +/// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and /// run the client providing scoped access to it via the `handleClient` closure. The client will /// begin gracefully shutting down when the closure returns. /// @@ -381,6 +381,8 @@ public final class GRPCClient: Sendable { /// are called. The first interceptor added will be the first interceptor to intercept each /// request. The last interceptor added will be the final interceptor to intercept each /// request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. /// - handleClient: A closure which is called with the client. When the closure returns, the /// client is shutdown gracefully. public func withGRPCClient( @@ -406,6 +408,8 @@ public func withGRPCClient( /// The order in which interceptors are added reflects the order in which they are called. /// The first interceptor added will be the first interceptor to intercept each request. /// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. /// - handleClient: A closure which is called with the client. When the closure returns, the /// client is shutdown gracefully. /// - Returns: The result of the `handleClient` closure. diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 2b7054b40..f8f576e65 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -251,6 +251,8 @@ public final class GRPCServer: Sendable { /// are called. The first interceptor added will be the first interceptor to intercept each /// request. The last interceptor added will be the final interceptor to intercept each /// request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. /// - handleServer: A closure which is called with the server. When the closure returns, the /// server is shutdown gracefully. /// - Returns: The result of the `handleServer` closure. @@ -280,6 +282,8 @@ public func withGRPCServer( /// are called. The first interceptor added will be the first interceptor to intercept each /// request. The last interceptor added will be the final interceptor to intercept each /// request before calling the appropriate handler. +/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the +/// code is nonisolated. /// - handleServer: A closure which is called with the server. When the closure returns, the /// server is shutdown gracefully. /// - Returns: The result of the `handleServer` closure.