Skip to content

Commit e160fd0

Browse files
authored
Allow adding ClientInterceptors to specific services and methods (#2113)
## Motivation We want to allow users to customise the RPCs a registered interceptor should apply to on the client: - Intercept all requests - Intercept requests only meant for specific services - Intercept requests only meant for specific methods ## Modifications This PR adds a new `ClientInterceptorPipelineOperation` type that allows users to specify what the target of the interceptor should be. Existing APIs accepting `[any ClientInterceptor]` have been kept, but new initialisers taking `[ClientInterceptorPipelineOperation]` instead have been added. ## Result Users can have more control over to which requests interceptors are applied.
1 parent c41e0a7 commit e160fd0

File tree

8 files changed

+476
-30
lines changed

8 files changed

+476
-30
lines changed

Sources/GRPCCore/Call/Client/ClientInterceptor.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
/// received from the transport. They are typically used for cross-cutting concerns like injecting
2222
/// metadata, validating messages, logging additional data, and tracing.
2323
///
24-
/// Interceptors are registered with a client and apply to all RPCs. If you need to modify the
25-
/// behavior of an interceptor on a per-RPC basis then you can use the
26-
/// ``ClientContext/descriptor`` to determine which RPC is being called and
27-
/// conditionalise behavior accordingly.
24+
/// Interceptors are registered with the server via ``ClientInterceptorPipelineOperation``s.
25+
/// You may register them for all services registered with a server, for RPCs directed to specific services, or
26+
/// for RPCs directed to specific methods. If you need to modify the behavior of an interceptor on a
27+
/// per-RPC basis in more detail, then you can use the ``ClientContext/descriptor`` to determine
28+
/// which RPC is being called and conditionalise behavior accordingly.
2829
///
2930
/// - TODO: Update example and documentation to show how to register an interceptor.
3031
///
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/// A `ClientInterceptorPipelineOperation` describes to which RPCs a client interceptor should be applied.
18+
///
19+
/// You can configure a client interceptor to be applied to:
20+
/// - all RPCs and services;
21+
/// - requests directed only to specific services; or
22+
/// - requests directed only to specific methods (of a specific service).
23+
///
24+
/// - SeeAlso: ``ClientInterceptor`` for more information on client interceptors, and
25+
/// ``ServerInterceptorPipelineOperation`` for the server-side version of this type.
26+
public struct ClientInterceptorPipelineOperation: Sendable {
27+
/// The subject of a ``ClientInterceptorPipelineOperation``.
28+
/// The subject of an interceptor can either be all services and methods, only specific services, or only specific methods.
29+
public struct Subject: Sendable {
30+
internal enum Wrapped: Sendable {
31+
case all
32+
case services(Set<ServiceDescriptor>)
33+
case methods(Set<MethodDescriptor>)
34+
}
35+
36+
private let wrapped: Wrapped
37+
38+
/// An operation subject specifying an interceptor that applies to all RPCs across all services will be registered with this client.
39+
public static var all: Self { .init(wrapped: .all) }
40+
41+
/// An operation subject specifying an interceptor that will be applied only to RPCs directed to the specified services.
42+
/// - Parameters:
43+
/// - services: The list of service names for which this interceptor should intercept RPCs.
44+
/// - Returns: A ``ClientInterceptorPipelineOperation``.
45+
public static func services(_ services: Set<ServiceDescriptor>) -> Self {
46+
Self(wrapped: .services(services))
47+
}
48+
49+
/// An operation subject specifying an interceptor that will be applied only to RPCs directed to the specified service methods.
50+
/// - Parameters:
51+
/// - methods: The list of method descriptors for which this interceptor should intercept RPCs.
52+
/// - Returns: A ``ClientInterceptorPipelineOperation``.
53+
public static func methods(_ methods: Set<MethodDescriptor>) -> Self {
54+
Self(wrapped: .methods(methods))
55+
}
56+
57+
@usableFromInline
58+
internal func applies(to descriptor: MethodDescriptor) -> Bool {
59+
switch self.wrapped {
60+
case .all:
61+
return true
62+
63+
case .services(let services):
64+
return services.map({ $0.fullyQualifiedService }).contains(descriptor.service)
65+
66+
case .methods(let methods):
67+
return methods.contains(descriptor)
68+
}
69+
}
70+
}
71+
72+
/// The interceptor specified for this operation.
73+
public let interceptor: any ClientInterceptor
74+
75+
@usableFromInline
76+
internal let subject: Subject
77+
78+
private init(interceptor: any ClientInterceptor, appliesTo: Subject) {
79+
self.interceptor = interceptor
80+
self.subject = appliesTo
81+
}
82+
83+
/// Create an operation, specifying which ``ClientInterceptor`` to apply and to which ``Subject``.
84+
/// - Parameters:
85+
/// - interceptor: The ``ClientInterceptor`` to register with the client.
86+
/// - subject: The ``Subject`` to which the `interceptor` applies.
87+
/// - Returns: A ``ClientInterceptorPipelineOperation``.
88+
public static func apply(_ interceptor: any ClientInterceptor, to subject: Subject) -> Self {
89+
Self(interceptor: interceptor, appliesTo: subject)
90+
}
91+
92+
/// Returns whether this ``ClientInterceptorPipelineOperation`` applies to the given `descriptor`.
93+
/// - Parameter descriptor: A ``MethodDescriptor`` for which to test whether this interceptor applies.
94+
/// - Returns: `true` if this interceptor applies to the given `descriptor`, or `false` otherwise.
95+
@inlinable
96+
internal func applies(to descriptor: MethodDescriptor) -> Bool {
97+
self.subject.applies(to: descriptor)
98+
}
99+
}

Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ struct ServerRPCExecutor {
2323
/// - stream: The accepted stream to execute the RPC on.
2424
/// - deserializer: A deserializer for messages received from the client.
2525
/// - serializer: A serializer for messages to send to the client.
26-
/// - interceptors: Server interceptors to apply to this RPC.
26+
/// - interceptors: Server interceptors to apply to this RPC. The
27+
/// interceptors will be called in the order of the array.
2728
/// - handler: A handler which turns the request into a response.
2829
@inlinable
2930
static func execute<Input, Output>(

Sources/GRPCCore/Call/Server/ServerInterceptorPipelineOperation.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
/// - requests directed only to specific services registered with your server; or
2222
/// - requests directed only to specific methods (of a specific service).
2323
///
24-
/// - SeeAlso: ``ServerInterceptor`` for more information on server interceptors.
24+
/// - SeeAlso: ``ServerInterceptor`` for more information on server interceptors, and
25+
/// ``ClientInterceptorPipelineOperation`` for the client-side version of this type.
2526
public struct ServerInterceptorPipelineOperation: Sendable {
2627
/// The subject of a ``ServerInterceptorPipelineOperation``.
2728
/// The subject of an interceptor can either be all services and methods, only specific services, or only specific methods.

Sources/GRPCCore/GRPCClient.swift

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,19 +112,12 @@ public final class GRPCClient: Sendable {
112112
/// The transport which provides a bidirectional communication channel with the server.
113113
private let transport: any ClientTransport
114114

115-
/// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
116-
///
117-
/// The order in which interceptors are added reflects the order in which they are called. The
118-
/// first interceptor added will be the first interceptor to intercept each request. The last
119-
/// interceptor added will be the final interceptor to intercept each request before calling
120-
/// the appropriate handler.
121-
private let interceptors: [any ClientInterceptor]
122-
123115
/// The current state of the client.
124-
private let state: Mutex<State>
116+
private let stateMachine: Mutex<StateMachine>
125117

126118
/// The state of the client.
127119
private enum State: Sendable {
120+
128121
/// The client hasn't been started yet. Can transition to `running` or `stopped`.
129122
case notStarted
130123
/// The client is running and can send RPCs. Can transition to `stopping`.
@@ -187,22 +180,79 @@ public final class GRPCClient: Sendable {
187180
}
188181
}
189182

183+
private struct StateMachine {
184+
var state: State
185+
186+
private let interceptorPipeline: [ClientInterceptorPipelineOperation]
187+
188+
/// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
189+
///
190+
/// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
191+
/// This caching is done to avoid having to compute the applicable interceptors for each request made.
192+
///
193+
/// The order in which interceptors are added reflects the order in which they are called. The
194+
/// first interceptor added will be the first interceptor to intercept each request. The last
195+
/// interceptor added will be the final interceptor to intercept each request before calling
196+
/// the appropriate handler.
197+
var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]
198+
199+
init(interceptorPipeline: [ClientInterceptorPipelineOperation]) {
200+
self.state = .notStarted
201+
self.interceptorPipeline = interceptorPipeline
202+
self.interceptorsPerMethod = [:]
203+
}
204+
205+
mutating func checkExecutableAndGetApplicableInterceptors(
206+
for method: MethodDescriptor
207+
) throws -> [any ClientInterceptor] {
208+
try self.state.checkExecutable()
209+
210+
guard let applicableInterceptors = self.interceptorsPerMethod[method] else {
211+
let applicableInterceptors = self.interceptorPipeline
212+
.filter { $0.applies(to: method) }
213+
.map { $0.interceptor }
214+
self.interceptorsPerMethod[method] = applicableInterceptors
215+
return applicableInterceptors
216+
}
217+
218+
return applicableInterceptors
219+
}
220+
}
221+
190222
/// Creates a new client with the given transport, interceptors and configuration.
191223
///
192224
/// - Parameters:
193225
/// - transport: The transport used to establish a communication channel with a server.
194-
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
226+
/// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
195227
/// accepted RPC. The order in which interceptors are added reflects the order in which they
196228
/// are called. The first interceptor added will be the first interceptor to intercept each
197229
/// request. The last interceptor added will be the final interceptor to intercept each
198230
/// request before calling the appropriate handler.
199-
public init(
231+
convenience public init(
200232
transport: some ClientTransport,
201233
interceptors: [any ClientInterceptor] = []
234+
) {
235+
self.init(
236+
transport: transport,
237+
interceptorPipeline: interceptors.map { .apply($0, to: .all) }
238+
)
239+
}
240+
241+
/// Creates a new client with the given transport, interceptors and configuration.
242+
///
243+
/// - Parameters:
244+
/// - transport: The transport used to establish a communication channel with a server.
245+
/// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting
246+
/// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
247+
/// The order in which interceptors are added reflects the order in which they are called.
248+
/// The first interceptor added will be the first interceptor to intercept each request.
249+
/// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
250+
public init(
251+
transport: some ClientTransport,
252+
interceptorPipeline: [ClientInterceptorPipelineOperation]
202253
) {
203254
self.transport = transport
204-
self.interceptors = interceptors
205-
self.state = Mutex(.notStarted)
255+
self.stateMachine = Mutex(StateMachine(interceptorPipeline: interceptorPipeline))
206256
}
207257

208258
/// Start the client.
@@ -213,11 +263,11 @@ public final class GRPCClient: Sendable {
213263
/// The client, and by extension this function, can only be run once. If the client is already
214264
/// running or has already been closed then a ``RuntimeError`` is thrown.
215265
public func run() async throws {
216-
try self.state.withLock { try $0.run() }
266+
try self.stateMachine.withLock { try $0.state.run() }
217267

218268
// When this function exits the client must have stopped.
219269
defer {
220-
self.state.withLock { $0.stopped() }
270+
self.stateMachine.withLock { $0.state.stopped() }
221271
}
222272

223273
do {
@@ -237,7 +287,7 @@ public final class GRPCClient: Sendable {
237287
/// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
238288
/// executing ``run()`` if you want to abruptly stop in-flight RPCs.
239289
public func beginGracefulShutdown() {
240-
let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
290+
let wasRunning = self.stateMachine.withLock { $0.state.beginGracefulShutdown() }
241291
if wasRunning {
242292
self.transport.beginGracefulShutdown()
243293
}
@@ -356,7 +406,9 @@ public final class GRPCClient: Sendable {
356406
options: CallOptions,
357407
handler: @Sendable @escaping (StreamingClientResponse<Response>) async throws -> ReturnValue
358408
) async throws -> ReturnValue {
359-
try self.state.withLock { try $0.checkExecutable() }
409+
let applicableInterceptors = try self.stateMachine.withLock {
410+
try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
411+
}
360412
let methodConfig = self.transport.config(forMethod: descriptor)
361413
var options = options
362414
options.formUnion(with: methodConfig)
@@ -368,7 +420,7 @@ public final class GRPCClient: Sendable {
368420
serializer: serializer,
369421
deserializer: deserializer,
370422
transport: self.transport,
371-
interceptors: self.interceptors,
423+
interceptors: applicableInterceptors,
372424
handler: handler
373425
)
374426
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Testing
18+
19+
@testable import GRPCCore
20+
21+
@Suite("ClientInterceptorPipelineOperation")
22+
struct ClientInterceptorPipelineOperationTests {
23+
@Test(
24+
"Applies to",
25+
arguments: [
26+
(
27+
.all,
28+
[.fooBar, .fooBaz, .barFoo, .barBaz],
29+
[]
30+
),
31+
(
32+
.services([ServiceDescriptor(package: "pkg", service: "foo")]),
33+
[.fooBar, .fooBaz],
34+
[.barFoo, .barBaz]
35+
),
36+
(
37+
.methods([.barFoo]),
38+
[.barFoo],
39+
[.fooBar, .fooBaz, .barBaz]
40+
),
41+
] as [(ClientInterceptorPipelineOperation.Subject, [MethodDescriptor], [MethodDescriptor])]
42+
)
43+
func appliesTo(
44+
operationSubject: ClientInterceptorPipelineOperation.Subject,
45+
applicableMethods: [MethodDescriptor],
46+
notApplicableMethods: [MethodDescriptor]
47+
) {
48+
let operation = ClientInterceptorPipelineOperation.apply(
49+
.requestCounter(.init()),
50+
to: operationSubject
51+
)
52+
53+
for applicableMethod in applicableMethods {
54+
#expect(operation.applies(to: applicableMethod))
55+
}
56+
57+
for notApplicableMethod in notApplicableMethods {
58+
#expect(!operation.applies(to: notApplicableMethod))
59+
}
60+
}
61+
}
62+
63+
extension MethodDescriptor {
64+
fileprivate static let fooBar = Self(service: "pkg.foo", method: "bar")
65+
fileprivate static let fooBaz = Self(service: "pkg.foo", method: "baz")
66+
fileprivate static let barFoo = Self(service: "pkg.bar", method: "foo")
67+
fileprivate static let barBaz = Self(service: "pkg.bar", method: "Baz")
68+
}

0 commit comments

Comments
 (0)