diff --git a/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift b/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift index 75698c78..bec94acc 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift @@ -163,7 +163,7 @@ package struct ValkeyClusterClientStateMachine< @usableFromInline /* private */ var clusterState: ClusterState @usableFromInline - /* private */ var runningClients: [ValkeyNodeID: NodeBundle] = [:] + /* private */ var runningClients: ValkeyRunningClients @usableFromInline /* private */ var configuration: ValkeyClusterClientStateMachineConfiguration @@ -182,7 +182,7 @@ package struct ValkeyClusterClientStateMachine< ) ) self.refreshState = .notRefreshing - self.runningClients = [:] + self.runningClients = .init(poolFactory: poolFactory) self.configuration = configuration self.clock = clock self.poolFactory = poolFactory @@ -231,8 +231,8 @@ package struct ValkeyClusterClientStateMachine< case .refreshing: switch self.clusterState { case .unavailable, .degraded, .healthy: - let action = self.updateNodes(newNodes, removeUnmentionedPools: false) - let voters = self.allNodeClients().map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) } + let action = self.runningClients.updateNodes(newNodes, removeUnmentionedPools: false) + let voters = self.runningClients.clients.map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) } return .init( clientsToRun: action.poolsToRun.map(\.0), clientsToShutdown: action.poolsToShutdown, @@ -246,7 +246,7 @@ package struct ValkeyClusterClientStateMachine< } package func getInitialVoters() -> [ValkeyClusterVoter] { - self.allNodeClients().map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) } + self.runningClients.clients.map { ValkeyClusterVoter(client: $0.pool, nodeID: $0.nodeID) } } package struct ClusterDiscoverySucceededAction { @@ -278,7 +278,7 @@ package struct ValkeyClusterClientStateMachine< self.refreshState = .waitingForRefresh(.init(id: refreshTimerID), previousRefresh: .init(consecutiveFailures: 0)) let newShards = description.shards - let poolUpdate = self.updateNodes( + let poolUpdate = self.runningClients.updateNodes( newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } }, removeUnmentionedPools: true ) @@ -694,7 +694,7 @@ package struct ValkeyClusterClientStateMachine< case .refreshing: let newShards = description.shards - let poolActions = self.updateNodes( + let poolActions = self.runningClients.updateNodes( newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } }, removeUnmentionedPools: false ) @@ -719,80 +719,18 @@ package struct ValkeyClusterClientStateMachine< case .unavailable, .degraded, .healthy: self.clusterState = .shutdown let existingNodes = self.runningClients - self.runningClients.removeAll(keepingCapacity: false) - return existingNodes.values.lazy.map { $0.pool } + self.runningClients.removeAll() + return existingNodes.clients.lazy.map { $0.pool } case .shutdown: return [] } } - private struct PoolUpdateAction { - var poolsToShutdown: [ConnectionPool] - var poolsToRun: [(ConnectionPool, ValkeyNodeID)] - - static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) } - } - - private mutating func updateNodes( - _ newNodes: some Collection, - removeUnmentionedPools: Bool - ) -> PoolUpdateAction { - var previousNodes = self.runningClients - self.runningClients.removeAll(keepingCapacity: true) - var newPools = [(ConnectionPool, ValkeyNodeID)]() - newPools.reserveCapacity(16) - var poolsToShutdown = [ConnectionPool]() - - for newNodeDescription in newNodes { - // if we had a pool previously, let's continue to use it! - if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) { - if newNodeDescription == existingPool.nodeDescription { - // the existing pool matches the new node description. nothing todo - self.runningClients[newNodeDescription.id] = existingPool - } else { - // the existing pool does not match new node description. For example tls may now be required. - // shutdown the old pool and create a new one - poolsToShutdown.append(existingPool.pool) - let newPool = self.makePool(for: newNodeDescription) - self.runningClients[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription) - newPools.append((newPool, newNodeDescription.id)) - } - } else { - let newPool = self.makePool(for: newNodeDescription) - self.runningClients[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription) - newPools.append((newPool, newNodeDescription.id)) - } - } - - if removeUnmentionedPools { - poolsToShutdown.append(contentsOf: previousNodes.values.lazy.map { $0.pool }) - - return PoolUpdateAction( - poolsToShutdown: poolsToShutdown, - poolsToRun: newPools - ) - } - - // re-add pools that were not part of the node list. - for (nodeID, poolDescription) in previousNodes { - self.runningClients[nodeID] = poolDescription - } - - return PoolUpdateAction( - poolsToShutdown: poolsToShutdown, - poolsToRun: newPools - ) - } - private func makePool(for description: ValkeyNodeDescription) -> ConnectionPool { self.poolFactory.makeConnectionPool(nodeDescription: description) } - private func allNodeClients() -> some Collection { - self.runningClients.values - } - private mutating func nextTimerID() -> Int { defer { self._nextTimerID += 1 } return self._nextTimerID diff --git a/Sources/Valkey/Node/ValkeyRunningClients.swift b/Sources/Valkey/Node/ValkeyRunningClients.swift new file mode 100644 index 00000000..558950a2 --- /dev/null +++ b/Sources/Valkey/Node/ValkeyRunningClients.swift @@ -0,0 +1,111 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the valkey-swift project +// +// Copyright (c) 2025 the valkey-swift authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@usableFromInline +struct ValkeyRunningClients< + ConnectionPool: Sendable, + ConnectionPoolFactory: ValkeyNodeConnectionPoolFactory +> where ConnectionPoolFactory.ConnectionPool == ConnectionPool { + @usableFromInline + /* private */ struct NodeBundle: Sendable { + @usableFromInline + var nodeID: ValkeyNodeID { self.nodeDescription.id } + @usableFromInline + var pool: ConnectionPool + @usableFromInline + var nodeDescription: ValkeyNodeDescription + } + let poolFactory: ConnectionPoolFactory + @usableFromInline + var clientMap: [ValkeyNodeID: NodeBundle] + @inlinable + var clients: some Collection { clientMap.values } + + init(poolFactory: ConnectionPoolFactory) { + self.poolFactory = poolFactory + self.clientMap = [:] + } + + struct PoolUpdateAction { + var poolsToShutdown: [ConnectionPool] + var poolsToRun: [(ConnectionPool, ValkeyNodeID)] + + static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) } + } + + mutating func updateNodes( + _ newNodes: some Collection, + removeUnmentionedPools: Bool + ) -> PoolUpdateAction { + var previousNodes = self.clientMap + self.clientMap.removeAll(keepingCapacity: true) + var newPools = [(ConnectionPool, ValkeyNodeID)]() + newPools.reserveCapacity(16) + var poolsToShutdown = [ConnectionPool]() + + for newNodeDescription in newNodes { + // if we had a pool previously, let's continue to use it! + if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) { + if newNodeDescription == existingPool.nodeDescription { + // the existing pool matches the new node description. nothing todo + self.clientMap[newNodeDescription.id] = existingPool + } else { + // the existing pool does not match new node description. For example tls may now be required. + // shutdown the old pool and create a new one + poolsToShutdown.append(existingPool.pool) + let newPool = self.makePool(for: newNodeDescription) + self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription) + newPools.append((newPool, newNodeDescription.id)) + } + } else { + let newPool = self.makePool(for: newNodeDescription) + self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription) + newPools.append((newPool, newNodeDescription.id)) + } + } + + if removeUnmentionedPools { + poolsToShutdown.append(contentsOf: previousNodes.values.lazy.map { $0.pool }) + + return PoolUpdateAction( + poolsToShutdown: poolsToShutdown, + poolsToRun: newPools + ) + } + + // re-add pools that were not part of the node list. + for (nodeID, poolDescription) in previousNodes { + self.clientMap[nodeID] = poolDescription + } + + return PoolUpdateAction( + poolsToShutdown: poolsToShutdown, + poolsToRun: newPools + ) + } + + @inlinable + subscript(_ index: ValkeyNodeID) -> NodeBundle? { + self.clientMap[index] + } + + @usableFromInline + mutating func removeAll() { + self.clientMap.removeAll(keepingCapacity: false) + } + + func makePool(for description: ValkeyNodeDescription) -> ConnectionPool { + self.poolFactory.makeConnectionPool(nodeDescription: description) + } +}