Skip to content

Commit 6a0401d

Browse files
committed
Create StateMachine for GRPCClient
1 parent 6765abb commit 6a0401d

File tree

1 file changed

+51
-80
lines changed

1 file changed

+51
-80
lines changed

Sources/GRPCCore/GRPCClient.swift

Lines changed: 51 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -112,53 +112,27 @@ public final class GRPCClient: Sendable {
112112
/// The transport which provides a bidirectional communication channel with the server.
113113
private let transport: any ClientTransport
114114

115-
private let interceptorPipeline: [ClientInterceptorPipelineOperation]
116-
117115
/// The current state of the client.
118-
private let state: Mutex<State>
116+
private let stateMachine: Mutex<StateMachine>
119117

120118
/// The state of the client.
121119
private enum State: Sendable {
122120

123121
/// The client hasn't been started yet. Can transition to `running` or `stopped`.
124-
case notStarted(
125-
/// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
126-
///
127-
/// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
128-
/// This caching is done to avoid having to compute the applicable interceptors for each request made.
129-
///
130-
/// The order in which interceptors are added reflects the order in which they are called. The
131-
/// first interceptor added will be the first interceptor to intercept each request. The last
132-
/// interceptor added will be the final interceptor to intercept each request before calling
133-
/// the appropriate handler.
134-
interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]
135-
)
122+
case notStarted
136123
/// The client is running and can send RPCs. Can transition to `stopping`.
137-
case running(
138-
/// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
139-
///
140-
/// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
141-
/// This caching is done to avoid having to compute the applicable interceptors for each request made.
142-
///
143-
/// The order in which interceptors are added reflects the order in which they are called. The
144-
/// first interceptor added will be the first interceptor to intercept each request. The last
145-
/// interceptor added will be the final interceptor to intercept each request before calling
146-
/// the appropriate handler.
147-
interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]
148-
)
124+
case running
149125
/// The client is stopping and no new RPCs will be sent. Existing RPCs may run to
150126
/// completion. May transition to `stopped`.
151127
case stopping
152128
/// The client has stopped, no RPCs are in flight and no more will be accepted. This state
153129
/// is terminal.
154130
case stopped
155-
/// Temporary state to avoid CoWs.
156-
case _modifying
157131

158132
mutating func run() throws {
159133
switch self {
160-
case .notStarted(let interceptorsPerMethod):
161-
self = .running(interceptorsPerMethod: interceptorsPerMethod)
134+
case .notStarted:
135+
self = .running
162136

163137
case .running:
164138
throw RuntimeError(
@@ -171,9 +145,6 @@ public final class GRPCClient: Sendable {
171145
code: .clientIsStopped,
172146
message: "The client has stopped and can only be started once."
173147
)
174-
175-
case ._modifying:
176-
fatalError("Internal inconsistency")
177148
}
178149
}
179150

@@ -191,8 +162,6 @@ public final class GRPCClient: Sendable {
191162
return true
192163
case .stopping, .stopped:
193164
return false
194-
case ._modifying:
195-
fatalError("Internal inconsistency")
196165
}
197166
}
198167

@@ -207,12 +176,49 @@ public final class GRPCClient: Sendable {
207176
code: .clientIsStopped,
208177
message: "Client has been stopped. Can't make any more RPCs."
209178
)
210-
case ._modifying:
211-
fatalError("Internal inconsistency")
212179
}
213180
}
214181
}
215182

183+
private struct StateMachine {
184+
var state: State
185+
186+
private let interceptorPipeline: [ClientInterceptorPipelineOperation]
187+
188+
/// A collection of interceptors providing cross-cutting functionality to each accepted RPC, keyed by the method to which they apply.
189+
///
190+
/// The list of interceptors for each method is computed from `interceptorsPipeline` when calling a method for the first time.
191+
/// This caching is done to avoid having to compute the applicable interceptors for each request made.
192+
///
193+
/// The order in which interceptors are added reflects the order in which they are called. The
194+
/// first interceptor added will be the first interceptor to intercept each request. The last
195+
/// interceptor added will be the final interceptor to intercept each request before calling
196+
/// the appropriate handler.
197+
var interceptorsPerMethod: [MethodDescriptor: [any ClientInterceptor]]
198+
199+
init(interceptorPipeline: [ClientInterceptorPipelineOperation]) {
200+
self.state = .notStarted
201+
self.interceptorPipeline = interceptorPipeline
202+
self.interceptorsPerMethod = [:]
203+
}
204+
205+
mutating func checkExecutableAndGetApplicableInterceptors(
206+
for method: MethodDescriptor
207+
) throws -> [any ClientInterceptor] {
208+
try self.state.checkExecutable()
209+
210+
guard let applicableInterceptors = self.interceptorsPerMethod[method] else {
211+
let applicableInterceptors = self.interceptorPipeline
212+
.filter { $0.applies(to: method) }
213+
.map { $0.interceptor }
214+
self.interceptorsPerMethod[method] = applicableInterceptors
215+
return applicableInterceptors
216+
}
217+
218+
return applicableInterceptors
219+
}
220+
}
221+
216222
/// Creates a new client with the given transport, interceptors and configuration.
217223
///
218224
/// - Parameters:
@@ -246,8 +252,7 @@ public final class GRPCClient: Sendable {
246252
interceptorPipeline: [ClientInterceptorPipelineOperation]
247253
) {
248254
self.transport = transport
249-
self.interceptorPipeline = interceptorPipeline
250-
self.state = Mutex(.notStarted(interceptorsPerMethod: [:]))
255+
self.stateMachine = Mutex(StateMachine(interceptorPipeline: interceptorPipeline))
251256
}
252257

253258
/// Start the client.
@@ -258,11 +263,11 @@ public final class GRPCClient: Sendable {
258263
/// The client, and by extension this function, can only be run once. If the client is already
259264
/// running or has already been closed then a ``RuntimeError`` is thrown.
260265
public func run() async throws {
261-
try self.state.withLock { try $0.run() }
266+
try self.stateMachine.withLock { try $0.state.run() }
262267

263268
// When this function exits the client must have stopped.
264269
defer {
265-
self.state.withLock { $0.stopped() }
270+
self.stateMachine.withLock { $0.state.stopped() }
266271
}
267272

268273
do {
@@ -282,7 +287,7 @@ public final class GRPCClient: Sendable {
282287
/// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task
283288
/// executing ``run()`` if you want to abruptly stop in-flight RPCs.
284289
public func beginGracefulShutdown() {
285-
let wasRunning = self.state.withLock { $0.beginGracefulShutdown() }
290+
let wasRunning = self.stateMachine.withLock { $0.state.beginGracefulShutdown() }
286291
if wasRunning {
287292
self.transport.beginGracefulShutdown()
288293
}
@@ -401,47 +406,13 @@ public final class GRPCClient: Sendable {
401406
options: CallOptions,
402407
handler: @Sendable @escaping (StreamingClientResponse<Response>) async throws -> ReturnValue
403408
) async throws -> ReturnValue {
404-
try self.state.withLock { try $0.checkExecutable() }
409+
let applicableInterceptors = try self.stateMachine.withLock {
410+
try $0.checkExecutableAndGetApplicableInterceptors(for: descriptor)
411+
}
405412
let methodConfig = self.transport.config(forMethod: descriptor)
406413
var options = options
407414
options.formUnion(with: methodConfig)
408415

409-
let applicableInterceptors = self.state.withLock {
410-
switch $0 {
411-
case .notStarted(var interceptorsPerMethod):
412-
if let interceptors = interceptorsPerMethod[descriptor] {
413-
return interceptors
414-
} else {
415-
$0 = ._modifying
416-
let interceptors = self.interceptorPipeline
417-
.filter { $0.applies(to: descriptor) }
418-
.map { $0.interceptor }
419-
interceptorsPerMethod[descriptor] = interceptors
420-
$0 = .notStarted(interceptorsPerMethod: interceptorsPerMethod)
421-
return interceptors
422-
}
423-
424-
case .running(var interceptorsPerMethod):
425-
if let interceptors = interceptorsPerMethod[descriptor] {
426-
return interceptors
427-
} else {
428-
$0 = ._modifying
429-
let interceptors = self.interceptorPipeline
430-
.filter { $0.applies(to: descriptor) }
431-
.map { $0.interceptor }
432-
interceptorsPerMethod[descriptor] = interceptors
433-
$0 = .running(interceptorsPerMethod: interceptorsPerMethod)
434-
return interceptors
435-
}
436-
437-
case .stopping, .stopped:
438-
fatalError("The checkExecutable call should have failed.")
439-
440-
case ._modifying:
441-
fatalError("Internal inconsistency")
442-
}
443-
}
444-
445416
return try await ClientRPCExecutor.execute(
446417
request: request,
447418
method: descriptor,

0 commit comments

Comments
 (0)