Skip to content

Commit 4380a49

Browse files
authored
Update internal slot to shard map based on MOVED error (#100)
1 parent f4ef3d8 commit 4380a49

File tree

4 files changed

+257
-8
lines changed

4 files changed

+257
-8
lines changed

Notice.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,13 @@ This product contains a version of the ConnectionPoolModule from postgres-nio
7878
* https://github.com/vapor/postgres-nio/blob/main/LICENSE
7979
* HOMEPAGE:
8080
* https://github.com/vapor/postgres-nio
81+
82+
---
83+
84+
This product was influenced by valkey-glide.
85+
- It adapted valkey-glide's hashslot update logic after a MOVED error.
86+
87+
* LICENSE (Apache License 2.0)
88+
* https://github.com/valkey-io/valkey-glide/blob/main/LICENSE
89+
* HOMEPAGE:
90+
* https://github.com/valkey-io/valkey-glide

Sources/Valkey/Cluster/HashSlotShardMap.swift

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ extension ValkeyShardNodeIDs: ExpressibleByArrayLiteral {
5151

5252
/// This object allows us to efficiently look up the Valkey shard given a hash slot.
5353
///
54-
/// The `HashSlotShardMap` maintains an internal array where each element corresponds to one hash slot (0-16383).
54+
/// The ``HashSlotShardMap`` maintains an internal array where each element corresponds to one hash slot (0-16383).
5555
/// This makes looking up the shard as efficient as a simple array access operation.
5656
///
5757
/// Hash slots are assigned to shards in a Valkey cluster, and each key is mapped to a specific slot
@@ -167,6 +167,71 @@ package struct HashSlotShardMap: Sendable {
167167
}
168168
}
169169

170+
/// The outcome of folding a `MOVED` redirection into the slot map.
@usableFromInline
package enum UpdateSlotsResult: Equatable {
    /// The slot was redirected to a node the map already knew about
    /// (current primary, a known replica, or another shard's primary).
    case updatedSlotToExistingNode
    /// The slot was redirected to a node that was not part of the known
    /// topology; a fresh single-node shard was created for it.
    case updatedSlotToUnknownNode
}
175+
176+
/// Handles MOVED errors by updating the client's slot and node mappings based on the new primary's role:
///
/// 1. **No Change**: If the new primary is already the current slot owner, no updates are needed.
/// 2. **Failover**: If the new primary is a replica within the same shard (indicating a failover),
///    the slot ownership is updated by promoting the replica to the primary in the existing shard addresses.
/// 3. **Slot Migration**: If the new primary is an existing primary in another shard, this indicates a slot migration,
///    and the slot mapping is updated to point to the new shard addresses.
/// 4. **Replica Moved to a Different Shard**: If the new primary is a replica in a different shard, it can be due to:
///    - The replica became the primary of its shard after a failover, with new slots migrated to it.
///    - The replica has moved to a different shard as the primary.
///    Since further information is unknown, the replica is removed from its original shard and added as the primary of a new shard.
/// 5. **New Node**: If the new primary is unknown, it is added as a new node in a new shard, possibly indicating scale-out.
///
/// This logic was first implemented in `valkey-glide` (see `Notice.txt`) and adopted for Swift here.
///
/// - Parameter movedError: The parsed `MOVED` error carrying the slot and the node that now owns it.
/// - Returns: ``UpdateSlotsResult/updatedSlotToExistingNode`` when the reported owner was already part of
///   the known topology, ``UpdateSlotsResult/updatedSlotToUnknownNode`` when a brand-new node was added.
@usableFromInline
package mutating func updateSlots(with movedError: ValkeyMovedError) -> UpdateSlotsResult {
    let slotIndex = Int(movedError.slot.rawValue)

    if let shardIndex = self.slotToShardID[slotIndex].value {
        // if the slot had a shard assignment before
        var shard = self.shardIDToShard[shardIndex]

        // 1. No change
        if shard.master == movedError.nodeID {
            return .updatedSlotToExistingNode
        }

        // 2. Failover
        if shard.replicas.contains(movedError.nodeID) {
            // lets promote the replica to be the primary and remove the old primary for now
            shard.master = movedError.nodeID
            shard.replicas.removeAll { $0 == movedError.nodeID }
            self.shardIDToShard[shardIndex] = shard
            return .updatedSlotToExistingNode
        }
    }

    // 3. Slot migration to an existing primary
    if let newShardIndex = self.shardIDToShard.firstIndex(where: { $0.master == movedError.nodeID }) {
        self.slotToShardID[slotIndex] = .init(newShardIndex)
        return .updatedSlotToExistingNode
    }

    // 4. Replica moved to a different shard
    if let ogShardIndexOfNewPrimary = self.shardIDToShard.firstIndex(where: { $0.replicas.contains(movedError.nodeID) }) {
        // remove replica from its og shard, then make it the primary of a fresh shard
        self.shardIDToShard[ogShardIndexOfNewPrimary].replicas.removeAll(where: { $0 == movedError.nodeID })
        self.appendSingleNodeShard(primary: movedError.nodeID, owning: slotIndex)
        return .updatedSlotToExistingNode
    }

    // 5. totally new node
    self.appendSingleNodeShard(primary: movedError.nodeID, owning: slotIndex)
    return .updatedSlotToUnknownNode
}

/// Appends a new shard containing only `primary` and points `slotIndex` at it.
/// Shared by the "replica moved to a different shard" and "totally new node" cases above.
private mutating func appendSingleNodeShard(primary: ValkeyNodeID, owning slotIndex: Int) {
    let newShardIndex = self.shardIDToShard.endIndex
    self.shardIDToShard.append(.init(master: primary))
    self.slotToShardID[slotIndex] = .init(newShardIndex)
}
234+
170235
/// An internal type representing an optional shard ID with efficient storage.
171236
///
172237
/// This type uses a special sentinel value to represent a missing shard ID

Sources/Valkey/Cluster/ValkeyMovedError.swift

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,24 @@ import NIOCore
2323
/// This error provides the necessary information for clients to redirect their
/// request to the correct node in the cluster.
@usableFromInline
package struct ValkeyMovedError: Hashable, Sendable {
    /// The hash slot number that triggered the redirection.
    package var slot: HashSlot

    /// The hostname or IP address of the node that owns the requested hash slot.
    package var endpoint: String

    /// The port number of the node that owns the requested hash slot.
    package var port: Int

    /// Creates a redirection error from its parsed components.
    ///
    /// - Parameters:
    ///   - slot: The hash slot named in the `MOVED` response.
    ///   - endpoint: Hostname or IP of the node now owning the slot.
    ///   - port: Port of the node now owning the slot.
    package init(slot: HashSlot, endpoint: String, port: Int) {
        self.slot = slot
        self.endpoint = endpoint
        self.port = port
    }

    /// The identity of the node that owns the requested hash slot,
    /// combining ``endpoint`` and ``port``.
    @usableFromInline
    package var nodeID: ValkeyNodeID {
        ValkeyNodeID(endpoint: self.endpoint, port: self.port)
    }
}

Tests/ValkeyTests/Cluster/HashSlotShardMapTests.swift

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,4 +667,172 @@ struct HashSlotShardMapTests {
667667
#expect(shardNodes.replicas.contains(expectedReplica1))
668668
#expect(shardNodes.replicas.contains(expectedReplica2))
669669
}
670+
671+
/// Builds a synthetic cluster description with `shards` shards, each owning one
/// contiguous slot range and containing one primary plus `replicas` replicas.
/// The primary inside each shard is chosen at random.
func makeExampleCusterWithNShardsAndMReplicasPerShard(shards: Int, replicas: Int) -> ValkeyClusterDescription {
    let slotsPerShard = Int(HashSlot.max.rawValue + 1) / shards
    var slotRange: ClosedRange<HashSlot> = 0...0

    var builtShards = [ValkeyClusterDescription.Shard]()
    var nextNodeNumber = 1

    for shardIndex in 0..<shards {
        // Carve out the next contiguous slot range; the last shard absorbs any remainder.
        switch shardIndex {
        case 0 where shards == 1:
            slotRange = HashSlot.min...HashSlot.max
        case 0:
            slotRange = HashSlot.min...(slotRange.upperBound.advanced(by: slotsPerShard - 1))
        case shards - 1:
            slotRange = (slotRange.upperBound.advanced(by: 1))...HashSlot.max
        default:
            slotRange = (slotRange.upperBound.advanced(by: 1))...(slotRange.upperBound.advanced(by: slotsPerShard - 1))
        }

        var shard = ValkeyClusterDescription.Shard(slots: [slotRange], nodes: [])
        for _ in 0..<(replicas + 1) {
            shard.nodes.append(
                .init(
                    id: "node-\(nextNodeNumber)",
                    port: nil,
                    tlsPort: 6379,
                    ip: "192.168.64.\(nextNodeNumber)",
                    hostname: "node-\(nextNodeNumber).valkey.io",
                    endpoint: "node-\(nextNodeNumber).valkey.io",
                    role: .replica,
                    replicationOffset: 14,
                    health: .online
                )
            )
            nextNodeNumber += 1
        }
        // Promote one random node so every shard has exactly one primary.
        let primaryIndex = shard.nodes.indices.randomElement()!
        shard.nodes[primaryIndex].role = .master

        builtShards.append(shard)
    }

    return ValkeyClusterDescription(builtShards)
}
716+
717+
/// Case 1 of `updateSlots(with:)`: a MOVED error naming the node that already
/// owns the slot must be a no-op and report an existing node.
/// (Display-name typo "exisiting" fixed; the function name is kept for history.)
@Test("Case 1: MovedError specifies the already existing shard primary node")
func movedErrorSpecifiesTheAlreadyExisitingShardPrimaryNode() throws {
    let clusterDescription = self.makeExampleCusterWithNShardsAndMReplicasPerShard(shards: 3, replicas: 1)

    var map = HashSlotShardMap()
    map.updateCluster(clusterDescription.shards)

    let ogShard = try map.nodeID(for: CollectionOfOne(2))
    let update = map.updateSlots(with: ValkeyMovedError(slot: 2, endpoint: ogShard.master.endpoint, port: ogShard.master.port))
    #expect(update == .updatedSlotToExistingNode)
    // The slot's shard assignment must be completely unchanged.
    let updatedShard = try map.nodeID(for: CollectionOfOne(2))
    #expect(updatedShard == ogShard)
}
730+
731+
/// Case 2 of `updateSlots(with:)`: a MOVED error naming a replica of the slot's
/// own shard is treated as a failover — the replica is promoted to primary,
/// which is visible for every slot the shard owns.
@Test("Case 2: MovedError specifies a previous shard replica node")
func movedErrorSpecifiesAPreviousShardReplicaNode() throws {
    let clusterDescription = self.makeExampleCusterWithNShardsAndMReplicasPerShard(shards: 3, replicas: 3)

    var map = HashSlotShardMap()
    map.updateCluster(clusterDescription.shards)

    let originalShard = try map.nodeID(for: CollectionOfOne(2))
    let promotedReplica = originalShard.replicas.randomElement()!

    let result = map.updateSlots(with: ValkeyMovedError(slot: 2, endpoint: promotedReplica.endpoint, port: promotedReplica.port))
    #expect(result == .updatedSlotToExistingNode)

    let shardAfterUpdate = try map.nodeID(for: CollectionOfOne(2))
    #expect(shardAfterUpdate.master == promotedReplica)
    #expect(shardAfterUpdate != originalShard)

    // Neighboring slots belong to the same shard, so they must see the promotion too.
    let neighborBelow = try map.nodeID(for: CollectionOfOne(1))
    let neighborAbove = try map.nodeID(for: CollectionOfOne(3))
    #expect(shardAfterUpdate == neighborBelow)
    #expect(shardAfterUpdate == neighborAbove)
}
754+
755+
/// Case 3 of `updateSlots(with:)`: a MOVED error naming another shard's primary
/// is a single-slot migration — only the named slot is repointed, neighbors stay.
@Test("Case 3: MovedError specifies another shards primary node")
func movedErrorSpecifiesOtherShardPrimaryNode() throws {
    let clusterDescription = self.makeExampleCusterWithNShardsAndMReplicasPerShard(shards: 3, replicas: 3)

    var map = HashSlotShardMap()
    map.updateCluster(clusterDescription.shards)

    let originalShard = try map.nodeID(for: CollectionOfOne(2))
    let targetShard = try map.nodeID(for: CollectionOfOne(.max))
    let targetPrimary = targetShard.master

    let result = map.updateSlots(with: ValkeyMovedError(slot: 2, endpoint: targetPrimary.endpoint, port: targetPrimary.port))
    #expect(result == .updatedSlotToExistingNode)

    // Slot 2 now maps onto the other shard wholesale.
    let shardAfterUpdate = try map.nodeID(for: CollectionOfOne(2))
    #expect(shardAfterUpdate == targetShard)

    // Only the named slot migrates — its neighbors keep their original shard.
    let neighborBelow = try map.nodeID(for: CollectionOfOne(1))
    let neighborAbove = try map.nodeID(for: CollectionOfOne(3))
    #expect(originalShard == neighborBelow)
    #expect(originalShard == neighborAbove)
}
778+
779+
/// Case 4 of `updateSlots(with:)`: a MOVED error naming a replica of a *different*
/// shard detaches that replica from its shard and makes it the lone primary of a
/// brand-new shard owning the named slot.
@Test("Case 4: MovedError specifies another shards replica node")
func movedErrorSpecifiesOtherShardReplicaNode() throws {
    let clusterDescription = self.makeExampleCusterWithNShardsAndMReplicasPerShard(shards: 3, replicas: 3)

    var map = HashSlotShardMap()
    map.updateCluster(clusterDescription.shards)

    let originalShard = try map.nodeID(for: CollectionOfOne(2))
    let donorShard = try map.nodeID(for: CollectionOfOne(.max))
    let detachedReplica = donorShard.replicas.randomElement()!

    let result = map.updateSlots(with: ValkeyMovedError(slot: 2, endpoint: detachedReplica.endpoint, port: detachedReplica.port))
    #expect(result == .updatedSlotToExistingNode)

    // The named slot now lives in a fresh shard led by the detached replica.
    let shardAfterUpdate = try map.nodeID(for: CollectionOfOne(2))
    #expect(shardAfterUpdate.master == detachedReplica)
    #expect(shardAfterUpdate.replicas.isEmpty)
    #expect(shardAfterUpdate != originalShard)

    // Neighboring slots are untouched.
    let neighborBelow = try map.nodeID(for: CollectionOfOne(1))
    let neighborAbove = try map.nodeID(for: CollectionOfOne(3))
    #expect(originalShard == neighborBelow)
    #expect(originalShard == neighborAbove)

    // The donor shard must have lost the detached replica.
    let donorShardAfterUpdate = try map.nodeID(for: CollectionOfOne(.max))
    #expect(!donorShardAfterUpdate.replicas.contains(detachedReplica))
}
808+
809+
/// Case 5 of `updateSlots(with:)`: a MOVED error naming a node the map has never
/// seen creates a fresh single-node shard for the slot and reports an unknown node.
@Test("Case 5: MovedError specifies previously unknown node")
func movedErrorSpecifiesPreviouslyUnknownNode() throws {
    let clusterDescription = self.makeExampleCusterWithNShardsAndMReplicasPerShard(shards: 3, replicas: 3)

    var map = HashSlotShardMap()
    map.updateCluster(clusterDescription.shards)

    let ogShard = try map.nodeID(for: CollectionOfOne(2))
    let otherShard = try map.nodeID(for: CollectionOfOne(.max))
    let newPrimary = ValkeyNodeID(endpoint: "new.valkey.io", port: 6379)

    let update = map.updateSlots(with: ValkeyMovedError(slot: 2, endpoint: newPrimary.endpoint, port: newPrimary.port))
    #expect(update == .updatedSlotToUnknownNode)
    let updatedShard = try map.nodeID(for: CollectionOfOne(2))
    #expect(updatedShard.master == newPrimary)
    #expect(updatedShard.replicas.isEmpty)
    #expect(updatedShard != ogShard)

    // test neighboring hashes have not been updated
    let updatedShard1 = try map.nodeID(for: CollectionOfOne(1))
    let updatedShard3 = try map.nodeID(for: CollectionOfOne(3))

    #expect(ogShard == updatedShard1)
    #expect(ogShard == updatedShard3)

    // test the unrelated shard is completely untouched: the new node was never
    // part of it, so its membership must be identical to before the update
    let otherShardUpdated = try map.nodeID(for: CollectionOfOne(.max))
    #expect(otherShardUpdated == otherShard)
}
670838
}

0 commit comments

Comments
 (0)