Skip to content

Commit dce29af

Browse files
authored
ValkeyNodeClient (#149)
* Add ValkeyNode and use in cluster code Signed-off-by: Adam Fowler <[email protected]> * Use ValkeyNode internally Signed-off-by: Adam Fowler <[email protected]> * Remove ValkeyClient.triggerForceShutdown Signed-off-by: Adam Fowler <[email protected]> * inline, rename symbols, update comments Signed-off-by: Adam Fowler <[email protected]> * Fix redundant conformance error Signed-off-by: Adam Fowler <[email protected]> * ValkeyNodeClient Signed-off-by: Adam Fowler <[email protected]> * Add link to `ValkeyNodeClient` in top-level docs Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent dcb2761 commit dce29af

File tree

5 files changed

+320
-163
lines changed

5 files changed

+320
-163
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 22 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public final class ValkeyClusterClient: Sendable {
6464

6565
@usableFromInline
6666
typealias StateMachine = ValkeyClusterClientStateMachine<
67-
ValkeyClient,
68-
ValkeyClientFactory,
67+
ValkeyNodeClient,
68+
ValkeyNodeClientFactory,
6969
ContinuousClock,
7070
CheckedContinuation<Void, any Error>,
7171
AsyncStream<Void>.Continuation
@@ -82,7 +82,7 @@ public final class ValkeyClusterClient: Sendable {
8282

8383
private enum RunAction {
8484
case runClusterDiscovery(runNodeDiscovery: Bool)
85-
case runClient(ValkeyClient)
85+
case runClient(ValkeyNodeClient)
8686
case runTimer(ValkeyClusterTimer)
8787
}
8888

@@ -113,7 +113,7 @@ public final class ValkeyClusterClient: Sendable {
113113
self.actionStream = stream
114114
self.actionStreamContinuation = continuation
115115

116-
let factory = ValkeyClientFactory(
116+
let factory = ValkeyNodeClientFactory(
117117
logger: logger,
118118
configuration: clientConfiguration,
119119
connectionFactory: ValkeyConnectionFactory(
@@ -153,8 +153,8 @@ public final class ValkeyClusterClient: Sendable {
153153
@inlinable
154154
public func send<Command: ValkeyCommand>(command: Command) async throws -> Command.Response {
155155
let hashSlots = command.keysAffected.map { HashSlot(key: $0) }
156-
var clientSelector: () async throws -> ValkeyClient = {
157-
try await self.client(for: hashSlots)
156+
var clientSelector: () async throws -> ValkeyNodeClient = {
157+
try await self.nodeClient(for: hashSlots)
158158
}
159159

160160
while !Task.isCancelled {
@@ -168,7 +168,7 @@ public final class ValkeyClusterClient: Sendable {
168168
throw error
169169
}
170170
self.logger.trace("Received move error", metadata: ["error": "\(movedError)"])
171-
clientSelector = { try await self.client(for: movedError) }
171+
clientSelector = { try await self.nodeClient(for: movedError) }
172172
}
173173
}
174174
throw CancellationError()
@@ -188,8 +188,8 @@ public final class ValkeyClusterClient: Sendable {
188188
operation: (ValkeyConnection) async throws -> sending Value
189189
) async throws -> Value {
190190
let hashSlots = keys.map { HashSlot(key: $0) }
191-
let client = try await self.client(for: hashSlots)
192-
return try await client.withConnection(isolation: isolation, operation: operation)
191+
let node = try await self.nodeClient(for: hashSlots)
192+
return try await node.withConnection(isolation: isolation, operation: operation)
193193
}
194194

195195
/// Starts running the cluster client.
@@ -361,13 +361,13 @@ public final class ValkeyClusterClient: Sendable {
361361
/// MOVED responses from Valkey nodes.
362362
///
363363
/// - Parameter moveError: The MOVED error response from a Valkey node.
364-
/// - Returns: A client connected to the node that can handle the request.
364+
/// - Returns: A ``ValkeyNode`` connected to the node that can handle the request.
365365
/// - Throws:
366366
/// - `ValkeyClusterError.waitedForDiscoveryAfterMovedErrorThreeTimes` if unable to resolve
367367
/// the MOVED error after multiple attempts
368368
/// - `ValkeyClusterError.clientRequestCancelled` if the request is cancelled
369369
@usableFromInline
370-
/* private */ func client(for moveError: ValkeyMovedError) async throws -> ValkeyClient {
370+
/* private */ func nodeClient(for moveError: ValkeyMovedError) async throws -> ValkeyNodeClient {
371371
var counter = 0
372372
while counter < 3 {
373373
defer { counter += 1 }
@@ -376,8 +376,8 @@ public final class ValkeyClusterClient: Sendable {
376376
}
377377

378378
switch action {
379-
case .connectionPool(let client):
380-
return client
379+
case .connectionPool(let node):
380+
return node
381381

382382
case .waitForDiscovery:
383383
break
@@ -414,25 +414,25 @@ public final class ValkeyClusterClient: Sendable {
414414
throw ValkeyClusterError.waitedForDiscoveryAfterMovedErrorThreeTimes
415415
}
416416

417-
/// Retrieves a client for communicating with nodes that manage the given hash slots.
417+
/// Retrieves a ``ValkeyNode`` for communicating with nodes that manage the given hash slots.
418418
///
419419
/// This is a lower-level method that can be used when you need direct access to a
420-
/// specific `ValkeyClient` instance for nodes managing particular hash slots. Most users
420+
/// specific `ValkeyNode` instance for nodes managing particular hash slots. Most users
421421
/// should prefer the higher-level `send(command:)` method.
422422
///
423423
/// - Parameter slots: The collection of hash slots to determine which node to connect to.
424-
/// - Returns: A `ValkeyClient` instance connected to the appropriate node.
424+
/// - Returns: A `ValkeyNode` instance connected to the appropriate node.
425425
/// - Throws:
426426
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
427427
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
428428
@inlinable
429-
func client(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyClient {
429+
func nodeClient(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyNodeClient {
430430
var retries = 0
431431
while retries < 3 {
432432
defer { retries += 1 }
433433

434434
do {
435-
return try self.stateLock.withLock { state -> ValkeyClient in
435+
return try self.stateLock.withLock { state -> ValkeyNodeClient in
436436
try state.poolFastPath(for: slots)
437437
}
438438
} catch ValkeyClusterError.clusterIsUnavailable {
@@ -539,7 +539,7 @@ public final class ValkeyClusterClient: Sendable {
539539
///
540540
/// - Returns: A list of voters that can participate in cluster topology election.
541541
/// - Throws: Any error encountered during node discovery.
542-
private func runNodeDiscovery() async throws -> [ValkeyClusterVoter<ValkeyClient>] {
542+
private func runNodeDiscovery() async throws -> [ValkeyClusterVoter<ValkeyNodeClient>] {
543543
do {
544544
self.logger.trace("Running node discovery")
545545
let nodes = try await self.nodeDiscovery.lookupNodes()
@@ -577,11 +577,11 @@ public final class ValkeyClusterClient: Sendable {
577577
/// - Parameter voters: The list of nodes that can vote on cluster topology.
578578
/// - Returns: The agreed-upon cluster description.
579579
/// - Throws: `ValkeyClusterError.clusterIsUnavailable` if consensus cannot be reached.
580-
private func runClusterDiscoveryFindingConsensus(voters: [ValkeyClusterVoter<ValkeyClient>]) async throws -> ValkeyClusterDescription {
580+
private func runClusterDiscoveryFindingConsensus(voters: [ValkeyClusterVoter<ValkeyNodeClient>]) async throws -> ValkeyClusterDescription {
581581
try await withThrowingTaskGroup(of: (ValkeyClusterDescription, ValkeyNodeID).self) { taskGroup in
582582
for voter in voters {
583583
taskGroup.addTask {
584-
(try await voter.client.clusterShards(), voter.nodeID)
584+
(try await voter.client.send(command: CLUSTER.SHARDS()), voter.nodeID)
585585
}
586586
}
587587

@@ -625,7 +625,7 @@ public final class ValkeyClusterClient: Sendable {
625625

626626
for voter in actions.voters {
627627
taskGroup.addTask {
628-
(try await voter.client.clusterShards(), voter.nodeID)
628+
(try await voter.client.send(command: CLUSTER.SHARDS()), voter.nodeID)
629629
}
630630
}
631631

@@ -650,80 +650,3 @@ public final class ValkeyClusterClient: Sendable {
650650
/// This allows the cluster client to be used anywhere a `ValkeyClientProtocol` is expected.
651651
@available(valkeySwift 1.0, *)
652652
extension ValkeyClusterClient: ValkeyClientProtocol {}
653-
654-
/// Extension that makes ``ValkeyClient`` conform to ``ValkeyNodeConnectionPool``.
655-
///
656-
/// This enables the ``ValkeyClusterClient`` to manage individual ``ValkeyClient`` instances.
657-
@available(valkeySwift 1.0, *)
658-
extension ValkeyClient: ValkeyNodeConnectionPool {
659-
/// Initiates a graceful shutdown of the client.
660-
///
661-
/// This method attempts to cleanly shut down the client's connections.
662-
/// If not implemented, it falls back to force shutdown.
663-
@usableFromInline
664-
package func triggerGracefulShutdown() {
665-
// TODO: Implement graceful shutdown
666-
self.triggerForceShutdown()
667-
}
668-
}
669-
670-
/// A factory for creating ``ValkeyClient`` instances to connect to specific nodes.
671-
///
672-
/// This factory is used by the ``ValkeyClusterClient`` to create client instances
673-
/// for each node in the cluster as needed.
674-
@available(valkeySwift 1.0, *)
675-
@usableFromInline
676-
package struct ValkeyClientFactory: ValkeyNodeConnectionPoolFactory {
677-
@usableFromInline
678-
package typealias ConnectionPool = ValkeyClient
679-
680-
var logger: Logger
681-
var configuration: ValkeyClientConfiguration
682-
var eventLoopGroup: any EventLoopGroup
683-
let connectionIDGenerator = ConnectionIDGenerator()
684-
let connectionFactory: ValkeyConnectionFactory
685-
686-
/// Creates a new `ValkeyClientFactory` instance.
687-
///
688-
/// - Parameters:
689-
/// - logger: The logger used for diagnostic information.
690-
/// - configuration: Configuration for the Valkey clients created by this factory.
691-
/// - eventLoopGroup: The event loop group to use for client connections.
692-
package init(
693-
logger: Logger,
694-
configuration: ValkeyClientConfiguration,
695-
connectionFactory: ValkeyConnectionFactory,
696-
eventLoopGroup: any EventLoopGroup
697-
) {
698-
self.logger = logger
699-
self.configuration = configuration
700-
self.connectionFactory = connectionFactory
701-
self.eventLoopGroup = eventLoopGroup
702-
}
703-
704-
/// Creates a connection pool (client) for a specific node in the cluster.
705-
///
706-
/// - Parameter nodeDescription: Description of the node to connect to.
707-
/// - Returns: A configured `ValkeyClient` instance ready to connect to the specified node.
708-
@usableFromInline
709-
package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyClient {
710-
let serverAddress = ValkeyServerAddress.hostname(
711-
nodeDescription.endpoint,
712-
port: nodeDescription.port
713-
)
714-
715-
var clientConfiguration = self.configuration
716-
if !nodeDescription.useTLS {
717-
// TODO: Should this throw? What about the other way around?
718-
clientConfiguration.tls = .disable
719-
}
720-
721-
return ValkeyClient(
722-
serverAddress,
723-
connectionIDGenerator: self.connectionIDGenerator,
724-
connectionFactory: self.connectionFactory,
725-
eventLoopGroup: self.eventLoopGroup,
726-
logger: self.logger
727-
)
728-
}
729-
}

Sources/Valkey/Documentation.docc/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ try await valkeyClient.withConnection { connection in
5151
- ``ValkeyClient``
5252
- ``ValkeyClientConfiguration``
5353
- ``ValkeyClientProtocol``
54+
- ``ValkeyNodeClient``
5455
- ``ValkeyServerAddress``
5556
- ``ValkeyConnection``
5657
- ``ValkeyConnectionConfiguration``

0 commit comments

Comments
 (0)