Skip to content

Breaking out running client code from the ValkeyClusterClient StateMachine #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 9 additions & 71 deletions Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ package struct ValkeyClusterClientStateMachine<
@usableFromInline
/* private */ var clusterState: ClusterState
@usableFromInline
/* private */ var runningClients: [ValkeyNodeID: NodeBundle] = [:]
/* private */ var runningClients: ValkeyRunningClients<ConnectionPool, ConnectionPoolFactory>
@usableFromInline
/* private */ var configuration: ValkeyClusterClientStateMachineConfiguration

Expand All @@ -182,7 +182,7 @@ package struct ValkeyClusterClientStateMachine<
)
)
self.refreshState = .notRefreshing
self.runningClients = [:]
self.runningClients = .init(poolFactory: poolFactory)
self.configuration = configuration
self.clock = clock
self.poolFactory = poolFactory
Expand Down Expand Up @@ -231,8 +231,8 @@ package struct ValkeyClusterClientStateMachine<
case .refreshing:
switch self.clusterState {
case .unavailable, .degraded, .healthy:
let action = self.updateNodes(newNodes, removeUnmentionedPools: false)
let voters = self.allNodeClients().map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
let action = self.runningClients.updateNodes(newNodes, removeUnmentionedPools: false)
let voters = self.runningClients.clients.map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
return .init(
clientsToRun: action.poolsToRun.map(\.0),
clientsToShutdown: action.poolsToShutdown,
Expand All @@ -246,7 +246,7 @@ package struct ValkeyClusterClientStateMachine<
}

package func getInitialVoters() -> [ValkeyClusterVoter<ConnectionPool>] {
self.allNodeClients().map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
self.runningClients.clients.map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) }
}

package struct ClusterDiscoverySucceededAction {
Expand Down Expand Up @@ -278,7 +278,7 @@ package struct ValkeyClusterClientStateMachine<
self.refreshState = .waitingForRefresh(.init(id: refreshTimerID), previousRefresh: .init(consecutiveFailures: 0))

let newShards = description.shards
let poolUpdate = self.updateNodes(
let poolUpdate = self.runningClients.updateNodes(
newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } },
removeUnmentionedPools: true
)
Expand Down Expand Up @@ -694,7 +694,7 @@ package struct ValkeyClusterClientStateMachine<

case .refreshing:
let newShards = description.shards
let poolActions = self.updateNodes(
let poolActions = self.runningClients.updateNodes(
newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } },
removeUnmentionedPools: false
)
Expand All @@ -719,80 +719,18 @@ package struct ValkeyClusterClientStateMachine<
case .unavailable, .degraded, .healthy:
self.clusterState = .shutdown
let existingNodes = self.runningClients
self.runningClients.removeAll(keepingCapacity: false)
return existingNodes.values.lazy.map { $0.pool }
self.runningClients.removeAll()
return existingNodes.clients.lazy.map { $0.pool }

case .shutdown:
return []
}
}

private struct PoolUpdateAction {
var poolsToShutdown: [ConnectionPool]
var poolsToRun: [(ConnectionPool, ValkeyNodeID)]

static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) }
}

private mutating func updateNodes(
_ newNodes: some Collection<ValkeyNodeDescription>,
removeUnmentionedPools: Bool
) -> PoolUpdateAction {
var previousNodes = self.runningClients
self.runningClients.removeAll(keepingCapacity: true)
var newPools = [(ConnectionPool, ValkeyNodeID)]()
newPools.reserveCapacity(16)
var poolsToShutdown = [ConnectionPool]()

for newNodeDescription in newNodes {
// if we had a pool previously, let's continue to use it!
if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) {
if newNodeDescription == existingPool.nodeDescription {
// the existing pool matches the new node description. nothing todo
self.runningClients[newNodeDescription.id] = existingPool
} else {
// the existing pool does not match new node description. For example tls may now be required.
// shutdown the old pool and create a new one
poolsToShutdown.append(existingPool.pool)
let newPool = self.makePool(for: newNodeDescription)
self.runningClients[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
newPools.append((newPool, newNodeDescription.id))
}
} else {
let newPool = self.makePool(for: newNodeDescription)
self.runningClients[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
newPools.append((newPool, newNodeDescription.id))
}
}

if removeUnmentionedPools {
poolsToShutdown.append(contentsOf: previousNodes.values.lazy.map { $0.pool })

return PoolUpdateAction(
poolsToShutdown: poolsToShutdown,
poolsToRun: newPools
)
}

// re-add pools that were not part of the node list.
for (nodeID, poolDescription) in previousNodes {
self.runningClients[nodeID] = poolDescription
}

return PoolUpdateAction(
poolsToShutdown: poolsToShutdown,
poolsToRun: newPools
)
}

private func makePool(for description: ValkeyNodeDescription) -> ConnectionPool {
self.poolFactory.makeConnectionPool(nodeDescription: description)
}

private func allNodeClients() -> some Collection<NodeBundle> {
self.runningClients.values
}

private mutating func nextTimerID() -> Int {
defer { self._nextTimerID += 1 }
return self._nextTimerID
Expand Down
111 changes: 111 additions & 0 deletions Sources/Valkey/Node/ValkeyRunningClients.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the valkey-swift project
//
// Copyright (c) 2025 the valkey-swift authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@usableFromInline
struct ValkeyRunningClients<
ConnectionPool: Sendable,
ConnectionPoolFactory: ValkeyNodeConnectionPoolFactory
> where ConnectionPoolFactory.ConnectionPool == ConnectionPool {
@usableFromInline
/* private */ struct NodeBundle: Sendable {
@usableFromInline
var nodeID: ValkeyNodeID { self.nodeDescription.id }
@usableFromInline
var pool: ConnectionPool
@usableFromInline
var nodeDescription: ValkeyNodeDescription
}
let poolFactory: ConnectionPoolFactory
@usableFromInline
var clientMap: [ValkeyNodeID: NodeBundle]
@inlinable
var clients: some Collection<NodeBundle> { clientMap.values }

init(poolFactory: ConnectionPoolFactory) {
self.poolFactory = poolFactory
self.clientMap = [:]
}

struct PoolUpdateAction {
var poolsToShutdown: [ConnectionPool]
var poolsToRun: [(ConnectionPool, ValkeyNodeID)]

static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) }
}

mutating func updateNodes(
_ newNodes: some Collection<ValkeyNodeDescription>,
removeUnmentionedPools: Bool
) -> PoolUpdateAction {
var previousNodes = self.clientMap
self.clientMap.removeAll(keepingCapacity: true)
var newPools = [(ConnectionPool, ValkeyNodeID)]()
newPools.reserveCapacity(16)
var poolsToShutdown = [ConnectionPool]()

for newNodeDescription in newNodes {
// if we had a pool previously, let's continue to use it!
if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) {
if newNodeDescription == existingPool.nodeDescription {
// the existing pool matches the new node description. nothing todo
self.clientMap[newNodeDescription.id] = existingPool
} else {
// the existing pool does not match new node description. For example tls may now be required.
// shutdown the old pool and create a new one
poolsToShutdown.append(existingPool.pool)
let newPool = self.makePool(for: newNodeDescription)
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
newPools.append((newPool, newNodeDescription.id))
}
} else {
let newPool = self.makePool(for: newNodeDescription)
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
newPools.append((newPool, newNodeDescription.id))
}
}

if removeUnmentionedPools {
poolsToShutdown.append(contentsOf: previousNodes.values.lazy.map { $0.pool })

return PoolUpdateAction(
poolsToShutdown: poolsToShutdown,
poolsToRun: newPools
)
}

// re-add pools that were not part of the node list.
for (nodeID, poolDescription) in previousNodes {
self.clientMap[nodeID] = poolDescription
}

return PoolUpdateAction(
poolsToShutdown: poolsToShutdown,
poolsToRun: newPools
)
}

@inlinable
subscript(_ index: ValkeyNodeID) -> NodeBundle? {
self.clientMap[index]
}

@usableFromInline
mutating func removeAll() {
self.clientMap.removeAll(keepingCapacity: false)
}

func makePool(for description: ValkeyNodeDescription) -> ConnectionPool {
self.poolFactory.makeConnectionPool(nodeDescription: description)
}
}