Skip to content

Commit b116c42

Browse files
authored
Add ability to client to run other workloads than the connection manager (#174)
* Add ability to client to run arbitrary workloads Signed-off-by: Adam Fowler <[email protected]> * Add some tests Signed-off-by: Adam Fowler <[email protected]> * Use ActionRunner in ValkeyClusterClient Signed-off-by: Adam Fowler <[email protected]> * Ditch ActionRunner protocol Signed-off-by: Adam Fowler <[email protected]> * Revert some symbol names Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent 3643b63 commit b116c42

File tree

2 files changed

+80
-53
lines changed

2 files changed

+80
-53
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 49 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ public final class ValkeyClusterClient: Sendable {
8585
case runClient(ValkeyNodeClient)
8686
case runTimer(ValkeyClusterTimer)
8787
}
88-
8988
private let actionStream: AsyncStream<RunAction>
9089
private let actionStreamContinuation: AsyncStream<RunAction>.Continuation
9190

@@ -109,9 +108,7 @@ public final class ValkeyClusterClient: Sendable {
109108
) {
110109
self.logger = logger
111110

112-
let (stream, continuation) = AsyncStream.makeStream(of: RunAction.self)
113-
self.actionStream = stream
114-
self.actionStreamContinuation = continuation
111+
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
115112

116113
let factory = ValkeyNodeClientFactory(
117114
logger: logger,
@@ -220,12 +217,17 @@ public final class ValkeyClusterClient: Sendable {
220217
public func run() async {
221218
let circuitBreakerTimer = self.stateLock.withLock { $0.start() }
222219

223-
self.actionStreamContinuation.yield(.runTimer(circuitBreakerTimer))
224-
self.actionStreamContinuation.yield(.runClusterDiscovery(runNodeDiscovery: true))
220+
self.queueAction(.runTimer(circuitBreakerTimer))
221+
self.queueAction(.runClusterDiscovery(runNodeDiscovery: true))
225222

226223
await withTaskCancellationHandler {
227-
await withDiscardingTaskGroup { taskGroup in
228-
await self.runUsingTaskGroup(&taskGroup)
224+
/// Run discarding task group running actions
225+
await withDiscardingTaskGroup { group in
226+
for await action in self.actionStream {
227+
group.addTask {
228+
await self.runAction(action)
229+
}
230+
}
229231
}
230232
} onCancel: {
231233
_ = self.stateLock.withLock {
@@ -238,51 +240,47 @@ public final class ValkeyClusterClient: Sendable {
238240

239241
// MARK: - Private methods -
240242

243+
private func queueAction(_ action: RunAction) {
244+
self.actionStreamContinuation.yield(action)
245+
}
246+
241247
/// Manages the primary task group that handles all client operations.
242248
///
243249
/// - Parameter taskGroup: The task group to add tasks to.
244-
private func runUsingTaskGroup(_ taskGroup: inout DiscardingTaskGroup) async {
245-
for await action in self.actionStream {
246-
switch action {
247-
case .runClusterDiscovery(let runNodeDiscovery):
248-
taskGroup.addTask {
249-
await self.runClusterDiscovery(runNodeDiscoveryFirst: runNodeDiscovery)
250-
}
250+
private func runAction(_ action: RunAction) async {
251+
switch action {
252+
case .runClusterDiscovery(let runNodeDiscovery):
253+
await self.runClusterDiscovery(runNodeDiscoveryFirst: runNodeDiscovery)
251254

252-
case .runClient(let client):
253-
taskGroup.addTask {
254-
await client.run()
255-
}
255+
case .runClient(let client):
256+
await client.run()
256257

257-
case .runTimer(let timer):
258+
case .runTimer(let timer):
259+
await withTaskGroup(of: Void.self) { taskGroup in
258260
taskGroup.addTask {
259-
await withTaskGroup(of: Void.self) { taskGroup in
260-
taskGroup.addTask {
261-
do {
262-
try await self.clock.sleep(for: timer.duration)
263-
// timer has hit
264-
let timerFiredAction = self.stateLock.withLock {
265-
$0.timerFired(timer)
266-
}
267-
self.runTimerFiredAction(timerFiredAction)
268-
} catch {
269-
// do nothing
270-
}
271-
}
272-
273-
let (stream, continuation) = AsyncStream.makeStream(of: Void.self)
274-
taskGroup.addTask {
275-
var iterator = stream.makeAsyncIterator()
276-
await iterator.next()
261+
do {
262+
try await self.clock.sleep(for: timer.duration)
263+
// timer has hit
264+
let timerFiredAction = self.stateLock.withLock {
265+
$0.timerFired(timer)
277266
}
267+
self.runTimerFiredAction(timerFiredAction)
268+
} catch {
269+
// do nothing
270+
}
271+
}
278272

279-
let token = self.stateLock.withLock {
280-
$0.registerTimerCancellationToken(continuation, for: timer)
281-
}
273+
let (stream, continuation) = AsyncStream.makeStream(of: Void.self)
274+
taskGroup.addTask {
275+
var iterator = stream.makeAsyncIterator()
276+
await iterator.next()
277+
}
282278

283-
token?.finish()
284-
}
279+
let token = self.stateLock.withLock {
280+
$0.registerTimerCancellationToken(continuation, for: timer)
285281
}
282+
283+
token?.finish()
286284
}
287285
}
288286
}
@@ -300,7 +298,7 @@ public final class ValkeyClusterClient: Sendable {
300298
}
301299

302300
if let runDiscovery = action.runDiscovery {
303-
self.actionStreamContinuation.yield(.runClusterDiscovery(runNodeDiscovery: runDiscovery.runNodeDiscoveryFirst))
301+
self.queueAction(.runClusterDiscovery(runNodeDiscovery: runDiscovery.runNodeDiscoveryFirst))
304302
}
305303
}
306304

@@ -309,7 +307,7 @@ public final class ValkeyClusterClient: Sendable {
309307
/// - Parameter action: The update action containing clients to run and shut down.
310308
private func runUpdateValkeyNodesAction(_ action: StateMachine.UpdateValkeyNodesAction) {
311309
for client in action.clientsToRun {
312-
self.actionStreamContinuation.yield(.runClient(client))
310+
self.queueAction(.runClient(client))
313311
}
314312

315313
for client in action.clientsToShutdown {
@@ -328,11 +326,11 @@ public final class ValkeyClusterClient: Sendable {
328326
action.cancelTimer?.yield()
329327

330328
if let newTimer = action.createTimer {
331-
self.actionStreamContinuation.yield(.runTimer(newTimer))
329+
self.queueAction(.runTimer(newTimer))
332330
}
333331

334332
for client in action.clientsToRun {
335-
self.actionStreamContinuation.yield(.runClient(client))
333+
self.queueAction(.runClient(client))
336334
}
337335

338336
for client in action.clientsToShutdown {
@@ -345,11 +343,11 @@ public final class ValkeyClusterClient: Sendable {
345343
/// - Parameter action: The action containing operations to perform after failed discovery.
346344
private func runClusterDiscoveryFailedAction(_ action: StateMachine.ClusterDiscoveryFailedAction) {
347345
if let retryTimer = action.retryTimer {
348-
self.actionStreamContinuation.yield(.runTimer(retryTimer))
346+
self.queueAction(.runTimer(retryTimer))
349347
}
350348

351349
if let circuitBreakerTimer = action.circuitBreakerTimer {
352-
self.actionStreamContinuation.yield(.runTimer(circuitBreakerTimer))
350+
self.queueAction(.runTimer(circuitBreakerTimer))
353351
}
354352
}
355353

@@ -489,10 +487,10 @@ public final class ValkeyClusterClient: Sendable {
489487
private func runMovedToDegraded(_ action: StateMachine.PoolForMovedErrorAction.MoveToDegraded) {
490488
if let cancelToken = action.runDiscoveryAndCancelTimer {
491489
cancelToken.yield()
492-
self.actionStreamContinuation.yield(.runClusterDiscovery(runNodeDiscovery: false))
490+
self.queueAction(.runClusterDiscovery(runNodeDiscovery: false))
493491
}
494492

495-
self.actionStreamContinuation.yield(.runTimer(action.circuitBreakerTimer))
493+
self.queueAction(.runTimer(action.circuitBreakerTimer))
496494
}
497495

498496
/// Runs the cluster discovery process to determine the current cluster topology.

Sources/Valkey/ValkeyClient.swift

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public final class ValkeyClient: Sendable {
4242
/// running atomic
4343
let runningAtomic: Atomic<Bool>
4444

45+
private enum RunAction: Sendable {
46+
case runNodeClient(ValkeyNodeClient)
47+
}
48+
private let actionStream: AsyncStream<RunAction>
49+
private let actionStreamContinuation: AsyncStream<RunAction>.Continuation
50+
4551
/// Creates a new Valkey client
4652
///
4753
/// - Parameters:
@@ -84,6 +90,8 @@ public final class ValkeyClient: Sendable {
8490
self.logger = logger
8591
self.runningAtomic = .init(false)
8692
self.node = self.nodeClientFactory.makeConnectionPool(serverAddress: address)
93+
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
94+
self.queueAction(.runNodeClient(self.node))
8795
}
8896
}
8997

@@ -95,10 +103,17 @@ extension ValkeyClient {
95103
precondition(!atomicOp.original, "ValkeyClient.run() should just be called once!")
96104
#if ServiceLifecycleSupport
97105
await cancelWhenGracefulShutdown {
98-
await self.node.run()
106+
/// Run discarding task group running actions
107+
await withDiscardingTaskGroup { group in
108+
for await action in self.actionStream {
109+
group.addTask {
110+
await self.runAction(action)
111+
}
112+
}
113+
}
99114
}
100115
#else
101-
await self.node.run()
116+
await self.runActionTaskGroup()
102117
#endif
103118
}
104119

@@ -117,6 +132,20 @@ extension ValkeyClient {
117132
}
118133
}
119134

135+
@available(valkeySwift 1.0, *)
136+
extension ValkeyClient {
137+
private func queueAction(_ action: RunAction) {
138+
self.actionStreamContinuation.yield(action)
139+
}
140+
141+
private func runAction(_ action: RunAction) async {
142+
switch action {
143+
case .runNodeClient(let nodeClient):
144+
await nodeClient.run()
145+
}
146+
}
147+
}
148+
120149
/// Extend ValkeyClient so we can call commands directly from it
121150
@available(valkeySwift 1.0, *)
122151
extension ValkeyClient: ValkeyClientProtocol {

0 commit comments

Comments
 (0)