Skip to content

Commit 14aae56

Browse files
authored
Fix cluster moved error handling (#119)
1 parent 411e724 commit 14aae56

File tree

3 files changed

+84
-122
lines changed

3 files changed

+84
-122
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 66 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -155,26 +155,25 @@ public final class ValkeyClusterClient: Sendable {
155155
@inlinable
156156
public func send<Command: ValkeyCommand>(command: Command) async throws -> Command.Response {
157157
let hashSlots = command.keysAffected.map { HashSlot(key: $0) }
158-
let clientSelector: () async throws -> ValkeyClient = {
158+
var clientSelector: () async throws -> ValkeyClient = {
159159
try await self.client(for: hashSlots)
160160
}
161161

162-
var respToken = try await self.retryingSend(
163-
clientSelector: clientSelector,
164-
command: command,
165-
logger: logger
166-
)
167-
168-
// TODO: We might want to collect redirects here for diagnostic purposes.
169-
while let movedError = respToken.parseMovedError() {
170-
respToken = try await self.retryingSend(
171-
clientSelector: { try await self.client(for: movedError) },
172-
command: command,
173-
logger: logger
174-
)
162+
while true {
163+
do {
164+
let respToken = try await self.retryingSend(
165+
clientSelector: clientSelector,
166+
command: command,
167+
logger: logger
168+
)
169+
return try Command.Response(fromRESP: respToken)
170+
} catch let error as ValkeyClientError where error.errorCode == .commandError {
171+
guard let errorMessage = error.message, let movedError = ValkeyMovedError(errorMessage) else {
172+
throw error
173+
}
174+
clientSelector = { try await self.client(for: movedError) }
175+
}
175176
}
176-
177-
return try Command.Response(fromRESP: respToken)
178177
}
179178

180179
/// Starts running the cluster client.
@@ -195,7 +194,7 @@ public final class ValkeyClusterClient: Sendable {
195194
/// group.addTask {
196195
/// await client.run()
197196
/// }
198-
///
197+
///
199198
/// // use the client here
200199
/// let foo = try await client.get(key: "foo")
201200
/// }
@@ -209,7 +208,7 @@ public final class ValkeyClusterClient: Sendable {
209208
self.actionStreamContinuation.yield(.runClusterDiscovery(runNodeDiscovery: true))
210209

211210
await withTaskCancellationHandler {
212-
await withDiscardingTaskGroup() { taskGroup in
211+
await withDiscardingTaskGroup { taskGroup in
213212
await self.runUsingTaskGroup(&taskGroup)
214213
}
215214
} onCancel: {
@@ -427,7 +426,6 @@ public final class ValkeyClusterClient: Sendable {
427426
try await withCheckedThrowingContinuation {
428427
(continuation: CheckedContinuation<Void, any Error>) in
429428

430-
431429
let action = self.stateLock.withLock {
432430
$0.waitForHealthy(waiterID: waiterID, successNotifier: continuation)
433431
}
@@ -486,9 +484,6 @@ public final class ValkeyClusterClient: Sendable {
486484
return try await client._send(command)
487485
} catch ValkeyClusterError.noNodeToTalkTo {
488486
// TODO: Rerun node discovery!
489-
} catch let error as ValkeyClientError {
490-
fatalError("TODO: error received: \(error)")
491-
break
492487
}
493488
}
494489

@@ -528,13 +523,14 @@ public final class ValkeyClusterClient: Sendable {
528523
/// - Parameter runNodeDiscoveryFirst: Whether to run node discovery before querying for cluster topology.
529524
private func runClusterDiscovery(runNodeDiscoveryFirst: Bool) async {
530525
do {
531-
let voters = if runNodeDiscoveryFirst {
532-
try await self.runNodeDiscovery()
533-
} else {
534-
self.stateLock.withLock {
535-
$0.getInitialVoters()
526+
let voters =
527+
if runNodeDiscoveryFirst {
528+
try await self.runNodeDiscovery()
529+
} else {
530+
self.stateLock.withLock {
531+
$0.getInitialVoters()
532+
}
536533
}
537-
}
538534

539535
let clusterDescription = try await self.runClusterDiscoveryFindingConsensus(voters: voters)
540536
let action = self.stateLock.withLock {
@@ -543,9 +539,12 @@ public final class ValkeyClusterClient: Sendable {
543539

544540
self.runClusterDiscoverySucceededAction(action)
545541
} catch {
546-
self.logger.debug("Valkey cluster discovery failed", metadata: [
547-
"error": "\(error)",
548-
])
542+
self.logger.debug(
543+
"Valkey cluster discovery failed",
544+
metadata: [
545+
"error": "\(error)"
546+
]
547+
)
549548
let action = self.stateLock.withLock {
550549
$0.valkeyClusterDiscoveryFailed(error)
551550
}
@@ -567,30 +566,36 @@ public final class ValkeyClusterClient: Sendable {
567566
let actions = self.stateLock.withLock {
568567
$0.updateValkeyServiceNodes(mapped)
569568
}
570-
self.logger.debug("Discovered nodes", metadata: [
571-
"node_count": "\(nodes.count)",
572-
])
569+
self.logger.debug(
570+
"Discovered nodes",
571+
metadata: [
572+
"node_count": "\(nodes.count)"
573+
]
574+
)
573575
self.runUpdateValkeyNodesAction(actions)
574576
return actions.voters
575577
} catch {
576-
self.logger.debug("Failed to discover nodes", metadata: [
577-
"error": "\(error)",
578-
])
578+
self.logger.debug(
579+
"Failed to discover nodes",
580+
metadata: [
581+
"error": "\(error)"
582+
]
583+
)
579584
throw error
580585
}
581586
}
582587

583588
/// Establishes consensus on the cluster topology by querying multiple nodes.
584589
///
585-
/// This method uses a voting mechanism to establish consensus among multiple nodes
590+
/// This method uses a voting mechanism to establish consensus among multiple nodes
586591
/// about the current cluster topology. It requires a quorum of nodes to agree
587592
/// on the topology before accepting it.
588593
///
589594
/// - Parameter voters: The list of nodes that can vote on cluster topology.
590595
/// - Returns: The agreed-upon cluster description.
591596
/// - Throws: `ValkeyClusterError.clusterIsUnavailable` if consensus cannot be reached.
592597
private func runClusterDiscoveryFindingConsensus(voters: [ValkeyClusterVoter<ValkeyClient>]) async throws -> ValkeyClusterDescription {
593-
return try await withThrowingTaskGroup(of: (ValkeyClusterDescription, ValkeyNodeID).self) { taskGroup in
598+
try await withThrowingTaskGroup(of: (ValkeyClusterDescription, ValkeyNodeID).self) { taskGroup in
594599
for voter in voters {
595600
taskGroup.addTask {
596601
(try await voter.client.clusterShards(), voter.nodeID)
@@ -606,17 +611,23 @@ public final class ValkeyClusterClient: Sendable {
606611
do {
607612
let metrics = try election.voteReceived(for: description, from: nodeID)
608613

609-
self.logger.debug("Vote received", metadata: [
610-
"candidate_count": "\(metrics.candidateCount)",
611-
"candidate": "\(metrics.candidate)",
612-
"votes_received": "\(metrics.votesReceived)",
613-
"votes_needed": "\(metrics.votesNeeded)"
614-
])
614+
self.logger.debug(
615+
"Vote received",
616+
metadata: [
617+
"candidate_count": "\(metrics.candidateCount)",
618+
"candidate": "\(metrics.candidate)",
619+
"votes_received": "\(metrics.votesReceived)",
620+
"votes_needed": "\(metrics.votesNeeded)",
621+
]
622+
)
615623
} catch let error as ValkeyClusterError {
616-
self.logger.debug("Vote invalid", metadata: [
617-
"nodeID": "\(nodeID)",
618-
"error": "\(error)",
619-
])
624+
self.logger.debug(
625+
"Vote invalid",
626+
metadata: [
627+
"nodeID": "\(nodeID)",
628+
"error": "\(error)",
629+
]
630+
)
620631
continue
621632
}
622633

@@ -636,9 +647,12 @@ public final class ValkeyClusterClient: Sendable {
636647
}
637648

638649
case .failure(let error):
639-
self.logger.debug("Received an error while asking for cluster topology", metadata: [
640-
"error": "\(error)"
641-
])
650+
self.logger.debug(
651+
"Received an error while asking for cluster topology",
652+
metadata: [
653+
"error": "\(error)"
654+
]
655+
)
642656
}
643657
}
644658

Sources/Valkey/Cluster/ValkeyMovedError.swift

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,46 +45,20 @@ package struct ValkeyMovedError: Hashable, Sendable {
4545
}
4646
}
4747

48-
extension RESPToken {
48+
extension ValkeyMovedError {
4949
static let movedPrefix = "MOVED "
5050

51-
/// Attempts to parse a RESP error token as a Valkey MOVED error.
51+
/// Attempts to parse a Valkey MOVED error from a String.
5252
///
53-
/// This method extracts the hash slot, endpoint, and port information from a RESP error
54-
/// token if it represents a Valkey MOVED error. MOVED errors are returned by Valkey cluster
53+
/// This method extracts the hash slot, endpoint, and port information from the string
54+
/// if it represents a Valkey MOVED error. MOVED errors are returned by Valkey cluster
5555
/// nodes when a client attempts to access a key that belongs to a different node.
5656
///
5757
/// The error format is expected to be: `"MOVED <slot> <endpoint>:<port>"`
5858
///
5959
/// - Returns: A `ValkeyMovedError` if the token represents a valid MOVED error, or `nil` otherwise.
6060
@usableFromInline
61-
func parseMovedError() -> ValkeyMovedError? {
62-
let byteBuffer: ByteBuffer? =
63-
switch self.value {
64-
case .bulkError(let byteBuffer),
65-
.simpleError(let byteBuffer):
66-
byteBuffer
67-
68-
case .simpleString,
69-
.bulkString,
70-
.verbatimString,
71-
.number,
72-
.double,
73-
.boolean,
74-
.bigNumber,
75-
.array,
76-
.attribute,
77-
.map,
78-
.set,
79-
.push,
80-
.null:
81-
nil
82-
}
83-
84-
guard var byteBuffer else {
85-
return nil
86-
}
87-
let errorMessage = byteBuffer.readString(length: byteBuffer.readableBytes)!
61+
init?(_ errorMessage: String) {
8862
guard errorMessage.hasPrefix(Self.movedPrefix) else {
8963
return nil
9064
}
@@ -112,6 +86,6 @@ extension RESPToken {
11286
return nil
11387
}
11488

115-
return ValkeyMovedError(slot: slot, endpoint: Swift.String(endpoint), port: port)
89+
self = ValkeyMovedError(slot: slot, endpoint: Swift.String(endpoint), port: port)
11690
}
11791
}

Tests/ValkeyTests/Cluster/ValkeyMovedErrorTests.swift

Lines changed: 12 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@ struct ValkeyMovedErrorTests {
2222

2323
@Test("parseMovedError parses valid MOVED error")
2424
func testParseValidMovedError() async throws {
25-
// Create a RESPToken with a MOVED error
26-
let token = RESPToken(.simpleError("MOVED 1234 redis.example.com:6379"))
27-
2825
// Parse the moved error
29-
let movedError = token.parseMovedError()
26+
let movedError = ValkeyMovedError("MOVED 1234 redis.example.com:6379")
3027

3128
// Verify the moved error is parsed correctly
3229
#expect(movedError != nil)
@@ -39,11 +36,9 @@ struct ValkeyMovedErrorTests {
3936
func testParseValidMovedErrorFromBulkError() async throws {
4037
// Create a RESPToken with a MOVED error
4138
let errorMessage = "MOVED 5000 10.0.0.1:6380"
42-
let byteBuffer = ByteBuffer(string: errorMessage)
43-
let token = RESPToken(.bulkError(byteBuffer))
4439

4540
// Parse the moved error
46-
let movedError = token.parseMovedError()
41+
let movedError = ValkeyMovedError(errorMessage)
4742

4843
// Verify the moved error is parsed correctly
4944
#expect(movedError != nil)
@@ -52,54 +47,33 @@ struct ValkeyMovedErrorTests {
5247
#expect(movedError?.port == 6380)
5348
}
5449

55-
@Test("parseMovedError returns nil for non-error tokens")
56-
func testParseNonErrorToken() async throws {
57-
// Test with various non-error token types
58-
let nullToken = RESPToken(.null)
59-
#expect(nullToken.parseMovedError() == nil)
60-
61-
let stringToken = RESPToken(.simpleString("OK"))
62-
#expect(stringToken.parseMovedError() == nil)
63-
64-
let numberToken = RESPToken(.number(42))
65-
#expect(numberToken.parseMovedError() == nil)
66-
67-
let arrayToken = RESPToken(.array([.number(1), .number(2)]))
68-
#expect(arrayToken.parseMovedError() == nil)
69-
}
70-
7150
@Test("parseMovedError returns nil for error tokens without MOVED prefix")
7251
func testParseNonMovedError() async throws {
73-
let errorMessage = "ERR unknown command"
74-
let byteBuffer = ByteBuffer(string: errorMessage)
75-
let token = RESPToken(.simpleError(byteBuffer))
76-
77-
#expect(token.parseMovedError() == nil)
52+
#expect(ValkeyMovedError("ERR unknown command") == nil)
7853
}
7954

8055
@Test("parseMovedError returns nil for invalid MOVED format")
8156
func testParseInvalidMovedFormat() async throws {
8257
// Test with various invalid MOVED formats
8358

8459
// Missing slot number
85-
let missingSlot = RESPToken(.simpleError("MOVED redis.example.com:6379"))
86-
#expect(missingSlot.parseMovedError() == nil)
60+
#expect(ValkeyMovedError("MOVED redis.example.com:6379") == nil)
8761

8862
// Missing port number
89-
let missingPort = RESPToken(.simpleError("MOVED 1234 redis.example.com"))
90-
#expect(missingPort.parseMovedError() == nil)
63+
let missingPort = "MOVED 1234 redis.example.com"
64+
#expect(ValkeyMovedError(missingPort) == nil)
9165

9266
// Invalid slot number
93-
let invalidSlot = RESPToken(.simpleError("MOVED abc redis.example.com:6379"))
94-
#expect(invalidSlot.parseMovedError() == nil)
67+
let invalidSlot = "MOVED abc redis.example.com:6379"
68+
#expect(ValkeyMovedError(invalidSlot) == nil)
9569

9670
// Invalid port number
97-
let invalidPort = RESPToken(.simpleError("MOVED 1234 redis.example.com:port"))
98-
#expect(invalidPort.parseMovedError() == nil)
71+
let invalidPort = "MOVED 1234 redis.example.com:port"
72+
#expect(ValkeyMovedError(invalidPort) == nil)
9973

10074
// Slot number out of range
101-
let outOfRangeSlot = RESPToken(.simpleError("MOVED 999999 redis.example.com:6379"))
102-
#expect(outOfRangeSlot.parseMovedError() == nil)
75+
let outOfRangeSlot = "MOVED 999999 redis.example.com:6379"
76+
#expect(ValkeyMovedError(outOfRangeSlot) == nil)
10377
}
10478

10579
@Test("ValkeyMovedError is Hashable")

0 commit comments

Comments
 (0)