Skip to content

Commit 3b0fe70

Browse files
authored
Add 'with-' methods for client and server (#2121)
Motivation: In some situations, like examples, testing, and prototyping, it can be useful to have a client and server with scoped lifetimes. This is all achievable using task groups but in a number of situations having helpers is also useful. Modifications: - Add 'with-' methods for client and server - Update docs Result: Easier to use API for some scenarios.
1 parent 8d0bf6f commit 3b0fe70

File tree

5 files changed

+234
-105
lines changed

5 files changed

+234
-105
lines changed

Sources/GRPCCore/GRPCClient.swift

Lines changed: 72 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -28,79 +28,25 @@ private import Synchronization
2828
///
2929
/// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub.
3030
///
31-
/// You can set ``ServiceConfig``s on this client to override whatever configurations have been
32-
/// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting
33-
/// logic which apply to all RPCs. Example uses of interceptors include authentication and logging.
31+
/// ## Creating a client
3432
///
35-
/// ## Creating and configuring a client
36-
///
37-
/// The following example demonstrates how to create and configure a client.
33+
/// You can create and run a client using ``withGRPCClient(transport:interceptors:isolation:handleClient:)``
34+
/// or ``withGRPCClient(transport:interceptorPipeline:isolation:handleClient:)`` which create, configure and
35+
/// run the client providing scoped access to it via the `handleClient` closure. The client will
36+
/// begin gracefully shutting down when the closure returns.
3837
///
3938
/// ```swift
40-
/// // Create a configuration object for the client and override the timeout for the 'Get' method on
41-
/// // the 'echo.Echo' service. This configuration takes precedence over any set by the transport.
42-
/// var configuration = GRPCClient.Configuration()
43-
/// configuration.service.override = ServiceConfig(
44-
/// methodConfig: [
45-
/// MethodConfig(
46-
/// names: [
47-
/// MethodConfig.Name(service: "echo.Echo", method: "Get")
48-
/// ],
49-
/// timeout: .seconds(5)
50-
/// )
51-
/// ]
52-
/// )
53-
///
54-
/// // Configure a fallback timeout for all RPCs (indicated by an empty service and method name) if
55-
/// // no configuration is provided in the overrides or by the transport.
56-
/// configuration.service.defaults = ServiceConfig(
57-
/// methodConfig: [
58-
/// MethodConfig(
59-
/// names: [
60-
/// MethodConfig.Name(service: "", method: "")
61-
/// ],
62-
/// timeout: .seconds(10)
63-
/// )
64-
/// ]
65-
/// )
66-
///
67-
/// // Finally create a transport and instantiate the client, adding an interceptor.
68-
/// let inProcessTransport = InProcessTransport()
69-
///
70-
/// let client = GRPCClient(
71-
/// transport: inProcessTransport.client,
72-
/// interceptors: [StatsRecordingClientInterceptor()],
73-
/// configuration: configuration
74-
/// )
39+
/// let transport: any ClientTransport = ...
40+
/// try await withGRPCClient(transport: transport) { client in
41+
/// // ...
42+
/// }
7543
/// ```
7644
///
77-
/// ## Starting and stopping the client
45+
/// ## Creating a client manually
7846
///
79-
/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the
80-
/// transport to start connecting to the server.
81-
///
82-
/// ```swift
83-
/// // Start running the client. 'run()' must be running while RPCs are execute so it's executed in
84-
/// // a task group.
85-
/// try await withThrowingTaskGroup(of: Void.self) { group in
86-
/// group.addTask {
87-
/// try await client.run()
88-
/// }
89-
///
90-
/// // Execute a request against the "echo.Echo" service.
91-
/// try await client.unary(
92-
/// request: ClientRequest<[UInt8]>(message: [72, 101, 108, 108, 111, 33]),
93-
/// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"),
94-
/// serializer: IdentitySerializer(),
95-
/// deserializer: IdentityDeserializer(),
96-
/// ) { response in
97-
/// print(response.message)
98-
/// }
99-
///
100-
/// // The RPC has completed, close the client.
101-
/// client.beginGracefulShutdown()
102-
/// }
103-
/// ```
47+
/// If the `with`-style methods for creating clients isn't suitable for your application then you
48+
/// can create and run a client manually. This requires you to call the ``run()`` method in a task
49+
/// which instructs the client to start connecting to the server.
10450
///
10551
/// The ``run()`` method won't return until the client has finished handling all requests. You can
10652
/// signal to the client that it should stop creating new request streams by calling ``beginGracefulShutdown()``.
@@ -425,3 +371,62 @@ public final class GRPCClient: Sendable {
425371
)
426372
}
427373
}
374+
375+
/// Creates and runs a new client with the given transport and interceptors.
376+
///
377+
/// - Parameters:
378+
/// - transport: The transport used to establish a communication channel with a server.
379+
/// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
380+
/// accepted RPC. The order in which interceptors are added reflects the order in which they
381+
/// are called. The first interceptor added will be the first interceptor to intercept each
382+
/// request. The last interceptor added will be the final interceptor to intercept each
383+
/// request before calling the appropriate handler.
384+
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
385+
/// code is nonisolated.
386+
/// - handleClient: A closure which is called with the client. When the closure returns, the
387+
/// client is shutdown gracefully.
388+
public func withGRPCClient<Result: Sendable>(
389+
transport: some ClientTransport,
390+
interceptors: [any ClientInterceptor] = [],
391+
isolation: isolated (any Actor)? = #isolation,
392+
handleClient: (GRPCClient) async throws -> Result
393+
) async throws -> Result {
394+
try await withGRPCClient(
395+
transport: transport,
396+
interceptorPipeline: interceptors.map { .apply($0, to: .all) },
397+
isolation: isolation,
398+
handleClient: handleClient
399+
)
400+
}
401+
402+
/// Creates and runs a new client with the given transport and interceptors.
403+
///
404+
/// - Parameters:
405+
/// - transport: The transport used to establish a communication channel with a server.
406+
/// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting
407+
/// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
408+
/// The order in which interceptors are added reflects the order in which they are called.
409+
/// The first interceptor added will be the first interceptor to intercept each request.
410+
/// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
411+
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
412+
/// code is nonisolated.
413+
/// - handleClient: A closure which is called with the client. When the closure returns, the
414+
/// client is shutdown gracefully.
415+
/// - Returns: The result of the `handleClient` closure.
416+
public func withGRPCClient<Result: Sendable>(
417+
transport: some ClientTransport,
418+
interceptorPipeline: [ClientInterceptorPipelineOperation],
419+
isolation: isolated (any Actor)? = #isolation,
420+
handleClient: (GRPCClient) async throws -> Result
421+
) async throws -> Result {
422+
try await withThrowingDiscardingTaskGroup { group in
423+
let client = GRPCClient(transport: transport, interceptorPipeline: interceptorPipeline)
424+
group.addTask {
425+
try await client.run()
426+
}
427+
428+
let result = try await handleClient(client)
429+
client.beginGracefulShutdown()
430+
return result
431+
}
432+
}

Sources/GRPCCore/GRPCServer.swift

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ private import Synchronization
2929
/// include request filtering, authentication, and logging. Once requests have been intercepted
3030
/// they are passed to a handler which in turn returns a response to send back to the client.
3131
///
32-
/// ## Creating and configuring a server
32+
/// ## Configuring and starting a server
3333
///
34-
/// The following example demonstrates how to create and configure a server.
34+
/// The following example demonstrates how to create and run a server.
3535
///
3636
/// ```swift
37-
/// // Create and an in-process transport.
38-
/// let inProcessTransport = InProcessTransport()
37+
/// // Create an transport
38+
/// let transport: any ServerTransport = ...
3939
///
4040
/// // Create the 'Greeter' and 'Echo' services.
4141
/// let greeter = GreeterService()
@@ -44,19 +44,24 @@ private import Synchronization
4444
/// // Create an interceptor.
4545
/// let statsRecorder = StatsRecordingServerInterceptors()
4646
///
47-
/// // Finally create the server.
48-
/// let server = GRPCServer(
49-
/// transport: inProcessTransport.server,
47+
/// // Run the server.
48+
/// try await withGRPCServer(
49+
/// transport: transport,
5050
/// services: [greeter, echo],
5151
/// interceptors: [statsRecorder]
52-
/// )
52+
/// ) { server in
53+
/// // ...
54+
/// // The server begins shutting down when this closure returns
55+
/// // ...
56+
/// }
5357
/// ```
5458
///
55-
/// ## Starting and stopping the server
59+
/// ## Creating a client manually
5660
///
57-
/// Once you have configured the server call ``serve()`` to start it. Calling ``serve()`` starts the server's
58-
/// transport too. A ``RuntimeError`` is thrown if the transport can't be started or encounters some other
59-
/// runtime error.
61+
/// If the `with`-style methods for creating a server isn't suitable for your application then you
62+
/// can create and run it manually. This requires you to call the ``serve()`` method in a task
63+
/// which instructs the server to start its transport and listen for new RPCs. A ``RuntimeError`` is
64+
/// thrown if the transport can't be started or encounters some other runtime error.
6065
///
6166
/// ```swift
6267
/// // Start running the server.
@@ -235,3 +240,73 @@ public final class GRPCServer: Sendable {
235240
}
236241
}
237242
}
243+
244+
/// Creates and runs a gRPC server.
245+
///
246+
/// - Parameters:
247+
/// - transport: The transport the server should listen on.
248+
/// - services: Services offered by the server.
249+
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
250+
/// accepted RPC. The order in which interceptors are added reflects the order in which they
251+
/// are called. The first interceptor added will be the first interceptor to intercept each
252+
/// request. The last interceptor added will be the final interceptor to intercept each
253+
/// request before calling the appropriate handler.
254+
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
255+
/// code is nonisolated.
256+
/// - handleServer: A closure which is called with the server. When the closure returns, the
257+
/// server is shutdown gracefully.
258+
/// - Returns: The result of the `handleServer` closure.
259+
public func withGRPCServer<Result: Sendable>(
260+
transport: any ServerTransport,
261+
services: [any RegistrableRPCService],
262+
interceptors: [any ServerInterceptor] = [],
263+
isolation: isolated (any Actor)? = #isolation,
264+
handleServer: (GRPCServer) async throws -> Result
265+
) async throws -> Result {
266+
try await withGRPCServer(
267+
transport: transport,
268+
services: services,
269+
interceptorPipeline: interceptors.map { .apply($0, to: .all) },
270+
isolation: isolation,
271+
handleServer: handleServer
272+
)
273+
}
274+
275+
/// Creates and runs a gRPC server.
276+
///
277+
/// - Parameters:
278+
/// - transport: The transport the server should listen on.
279+
/// - services: Services offered by the server.
280+
/// - interceptorPipeline: A collection of interceptors providing cross-cutting functionality to each
281+
/// accepted RPC. The order in which interceptors are added reflects the order in which they
282+
/// are called. The first interceptor added will be the first interceptor to intercept each
283+
/// request. The last interceptor added will be the final interceptor to intercept each
284+
/// request before calling the appropriate handler.
285+
/// - isolation: A reference to the actor to which the enclosing code is isolated, or nil if the
286+
/// code is nonisolated.
287+
/// - handleServer: A closure which is called with the server. When the closure returns, the
288+
/// server is shutdown gracefully.
289+
/// - Returns: The result of the `handleServer` closure.
290+
public func withGRPCServer<Result: Sendable>(
291+
transport: any ServerTransport,
292+
services: [any RegistrableRPCService],
293+
interceptorPipeline: [ServerInterceptorPipelineOperation],
294+
isolation: isolated (any Actor)? = #isolation,
295+
handleServer: (GRPCServer) async throws -> Result
296+
) async throws -> Result {
297+
return try await withThrowingDiscardingTaskGroup { group in
298+
let server = GRPCServer(
299+
transport: transport,
300+
services: services,
301+
interceptorPipeline: interceptorPipeline
302+
)
303+
304+
group.addTask {
305+
try await server.serve()
306+
}
307+
308+
let result = try await handleServer(server)
309+
server.beginGracefulShutdown()
310+
return result
311+
}
312+
}

Tests/GRPCCoreTests/GRPCClientTests.swift

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,17 @@ final class GRPCClientTests: XCTestCase {
2929
let client = GRPCClient(transport: inProcess.client, interceptorPipeline: interceptorPipeline)
3030
let server = GRPCServer(transport: inProcess.server, services: services)
3131

32-
try await withThrowingTaskGroup(of: Void.self) { group in
33-
group.addTask {
34-
try await server.serve()
35-
}
36-
37-
group.addTask {
38-
try await client.run()
32+
try await withGRPCServer(
33+
transport: inProcess.server,
34+
services: services
35+
) { server in
36+
try await withGRPCClient(
37+
transport: inProcess.client,
38+
interceptorPipeline: interceptorPipeline
39+
) { client in
40+
try await Task.sleep(for: .milliseconds(100))
41+
try await body(client, server)
3942
}
40-
41-
// Make sure both server and client are running
42-
try await Task.sleep(for: .milliseconds(100))
43-
try await body(client, server)
44-
client.beginGracefulShutdown()
45-
server.beginGracefulShutdown()
4643
}
4744
}
4845

Tests/GRPCCoreTests/GRPCServerTests.swift

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,20 @@ final class GRPCServerTests: XCTestCase {
2626
_ body: (InProcessTransport.Client, GRPCServer) async throws -> Void
2727
) async throws {
2828
let inProcess = InProcessTransport()
29-
let server = GRPCServer(
29+
30+
try await withGRPCServer(
3031
transport: inProcess.server,
3132
services: services,
3233
interceptorPipeline: interceptorPipeline
33-
)
34-
35-
try await withThrowingTaskGroup(of: Void.self) { group in
36-
group.addTask {
37-
try await server.serve()
38-
}
34+
) { server in
35+
try await withThrowingTaskGroup(of: Void.self) { group in
36+
group.addTask {
37+
try await inProcess.client.connect()
38+
}
3939

40-
group.addTask {
41-
try await inProcess.client.connect()
40+
try await body(inProcess.client, server)
41+
inProcess.client.beginGracefulShutdown()
4242
}
43-
44-
try await body(inProcess.client, server)
45-
inProcess.client.beginGracefulShutdown()
46-
server.beginGracefulShutdown()
4743
}
4844
}
4945

0 commit comments

Comments
 (0)