Skip to content

Commit 62a2854

Browse files
authored
Cluster: Retry command on receiving specific errors (#215)
* Cluster: Retry command on receiving specific errors The list of errors that cause a retry are TRYAGAIN, MASTERDOWN, CLUSTERDOWN and LOADING. This list was obtained from valkey-glide. The retry wait time calculation was also taken from valkey-glide Signed-off-by: Adam Fowler <[email protected]> * Fix documentation issue Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent 064e92f commit 62a2854

File tree

3 files changed

+107
-4
lines changed

3 files changed

+107
-4
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public final class ValkeyClusterClient: Sendable {
7373
/* private */ let stateLock: Mutex<StateMachine>
7474
@usableFromInline
7575
/* private */ let nextRequestIDGenerator = Atomic(0)
76+
@usableFromInline
77+
/* private */ let clientConfiguration: ValkeyClientConfiguration
7678

7779
private enum RunAction {
7880
case runClusterDiscovery(runNodeDiscovery: Bool)
@@ -101,6 +103,7 @@ public final class ValkeyClusterClient: Sendable {
101103
connectionFactory: (@Sendable (ValkeyServerAddress, any EventLoop) async throws -> any Channel)? = nil
102104
) {
103105
self.logger = logger
106+
self.clientConfiguration = clientConfiguration
104107

105108
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
106109

@@ -149,10 +152,12 @@ public final class ValkeyClusterClient: Sendable {
149152
}
150153

151154
var asking = false
155+
var attempt = 0
152156
while !Task.isCancelled {
153157
do {
154158
let client = try await clientSelector()
155159
if asking {
160+
asking = false
156161
// if asking we need to call ASKING beforehand otherwise we will get a MOVE error
157162
return try await client.execute(
158163
ASKING(),
@@ -164,12 +169,25 @@ public final class ValkeyClusterClient: Sendable {
164169
} catch ValkeyClusterError.noNodeToTalkTo {
165170
// TODO: Rerun node discovery!
166171
} catch let error as ValkeyClientError where error.errorCode == .commandError {
167-
guard let errorMessage = error.message, let redirectError = ValkeyClusterRedirectionError(errorMessage) else {
172+
guard let errorMessage = error.message else {
168173
throw error
169174
}
170-
self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"])
171-
clientSelector = { try await self.nodeClient(for: redirectError) }
172-
asking = (redirectError.redirection == .ask)
175+
attempt += 1
176+
if let redirectError = ValkeyClusterRedirectionError(errorMessage) {
177+
self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"])
178+
clientSelector = { try await self.nodeClient(for: redirectError) }
179+
asking = (redirectError.redirection == .ask)
180+
} else {
181+
let prefix = errorMessage.prefix { $0 != " " }
182+
switch prefix {
183+
case "TRYAGAIN", "MASTERDOWN", "CLUSTERDOWN", "LOADING":
184+
self.logger.trace("Received cluster error", metadata: ["error": "\(prefix)"])
185+
let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt)
186+
try await Task.sleep(for: wait)
187+
default:
188+
throw error
189+
}
190+
}
173191
}
174192
}
175193
throw CancellationError()

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,20 @@
99
import NIOSSL
1010
import _ValkeyConnectionPool
1111

12+
#if canImport(Darwin)
13+
import Darwin
14+
#elseif canImport(Glibc)
15+
import Glibc
16+
#elseif canImport(Musl)
17+
import Musl
18+
#elseif canImport(WinSDK)
19+
import WinSDK
20+
#elseif canImport(Bionic)
21+
import Bionic
22+
#else
23+
#error("Unsupported platform")
24+
#endif
25+
1226
/// Configuration for the Valkey client.
1327
@available(valkeySwift 1.0, *)
1428
public struct ValkeyClientConfiguration: Sendable {
@@ -64,6 +78,43 @@ public struct ValkeyClientConfiguration: Sendable {
6478
}
6579
}
6680

81+
/// Retry parameters for when a client needs to retry a command
82+
public struct RetryParameters: Sendable {
83+
let exponentBase: Double
84+
let factor: Double
85+
let minWaitTime: Double
86+
let maxWaitTime: Double
87+
88+
/// Initialize RetryParameters
89+
/// - Parameters:
90+
/// - exponentBase: Exponent base number
91+
/// - factor: Duration to multiple exponent by get base wait value
92+
/// - minWaitTime: Minimum wait time
93+
/// - maxWaitTime: Maximum wait time
94+
public init(
95+
exponentBase: Double = 2,
96+
factor: Duration = .milliseconds(10.0),
97+
minWaitTime: Duration = .seconds(1.28),
98+
maxWaitTime: Duration = .seconds(655.36)
99+
) {
100+
self.exponentBase = exponentBase
101+
self.factor = factor / .milliseconds(1)
102+
self.minWaitTime = minWaitTime / .milliseconds(1)
103+
self.maxWaitTime = maxWaitTime / .milliseconds(1)
104+
}
105+
106+
/// Calculate wait time for retry number
107+
///
108+
/// This code is a copy from the `RetryParam` type in cluster_clients.rs of valkey-glide,
109+
@usableFromInline
110+
func calculateWaitTime(retry: Int) -> Duration {
111+
let baseWait = pow(self.exponentBase, Double(retry)) * self.factor
112+
let clampedWait = max(min(baseWait, self.maxWaitTime), self.minWaitTime)
113+
let jitteredWait = Double.random(in: minWaitTime...clampedWait)
114+
return .milliseconds(jitteredWait)
115+
}
116+
}
117+
67118
/// The connection pool definition for Valkey connections.
68119
public struct ConnectionPool: Hashable, Sendable {
69120
/// The minimum number of connections to preserve in the pool.
@@ -108,6 +159,8 @@ public struct ValkeyClientConfiguration: Sendable {
108159
public var connectionPool: ConnectionPool
109160
/// The keep alive behavior for the connection.
110161
public var keepAliveBehavior: KeepAliveBehavior
162+
/// Retry parameters for when a client needs to retry a command
163+
public var retryParameters: RetryParameters
111164
/// The timeout the client uses to determine if a connection is considered dead.
112165
///
113166
/// The connection is considered dead if a response isn't received within this time.
@@ -130,20 +183,23 @@ public struct ValkeyClientConfiguration: Sendable {
130183
/// - authentication: The authentication credentials.
131184
/// - connectionPool: The connection pool configuration.
132185
/// - keepAliveBehavior: The connection keep alive behavior.
186+
/// - retryParameters: Retry parameters for when client returns an error that requires a retry
133187
/// - commandTimeout: The timeout for a connection response.
134188
/// - blockingCommandTimeout: The timeout for a blocking command response.
135189
/// - tls: The TLS configuration.
136190
public init(
137191
authentication: Authentication? = nil,
138192
connectionPool: ConnectionPool = .init(),
139193
keepAliveBehavior: KeepAliveBehavior = .init(),
194+
retryParameters: RetryParameters = .init(),
140195
commandTimeout: Duration = .seconds(30),
141196
blockingCommandTimeout: Duration = .seconds(120),
142197
tls: TLS = .disable
143198
) {
144199
self.authentication = authentication
145200
self.connectionPool = connectionPool
146201
self.keepAliveBehavior = keepAliveBehavior
202+
self.retryParameters = retryParameters
147203
self.commandTimeout = commandTimeout
148204
self.blockingCommandTimeout = blockingCommandTimeout
149205
self.tls = tls

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,35 @@ struct ClusterIntegrationTests {
112112
}
113113
}
114114

115+
@Test
116+
@available(valkeySwift 1.0, *)
117+
func testHashSlotMigrationAndTryAgain() async throws {
118+
var logger = Logger(label: "ValkeyCluster")
119+
logger.logLevel = .trace
120+
let firstNodeHostname = clusterFirstNodeHostname!
121+
let firstNodePort = clusterFirstNodePort ?? 6379
122+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
123+
let keySuffix = "{\(UUID().uuidString)}"
124+
try await Self.withKey(connection: client, suffix: keySuffix) { key in
125+
try await Self.withKey(connection: client, suffix: keySuffix) { key2 in
126+
let hashSlot = HashSlot(key: key)
127+
try await client.lpush(key, elements: ["testing"])
128+
129+
try await testMigratingHashSlot(hashSlot, client: client) {
130+
} duringMigrate: {
131+
try await client.rpoplpush(source: key, destination: key2)
132+
}
133+
}
134+
}
135+
}
136+
}
137+
115138
@available(valkeySwift 1.0, *)
116139
func testMigratingHashSlot(
117140
_ hashSlot: HashSlot,
118141
client: ValkeyClusterClient,
119142
beforeMigrate: () async throws -> Void,
143+
duringMigrate: sending () async throws -> Void = {},
120144
afterMigrate: () async throws -> Void = {},
121145
finished: () async throws -> Void = {}
122146
) async throws {
@@ -139,6 +163,8 @@ struct ClusterIntegrationTests {
139163
_ = try await nodeBClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .importing(clientAID)))
140164
_ = try await nodeAClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .migrating(clientBID)))
141165

166+
async let duringMigrateTask: Void = duringMigrate()
167+
142168
try await beforeMigrate()
143169

144170
// get keys associated with slot and migrate them
@@ -156,6 +182,9 @@ struct ClusterIntegrationTests {
156182
_ = try await nodeAClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .node(clientBID)))
157183
_ = try await nodeBClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .node(clientBID)))
158184

185+
// wait for during migrate
186+
try await duringMigrateTask
187+
159188
try await finished()
160189
result = .success(())
161190
} catch {

0 commit comments

Comments
 (0)