@@ -112,19 +112,12 @@ public final class GRPCClient: Sendable {
112
112
/// The transport which provides a bidirectional communication channel with the server.
113
113
private let transport : any ClientTransport
114
114
115
- /// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
116
- ///
117
- /// The order in which interceptors are added reflects the order in which they are called. The
118
- /// first interceptor added will be the first interceptor to intercept each request. The last
119
- /// interceptor added will be the final interceptor to intercept each request before calling
120
- /// the appropriate handler.
121
- private let interceptors : [ any ClientInterceptor ]
122
-
123
115
/// The current state of the client.
124
- private let state : Mutex < State >
116
+ private let stateMachine : Mutex < StateMachine >
125
117
126
118
/// The state of the client.
127
119
private enum State : Sendable {
120
+
128
121
/// The client hasn't been started yet. Can transition to `running` or `stopped`.
129
122
case notStarted
130
123
/// The client is running and can send RPCs. Can transition to `stopping`.
@@ -187,22 +180,79 @@ public final class GRPCClient: Sendable {
187
180
}
188
181
}
189
182
183
+ private struct StateMachine {
184
+ var state : State
185
+
186
+ private let interceptorPipeline : [ ClientInterceptorPipelineOperation ]
187
+
188
+ /// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
189
+ ///
190
+ /// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
191
+ /// This caching is done to avoid having to compute the applicable interceptors for each request made.
192
+ ///
193
+ /// The order in which interceptors are added reflects the order in which they are called. The
194
+ /// first interceptor added will be the first interceptor to intercept each request. The last
195
+ /// interceptor added will be the final interceptor to intercept each request before calling
196
+ /// the appropriate handler.
197
+ var interceptorsPerMethod : [ MethodDescriptor : [ any ClientInterceptor ] ]
198
+
199
+ init ( interceptorPipeline: [ ClientInterceptorPipelineOperation ] ) {
200
+ self . state = . notStarted
201
+ self . interceptorPipeline = interceptorPipeline
202
+ self . interceptorsPerMethod = [ : ]
203
+ }
204
+
205
+ mutating func checkExecutableAndGetApplicableInterceptors(
206
+ for method: MethodDescriptor
207
+ ) throws -> [ any ClientInterceptor ] {
208
+ try self . state. checkExecutable ( )
209
+
210
+ guard let applicableInterceptors = self . interceptorsPerMethod [ method] else {
211
+ let applicableInterceptors = self . interceptorPipeline
212
+ . filter { $0. applies ( to: method) }
213
+ . map { $0. interceptor }
214
+ self . interceptorsPerMethod [ method] = applicableInterceptors
215
+ return applicableInterceptors
216
+ }
217
+
218
+ return applicableInterceptors
219
+ }
220
+ }
221
+
190
222
/// Creates a new client with the given transport, interceptors and configuration.
191
223
///
192
224
/// - Parameters:
193
225
/// - transport: The transport used to establish a communication channel with a server.
194
- /// - interceptors: A collection of interceptors providing cross-cutting functionality to each
226
+ /// - interceptors: A collection of ``ClientInterceptor``s providing cross-cutting functionality to each
195
227
/// accepted RPC. The order in which interceptors are added reflects the order in which they
196
228
/// are called. The first interceptor added will be the first interceptor to intercept each
197
229
/// request. The last interceptor added will be the final interceptor to intercept each
198
230
/// request before calling the appropriate handler.
199
- public init (
231
+ convenience public init (
200
232
transport: some ClientTransport ,
201
233
interceptors: [ any ClientInterceptor ] = [ ]
234
+ ) {
235
+ self . init (
236
+ transport: transport,
237
+ interceptorPipeline: interceptors. map { . apply( $0, to: . all) }
238
+ )
239
+ }
240
+
241
+ /// Creates a new client with the given transport, interceptors and configuration.
242
+ ///
243
+ /// - Parameters:
244
+ /// - transport: The transport used to establish a communication channel with a server.
245
+ /// - interceptorPipeline: A collection of ``ClientInterceptorPipelineOperation`` providing cross-cutting
246
+ /// functionality to each accepted RPC. Only applicable interceptors from the pipeline will be applied to each RPC.
247
+ /// The order in which interceptors are added reflects the order in which they are called.
248
+ /// The first interceptor added will be the first interceptor to intercept each request.
249
+ /// The last interceptor added will be the final interceptor to intercept each request before calling the appropriate handler.
250
+ public init (
251
+ transport: some ClientTransport ,
252
+ interceptorPipeline: [ ClientInterceptorPipelineOperation ]
202
253
) {
203
254
self . transport = transport
204
- self . interceptors = interceptors
205
- self . state = Mutex ( . notStarted)
255
+ self . stateMachine = Mutex ( StateMachine ( interceptorPipeline: interceptorPipeline) )
206
256
}
207
257
208
258
/// Start the client.
@@ -213,11 +263,11 @@ public final class GRPCClient: Sendable {
213
263
/// The client, and by extension this function, can only be run once. If the client is already
214
264
/// running or has already been closed then a ``RuntimeError`` is thrown.
215
265
public func run( ) async throws {
216
- try self . state . withLock { try $0. run ( ) }
266
+ try self . stateMachine . withLock { try $0. state . run ( ) }
217
267
218
268
// When this function exits the client must have stopped.
219
269
defer {
220
- self . state . withLock { $0. stopped ( ) }
270
+ self . stateMachine . withLock { $0. state . stopped ( ) }
221
271
}
222
272
223
273
do {
@@ -237,7 +287,7 @@ public final class GRPCClient: Sendable {
237
287
/// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
238
288
/// executing ``run()`` if you want to abruptly stop in-flight RPCs.
239
289
public func beginGracefulShutdown( ) {
240
- let wasRunning = self . state . withLock { $0. beginGracefulShutdown ( ) }
290
+ let wasRunning = self . stateMachine . withLock { $0. state . beginGracefulShutdown ( ) }
241
291
if wasRunning {
242
292
self . transport. beginGracefulShutdown ( )
243
293
}
@@ -356,7 +406,9 @@ public final class GRPCClient: Sendable {
356
406
options: CallOptions ,
357
407
handler: @Sendable @escaping ( StreamingClientResponse < Response > ) async throws -> ReturnValue
358
408
) async throws -> ReturnValue {
359
- try self . state. withLock { try $0. checkExecutable ( ) }
409
+ let applicableInterceptors = try self . stateMachine. withLock {
410
+ try $0. checkExecutableAndGetApplicableInterceptors ( for: descriptor)
411
+ }
360
412
let methodConfig = self . transport. config ( forMethod: descriptor)
361
413
var options = options
362
414
options. formUnion ( with: methodConfig)
@@ -368,7 +420,7 @@ public final class GRPCClient: Sendable {
368
420
serializer: serializer,
369
421
deserializer: deserializer,
370
422
transport: self . transport,
371
- interceptors: self . interceptors ,
423
+ interceptors: applicableInterceptors ,
372
424
handler: handler
373
425
)
374
426
}
0 commit comments