Skip to content

Commit 4f72314

Browse files
authored
Generic node description (#230)
* Add associatedtype NodeDescription to ValkeyNodeConnectionPoolFactory This allows us have separate node descriptions for different implementations of ValkeyRunningClientsStateMachine Signed-off-by: Adam Fowler <[email protected]> * Add separate ValkeyNodeConnectionPoolFactory for cluster and standalone Signed-off-by: Adam Fowler <[email protected]> * Remove ConnectionPool typealias Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent be8d32c commit 4f72314

7 files changed

+82
-51
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public final class ValkeyClusterClient: Sendable {
5858
@usableFromInline
5959
typealias StateMachine = ValkeyClusterClientStateMachine<
6060
ValkeyNodeClient,
61-
ValkeyNodeClientFactory,
61+
ValkeyClusterNodeClientFactory,
6262
ContinuousClock,
6363
CheckedContinuation<Void, any Error>,
6464
AsyncStream<Void>.Continuation
@@ -106,7 +106,7 @@ public final class ValkeyClusterClient: Sendable {
106106

107107
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
108108

109-
let factory = ValkeyNodeClientFactory(
109+
let factory = ValkeyClusterNodeClientFactory(
110110
logger: logger,
111111
configuration: clientConfiguration,
112112
connectionFactory: ValkeyConnectionFactory(

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ package struct ValkeyClusterClientStateMachine<
6060
Clock: _Concurrency.Clock,
6161
SuccessNotifier,
6262
TimerCancellationToken: Sendable
63-
> where ConnectionPool == ConnectionPoolFactory.ConnectionPool, Clock.Duration == Duration {
63+
>
64+
where
65+
ConnectionPool == ConnectionPoolFactory.ConnectionPool,
66+
Clock.Duration == Duration,
67+
ConnectionPoolFactory.NodeDescription == ValkeyNodeDescription
68+
{
6469

6570
@usableFromInline
6671
struct Timer {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// This source file is part of the valkey-swift project
3+
// Copyright (c) 2025 the valkey-swift project authors
4+
//
5+
// See LICENSE.txt for license information
6+
// SPDX-License-Identifier: Apache-2.0
7+
//
8+
import Logging
9+
import NIOCore
10+
11+
/// A factory for creating ``ValkeyNode`` instances to connect to specific nodes.
12+
///
13+
/// This factory is used by the ``ValkeyClusterClient`` to create client instances
14+
/// for each node in the cluster as needed.
15+
@available(valkeySwift 1.0, *)
16+
@usableFromInline
17+
package struct ValkeyClusterNodeClientFactory: ValkeyNodeConnectionPoolFactory {
18+
var logger: Logger
19+
var configuration: ValkeyClientConfiguration
20+
var eventLoopGroup: any EventLoopGroup
21+
let connectionIDGenerator = ConnectionIDGenerator()
22+
let connectionFactory: ValkeyConnectionFactory
23+
24+
/// Creates a new `ValkeyClusterNodeClientFactory` instance.
25+
///
26+
/// - Parameters:
27+
/// - logger: The logger used for diagnostic information.
28+
/// - configuration: Configuration for the Valkey clients created by this factory.
29+
/// - eventLoopGroup: The event loop group to use for client connections.
30+
package init(
31+
logger: Logger,
32+
configuration: ValkeyClientConfiguration,
33+
connectionFactory: ValkeyConnectionFactory,
34+
eventLoopGroup: any EventLoopGroup
35+
) {
36+
self.logger = logger
37+
self.configuration = configuration
38+
self.connectionFactory = connectionFactory
39+
self.eventLoopGroup = eventLoopGroup
40+
}
41+
42+
/// Creates a connection pool (client) for a specific node in the cluster.
43+
///
44+
/// - Parameter nodeDescription: Description of the node to connect to.
45+
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
46+
@usableFromInline
47+
package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyNodeClient {
48+
let address = ValkeyServerAddress.hostname(nodeDescription.endpoint, port: nodeDescription.port)
49+
return ValkeyNodeClient(
50+
address,
51+
connectionIDGenerator: self.connectionIDGenerator,
52+
connectionFactory: self.connectionFactory,
53+
eventLoopGroup: self.eventLoopGroup,
54+
logger: self.logger
55+
)
56+
}
57+
}

Sources/Valkey/Node/ValkeyNodeClientFactory.swift

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ import NIOCore
1515
@available(valkeySwift 1.0, *)
1616
@usableFromInline
1717
package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory {
18-
@usableFromInline
19-
package typealias ConnectionPool = ValkeyNodeClient
20-
2118
var logger: Logger
2219
var configuration: ValkeyClientConfiguration
2320
var eventLoopGroup: any EventLoopGroup
@@ -47,29 +44,9 @@ package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory {
4744
/// - Parameter nodeDescription: Description of the node to connect to.
4845
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
4946
@usableFromInline
50-
package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyNodeClient {
51-
let serverAddress = ValkeyServerAddress.hostname(
52-
nodeDescription.endpoint,
53-
port: nodeDescription.port
54-
)
55-
56-
return ValkeyNodeClient(
57-
serverAddress,
58-
connectionIDGenerator: self.connectionIDGenerator,
59-
connectionFactory: self.connectionFactory,
60-
eventLoopGroup: self.eventLoopGroup,
61-
logger: self.logger
62-
)
63-
}
64-
65-
/// Creates a connection pool (client) for a specific node in the cluster.
66-
///
67-
/// - Parameter nodeDescription: Description of the node to connect to.
68-
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
69-
@usableFromInline
70-
package func makeConnectionPool(serverAddress: ValkeyServerAddress) -> ValkeyNodeClient {
47+
package func makeConnectionPool(nodeDescription: ValkeyServerAddress) -> ValkeyNodeClient {
7148
ValkeyNodeClient(
72-
serverAddress,
49+
nodeDescription,
7350
connectionIDGenerator: self.connectionIDGenerator,
7451
connectionFactory: self.connectionFactory,
7552
eventLoopGroup: self.eventLoopGroup,

Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift renamed to Sources/Valkey/Node/ValkeyNodeConnectionPool.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ package protocol ValkeyNodeConnectionPool: AnyObject, Sendable {
2525
@available(valkeySwift 1.0, *)
2626
package protocol ValkeyNodeConnectionPoolFactory: Sendable {
2727
associatedtype ConnectionPool: ValkeyNodeConnectionPool
28+
associatedtype NodeDescription
2829

2930
/// Create a shard connection pool based on the provided configuration
3031
func makeConnectionPool(
31-
nodeDescription: ValkeyNodeDescription
32+
nodeDescription: NodeDescription
3233
) -> ConnectionPool
3334
}

Sources/Valkey/Node/ValkeyRunningClients.swift

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,20 @@
1010
struct ValkeyRunningClientsStateMachine<
1111
ConnectionPool: Sendable,
1212
ConnectionPoolFactory: ValkeyNodeConnectionPoolFactory
13-
> where ConnectionPoolFactory.ConnectionPool == ConnectionPool {
13+
> where ConnectionPoolFactory.ConnectionPool == ConnectionPool, ConnectionPoolFactory.NodeDescription: Sendable & Identifiable {
14+
@usableFromInline typealias NodeDescription = ConnectionPoolFactory.NodeDescription
1415
@usableFromInline
1516
/* private */ struct NodeBundle: Sendable {
1617
@usableFromInline
17-
var nodeID: ValkeyNodeID { self.nodeDescription.id }
18+
var nodeID: NodeDescription.ID { self.nodeDescription.id }
1819
@usableFromInline
1920
var pool: ConnectionPool
2021
@usableFromInline
21-
var nodeDescription: ValkeyNodeDescription
22+
var nodeDescription: NodeDescription
2223
}
2324
let poolFactory: ConnectionPoolFactory
2425
@usableFromInline
25-
var clientMap: [ValkeyNodeID: NodeBundle]
26+
var clientMap: [NodeDescription.ID: NodeBundle]
2627
@inlinable
2728
var clients: some Collection<NodeBundle> { clientMap.values }
2829

@@ -33,35 +34,25 @@ struct ValkeyRunningClientsStateMachine<
3334

3435
struct PoolUpdateAction {
3536
var poolsToShutdown: [ConnectionPool]
36-
var poolsToRun: [(ConnectionPool, ValkeyNodeID)]
37+
var poolsToRun: [(ConnectionPool, NodeDescription.ID)]
3738

3839
static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) }
3940
}
4041

4142
mutating func updateNodes(
42-
_ newNodes: some Collection<ValkeyNodeDescription>,
43+
_ newNodes: some Collection<NodeDescription>,
4344
removeUnmentionedPools: Bool
4445
) -> PoolUpdateAction {
4546
var previousNodes = self.clientMap
4647
self.clientMap.removeAll(keepingCapacity: true)
47-
var newPools = [(ConnectionPool, ValkeyNodeID)]()
48+
var newPools = [(ConnectionPool, NodeDescription.ID)]()
4849
newPools.reserveCapacity(16)
4950
var poolsToShutdown = [ConnectionPool]()
5051

5152
for newNodeDescription in newNodes {
5253
// if we had a pool previously, let's continue to use it!
5354
if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) {
54-
if newNodeDescription == existingPool.nodeDescription {
55-
// the existing pool matches the new node description. nothing todo
56-
self.clientMap[newNodeDescription.id] = existingPool
57-
} else {
58-
// the existing pool does not match new node description. For example tls may now be required.
59-
// shutdown the old pool and create a new one
60-
poolsToShutdown.append(existingPool.pool)
61-
let newPool = self.makePool(for: newNodeDescription)
62-
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
63-
newPools.append((newPool, newNodeDescription.id))
64-
}
55+
self.clientMap[newNodeDescription.id] = existingPool
6556
} else {
6657
let newPool = self.makePool(for: newNodeDescription)
6758
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
@@ -95,7 +86,7 @@ struct ValkeyRunningClientsStateMachine<
9586
}
9687

9788
mutating func addNode(
98-
_ node: ValkeyNodeDescription
89+
_ node: NodeDescription
9990
) -> AddNodeAction {
10091
if let pool = self.clientMap[node.id] {
10192
return .useExistingPool(pool.pool)
@@ -106,7 +97,7 @@ struct ValkeyRunningClientsStateMachine<
10697
}
10798

10899
@inlinable
109-
subscript(_ index: ValkeyNodeID) -> NodeBundle? {
100+
subscript(_ index: NodeDescription.ID) -> NodeBundle? {
110101
self.clientMap[index]
111102
}
112103

@@ -115,7 +106,7 @@ struct ValkeyRunningClientsStateMachine<
115106
self.clientMap.removeAll(keepingCapacity: false)
116107
}
117108

118-
func makePool(for description: ValkeyNodeDescription) -> ConnectionPool {
109+
func makePool(for description: NodeDescription) -> ConnectionPool {
119110
self.poolFactory.makeConnectionPool(nodeDescription: description)
120111
}
121112
}

Sources/Valkey/ValkeyClient.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public final class ValkeyClient: Sendable {
8282
self.eventLoopGroup = eventLoopGroup
8383
self.logger = logger
8484
self.runningAtomic = .init(false)
85-
self.node = self.nodeClientFactory.makeConnectionPool(serverAddress: address)
85+
self.node = self.nodeClientFactory.makeConnectionPool(nodeDescription: address)
8686
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
8787
self.queueAction(.runNodeClient(self.node))
8888
}

0 commit comments

Comments
 (0)