Skip to content

Commit 3643b63

Browse files
authored
Breaking out running client code from the ValkeyClusterClient StateMachine (#178)
1 parent ba55436 commit 3643b63

File tree

2 files changed

+120
-71
lines changed

2 files changed

+120
-71
lines changed

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 9 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ package struct ValkeyClusterClientStateMachine<
163163
@usableFromInline
164164
/* private */ var clusterState: ClusterState
165165
@usableFromInline
166-
/* private */ var runningClients: [ValkeyNodeID: NodeBundle] = [:]
166+
/* private */ var runningClients: ValkeyRunningClientsStateMachine<ConnectionPool, ConnectionPoolFactory>
167167
@usableFromInline
168168
/* private */ var configuration: ValkeyClusterClientStateMachineConfiguration
169169

@@ -182,7 +182,7 @@ package struct ValkeyClusterClientStateMachine<
182182
)
183183
)
184184
self.refreshState = .notRefreshing
185-
self.runningClients = [:]
185+
self.runningClients = .init(poolFactory: poolFactory)
186186
self.configuration = configuration
187187
self.clock = clock
188188
self.poolFactory = poolFactory
@@ -231,8 +231,8 @@ package struct ValkeyClusterClientStateMachine<
231231
case .refreshing:
232232
switch self.clusterState {
233233
case .unavailable, .degraded, .healthy:
234-
let action = self.updateNodes(newNodes, removeUnmentionedPools: false)
235-
let voters = self.allNodeClients().map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
234+
let action = self.runningClients.updateNodes(newNodes, removeUnmentionedPools: false)
235+
let voters = self.runningClients.clients.map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
236236
return .init(
237237
clientsToRun: action.poolsToRun.map(\.0),
238238
clientsToShutdown: action.poolsToShutdown,
@@ -246,7 +246,7 @@ package struct ValkeyClusterClientStateMachine<
246246
}
247247

248248
package func getInitialVoters() -> [ValkeyClusterVoter<ConnectionPool>] {
249-
self.allNodeClients().map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
249+
self.runningClients.clients.map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
250250
}
251251

252252
package struct ClusterDiscoverySucceededAction {
@@ -278,7 +278,7 @@ package struct ValkeyClusterClientStateMachine<
278278
self.refreshState = .waitingForRefresh(.init(id: refreshTimerID), previousRefresh: .init(consecutiveFailures: 0))
279279

280280
let newShards = description.shards
281-
let poolUpdate = self.updateNodes(
281+
let poolUpdate = self.runningClients.updateNodes(
282282
newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } },
283283
removeUnmentionedPools: true
284284
)
@@ -694,7 +694,7 @@ package struct ValkeyClusterClientStateMachine<
694694

695695
case .refreshing:
696696
let newShards = description.shards
697-
let poolActions = self.updateNodes(
697+
let poolActions = self.runningClients.updateNodes(
698698
newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } },
699699
removeUnmentionedPools: false
700700
)
@@ -719,80 +719,18 @@ package struct ValkeyClusterClientStateMachine<
719719
case .unavailable, .degraded, .healthy:
720720
self.clusterState = .shutdown
721721
let existingNodes = self.runningClients
722-
self.runningClients.removeAll(keepingCapacity: false)
723-
return existingNodes.values.lazy.map { $0.pool }
722+
self.runningClients.removeAll()
723+
return existingNodes.clients.lazy.map { $0.pool }
724724

725725
case .shutdown:
726726
return []
727727
}
728728
}
729729

730-
private struct PoolUpdateAction {
731-
var poolsToShutdown: [ConnectionPool]
732-
var poolsToRun: [(ConnectionPool, ValkeyNodeID)]
733-
734-
static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) }
735-
}
736-
737-
private mutating func updateNodes(
738-
_ newNodes: some Collection<ValkeyNodeDescription>,
739-
removeUnmentionedPools: Bool
740-
) -> PoolUpdateAction {
741-
var previousNodes = self.runningClients
742-
self.runningClients.removeAll(keepingCapacity: true)
743-
var newPools = [(ConnectionPool, ValkeyNodeID)]()
744-
newPools.reserveCapacity(16)
745-
var poolsToShutdown = [ConnectionPool]()
746-
747-
for newNodeDescription in newNodes {
748-
// if we had a pool previously, let's continue to use it!
749-
if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) {
750-
if newNodeDescription == existingPool.nodeDescription {
751-
// the existing pool matches the new node description. nothing todo
752-
self.runningClients[newNodeDescription.id] = existingPool
753-
} else {
754-
// the existing pool does not match new node description. For example tls may now be required.
755-
// shutdown the old pool and create a new one
756-
poolsToShutdown.append(existingPool.pool)
757-
let newPool = self.makePool(for: newNodeDescription)
758-
self.runningClients[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
759-
newPools.append((newPool, newNodeDescription.id))
760-
}
761-
} else {
762-
let newPool = self.makePool(for: newNodeDescription)
763-
self.runningClients[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
764-
newPools.append((newPool, newNodeDescription.id))
765-
}
766-
}
767-
768-
if removeUnmentionedPools {
769-
poolsToShutdown.append(contentsOf: previousNodes.values.lazy.map { $0.pool })
770-
771-
return PoolUpdateAction(
772-
poolsToShutdown: poolsToShutdown,
773-
poolsToRun: newPools
774-
)
775-
}
776-
777-
// re-add pools that were not part of the node list.
778-
for (nodeID, poolDescription) in previousNodes {
779-
self.runningClients[nodeID] = poolDescription
780-
}
781-
782-
return PoolUpdateAction(
783-
poolsToShutdown: poolsToShutdown,
784-
poolsToRun: newPools
785-
)
786-
}
787-
788730
private func makePool(for description: ValkeyNodeDescription) -> ConnectionPool {
789731
self.poolFactory.makeConnectionPool(nodeDescription: description)
790732
}
791733

792-
private func allNodeClients() -> some Collection<NodeBundle> {
793-
self.runningClients.values
794-
}
795-
796734
private mutating func nextTimerID() -> Int {
797735
defer { self._nextTimerID += 1 }
798736
return self._nextTimerID
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift project
4+
//
5+
// Copyright (c) 2025 the valkey-swift authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@usableFromInline
16+
struct ValkeyRunningClientsStateMachine<
17+
ConnectionPool: Sendable,
18+
ConnectionPoolFactory: ValkeyNodeConnectionPoolFactory
19+
> where ConnectionPoolFactory.ConnectionPool == ConnectionPool {
20+
@usableFromInline
21+
/* private */ struct NodeBundle: Sendable {
22+
@usableFromInline
23+
var nodeID: ValkeyNodeID { self.nodeDescription.id }
24+
@usableFromInline
25+
var pool: ConnectionPool
26+
@usableFromInline
27+
var nodeDescription: ValkeyNodeDescription
28+
}
29+
let poolFactory: ConnectionPoolFactory
30+
@usableFromInline
31+
var clientMap: [ValkeyNodeID: NodeBundle]
32+
@inlinable
33+
var clients: some Collection<NodeBundle> { clientMap.values }
34+
35+
init(poolFactory: ConnectionPoolFactory) {
36+
self.poolFactory = poolFactory
37+
self.clientMap = [:]
38+
}
39+
40+
struct PoolUpdateAction {
41+
var poolsToShutdown: [ConnectionPool]
42+
var poolsToRun: [(ConnectionPool, ValkeyNodeID)]
43+
44+
static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) }
45+
}
46+
47+
mutating func updateNodes(
48+
_ newNodes: some Collection<ValkeyNodeDescription>,
49+
removeUnmentionedPools: Bool
50+
) -> PoolUpdateAction {
51+
var previousNodes = self.clientMap
52+
self.clientMap.removeAll(keepingCapacity: true)
53+
var newPools = [(ConnectionPool, ValkeyNodeID)]()
54+
newPools.reserveCapacity(16)
55+
var poolsToShutdown = [ConnectionPool]()
56+
57+
for newNodeDescription in newNodes {
58+
// if we had a pool previously, let's continue to use it!
59+
if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) {
60+
if newNodeDescription == existingPool.nodeDescription {
61+
// the existing pool matches the new node description. nothing todo
62+
self.clientMap[newNodeDescription.id] = existingPool
63+
} else {
64+
// the existing pool does not match new node description. For example tls may now be required.
65+
// shutdown the old pool and create a new one
66+
poolsToShutdown.append(existingPool.pool)
67+
let newPool = self.makePool(for: newNodeDescription)
68+
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
69+
newPools.append((newPool, newNodeDescription.id))
70+
}
71+
} else {
72+
let newPool = self.makePool(for: newNodeDescription)
73+
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
74+
newPools.append((newPool, newNodeDescription.id))
75+
}
76+
}
77+
78+
if removeUnmentionedPools {
79+
poolsToShutdown.append(contentsOf: previousNodes.values.lazy.map { $0.pool })
80+
81+
return PoolUpdateAction(
82+
poolsToShutdown: poolsToShutdown,
83+
poolsToRun: newPools
84+
)
85+
}
86+
87+
// re-add pools that were not part of the node list.
88+
for (nodeID, poolDescription) in previousNodes {
89+
self.clientMap[nodeID] = poolDescription
90+
}
91+
92+
return PoolUpdateAction(
93+
poolsToShutdown: poolsToShutdown,
94+
poolsToRun: newPools
95+
)
96+
}
97+
98+
@inlinable
99+
subscript(_ index: ValkeyNodeID) -> NodeBundle? {
100+
self.clientMap[index]
101+
}
102+
103+
@usableFromInline
104+
mutating func removeAll() {
105+
self.clientMap.removeAll(keepingCapacity: false)
106+
}
107+
108+
func makePool(for description: ValkeyNodeDescription) -> ConnectionPool {
109+
self.poolFactory.makeConnectionPool(nodeDescription: description)
110+
}
111+
}

0 commit comments

Comments
 (0)