Skip to content

Commit c4631ef

Browse files
authored
Add ValkeyClusterClient.withConnection (#142)
* Add ValkeyClusterClient.withConnection Signed-off-by: Adam Fowler <[email protected]> * Remove outer loop Signed-off-by: Adam Fowler <[email protected]> * some Collection Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent cb2bca1 commit c4631ef

File tree

2 files changed

+41
-40
lines changed

2 files changed

+41
-40
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -157,21 +157,38 @@ public final class ValkeyClusterClient: Sendable {
157157
try await self.client(for: hashSlots)
158158
}
159159

160-
while true {
160+
while !Task.isCancelled {
161161
do {
162-
let respToken = try await self.retryingSend(
163-
clientSelector: clientSelector,
164-
command: command,
165-
logger: logger
166-
)
167-
return try Command.Response(fromRESP: respToken)
162+
let client = try await clientSelector()
163+
return try await client.send(command: command)
164+
} catch ValkeyClusterError.noNodeToTalkTo {
165+
// TODO: Rerun node discovery!
168166
} catch let error as ValkeyClientError where error.errorCode == .commandError {
169167
guard let errorMessage = error.message, let movedError = ValkeyMovedError(errorMessage) else {
170168
throw error
171169
}
172170
clientSelector = { try await self.client(for: movedError) }
173171
}
174172
}
173+
throw CancellationError()
174+
}
175+
176+
/// Get connection from cluster and run operation using connection
177+
///
178+
/// - Parameters:
179+
/// - keys: Keys affected by operation. This is used to choose the cluster node
180+
/// - isolation: Actor isolation
181+
/// - operation: Closure handling Valkey connection
182+
/// - Returns: Value returned by closure
183+
@inlinable
184+
public func withConnection<Value>(
185+
forKeys keys: some Collection<ValkeyKey>,
186+
isolation: isolated (any Actor)? = #isolation,
187+
operation: (ValkeyConnection) async throws -> sending Value
188+
) async throws -> Value {
189+
let hashSlots = keys.map { HashSlot(key: $0) }
190+
let client = try await self.client(for: hashSlots)
191+
return try await client.withConnection(isolation: isolation, operation: operation)
175192
}
176193

177194
/// Starts running the cluster client.
@@ -455,39 +472,6 @@ public final class ValkeyClusterClient: Sendable {
455472
throw ValkeyClusterError.clusterIsMissingSlotAssignment
456473
}
457474

458-
/// Sends a command to the Valkey cluster with automatic retries and error handling.
459-
///
460-
/// This internal method handles:
461-
/// - Client selection strategy using the provided selector function
462-
/// - Automatic retries for certain types of failures
463-
/// - Task cancellation
464-
///
465-
/// - Parameters:
466-
/// - clientSelector: A function that resolves to the appropriate client for this command.
467-
/// - command: The command to send to the cluster.
468-
/// - logger: The logger to use for diagnostic information.
469-
/// - Returns: The raw RESP token response.
470-
/// - Throws:
471-
/// - `CancellationError` if the task is cancelled
472-
/// - Other errors if command execution fails after retries
473-
@inlinable
474-
func retryingSend(
475-
clientSelector: () async throws -> ValkeyClient,
476-
command: some ValkeyCommand,
477-
logger: Logger
478-
) async throws -> RESPToken {
479-
while !Task.isCancelled {
480-
do {
481-
let client = try await clientSelector()
482-
return try await client._send(command)
483-
} catch ValkeyClusterError.noNodeToTalkTo {
484-
// TODO: Rerun node discovery!
485-
}
486-
}
487-
488-
throw CancellationError()
489-
}
490-
491475
/// Generates a new unique request ID for tracking internal operations.
492476
///
493477
/// - Returns: A unique integer ID for tracking requests and waiters.

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,23 @@ struct ClusterIntegrationTests {
3636
}
3737
}
3838

39+
@available(valkeySwift 1.0, *)
40+
func testWithConnection() async throws {
41+
var logger = Logger(label: "ValkeyCluster")
42+
logger.logLevel = .trace
43+
let firstNodeHostname = ClusterIntegrationTests.firstNodeHostname!
44+
let firstNodePort = ClusterIntegrationTests.firstNodePort ?? 6379
45+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)]) { (client, logger) in
46+
try await Self.withKey(connection: client) { key in
47+
try await client.withConnection(forKeys: [key]) { connection in
48+
_ = try await connection.set(key, value: "Hello")
49+
let response = try await connection.get(key)
50+
#expect(response.map { String(buffer: $0) } == "Hello")
51+
}
52+
}
53+
}
54+
}
55+
3956
@available(valkeySwift 1.0, *)
4057
static func withKey<Value>(
4158
connection: some ValkeyConnectionProtocol,

0 commit comments

Comments
 (0)