Skip to content

Commit ebae84b

Browse files
authored
Fix: Nodes in CLUSTER SHARDS are not sorted (#97)
1 parent 5fd3f90 commit ebae84b

File tree

8 files changed

+571
-35
lines changed

8 files changed

+571
-35
lines changed

Sources/Valkey/Cluster/HashSlotShardMap.swift

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,27 @@ package struct HashSlotShardMap: Sendable {
133133

134134
var shardID = 0
135135
for shard in shards {
136-
guard let master = shard.master else {
136+
var master: ValkeyNodeID?
137+
var replicas = [ValkeyNodeID]()
138+
replicas.reserveCapacity(shard.nodes.count - 1)
139+
140+
for node in shard.nodes {
141+
switch node.role.base {
142+
case .master:
143+
master = node.nodeID
144+
145+
case .replica:
146+
replicas.append(node.nodeID)
147+
}
148+
}
149+
150+
guard let master else {
137151
continue
138152
}
139153

140154
let nodeIDs = ValkeyShardNodeIDs(
141-
master: master.nodeID,
142-
replicas: shard.replicas.map(\.nodeID)
155+
master: master,
156+
replicas: replicas
143157
)
144158

145159
defer { shardID += 1 }

Sources/Valkey/Cluster/ValkeyClusterError.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
@usableFromInline
1616
package enum ValkeyClusterError: Error {
1717
case clusterIsMissingSlotAssignment
18+
case clusterIsMissingMovedErrorNode
19+
case shardIsMissingMasterNode
20+
case shardHasMultipleMasterNodes
1821
case noNodeToTalkTo
1922
case serverDiscoveryFailedNoKnownNode
2023
case keysInCommandRequireMultipleNodes
21-
case noConsensusReached
24+
case clusterIsUnavailable
2225
case noConsensusReachedCircuitBreakerOpen
2326
case clusterHasNoNodes
27+
case clusterClientIsShutDown
2428
}
2529

Sources/Valkey/Cluster/ValkeyMovedError.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ struct ValkeyMovedError: Hashable, Sendable {
3232

3333
/// The port number of the node that owns the requested hash slot.
3434
var port: Int
35+
36+
@usableFromInline
37+
var nodeID: ValkeyNodeID {
38+
ValkeyNodeID(endpoint: self.endpoint, port: self.port)
39+
}
3540
}
3641

3742
extension RESPToken {

Sources/Valkey/Cluster/ValkeyTopologyCandidate.swift

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,46 +18,46 @@
1818
/// designed specifically for efficient comparison during cluster updates. It preserves
1919
/// only the essential properties needed to determine if a topology has changed, while
2020
/// maintaining consistent ordering of elements to ensure reliable equality checks.
21-
struct ValkeyTopologyCandidate: Hashable {
21+
package struct ValkeyTopologyCandidate: Hashable {
2222
/// Represents a shard (hash slot range) within a Valkey cluster topology.
2323
///
2424
/// A shard consists of a set of hash slots assigned to a master node and optional replica nodes.
25-
struct Shard: Hashable {
25+
package struct Shard: Hashable {
2626
/// The hash slots assigned to this shard.
27-
var slots: HashSlots
27+
package var slots: HashSlots
2828

2929
/// The master node responsible for this shard.
30-
var master: Node
31-
30+
package var master: Node
31+
3232
/// The replica nodes for this shard, sorted by endpoint, port, and TLS status for consistent equality checking.
33-
var replicas: [Node]
33+
package var replicas: [Node]
3434
}
3535

3636
/// Represents a node (either master or replica) in the Valkey cluster topology.
3737
///
3838
/// Contains only the essential connection properties needed to identify and connect to a node.
39-
struct Node: Hashable {
39+
package struct Node: Hashable {
4040
/// The endpoint (hostname or IP address) of the node.
41-
var endpoint: String
42-
41+
package var endpoint: String
42+
4343
/// The port to connect to (either standard port or TLS port).
44-
var port: Int
45-
44+
package var port: Int
45+
4646
/// Whether TLS should be used for connecting to this node.
47-
var useTLS: Bool
47+
package var useTLS: Bool
4848

4949
/// Creates a simplified node representation from a `ValkeyClusterDescription.Node`.
5050
///
5151
/// - Parameter node: The source node from a cluster description.
52-
init(_ node: ValkeyClusterDescription.Node) {
52+
package init(_ node: ValkeyClusterDescription.Node) {
5353
self.endpoint = node.endpoint
5454
self.port = node.tlsPort ?? node.port ?? 6379
5555
self.useTLS = node.tlsPort != nil
5656
}
5757
}
5858

5959
/// Shards in the cluster topology, sorted by starting hash slot for consistent equality checking.
60-
var shards: [Shard]
60+
package var shards: [Shard]
6161

6262
/// Creates a topology candidate from a cluster description.
6363
///
@@ -67,27 +67,54 @@ struct ValkeyTopologyCandidate: Hashable {
6767
/// - Sorts shards by their starting hash slot for consistent equality checking
6868
///
6969
/// - Parameter description: The cluster description to create a topology candidate from.
70-
init(_ description: ValkeyClusterDescription) {
70+
package init(_ description: ValkeyClusterDescription) throws(ValkeyClusterError) {
7171

72-
self.shards = description.shards.map({ shard in
73-
Shard(
74-
slots: shard.slots,
75-
master: Node(shard.master!),
76-
replicas: shard.replicas.map { Node($0) }.sorted(by: { lhs, rhs in
77-
if lhs.endpoint != rhs.endpoint {
78-
return lhs.endpoint < rhs.endpoint
79-
}
80-
if lhs.port != rhs.port {
81-
return lhs.port < rhs.port
82-
}
83-
if lhs.useTLS != rhs.useTLS {
84-
return !lhs.useTLS
72+
self.shards = try description.shards.map({ shard throws(ValkeyClusterError) in
73+
var master: Node?
74+
var replicas = [Node]()
75+
replicas.reserveCapacity(shard.nodes.count)
76+
77+
for node in shard.nodes {
78+
switch node.role.base {
79+
case .master:
80+
if master != nil {
81+
throw ValkeyClusterError.shardHasMultipleMasterNodes
8582
}
86-
return true
87-
})
83+
master = Node(node)
84+
case .replica:
85+
replicas.append(Node(node))
86+
}
87+
}
88+
89+
let sorted = replicas.sorted(by: { lhs, rhs in
90+
if lhs.endpoint != rhs.endpoint {
91+
return lhs.endpoint < rhs.endpoint
92+
}
93+
if lhs.port != rhs.port {
94+
return lhs.port < rhs.port
95+
}
96+
if lhs.useTLS != rhs.useTLS {
97+
return !lhs.useTLS
98+
}
99+
return true
100+
})
101+
102+
guard let master else {
103+
throw ValkeyClusterError.shardIsMissingMasterNode
104+
}
105+
106+
return Shard(
107+
slots: shard.slots.sorted(by: { $0.startIndex < $1.startIndex }),
108+
master: master,
109+
replicas: sorted
88110
)
89111
})
90112
// Sort shards by starting hash slot
91113
self.shards = self.shards.sorted(by: { (lhs, rhs) in (lhs.slots.first?.startIndex ?? .pastEnd) < (rhs.slots.first?.startIndex ?? .pastEnd) })
92114
}
93115
}
116+
117+
struct ValkeyClusterVoter<ConnectionPool: ValkeyNodeConnectionPool> {
118+
var client: ConnectionPool
119+
var nodeID: ValkeyNodeID
120+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-valkey project
4+
//
5+
// Copyright (c) 2025 the swift-valkey authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See swift-valkey/CONTRIBUTORS.txt for the list of swift-valkey authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// ``ValkeyTopologyElection`` manages the consensus process for electing a cluster topology.
16+
///
17+
/// This struct tracks votes from cluster nodes for different topology candidates, keeping count of
18+
/// received votes and determining when a consensus is reached. Once a candidate receives more than half
19+
/// of the possible votes from all nodes in the cluster, it becomes the elected topology configuration.
20+
///
21+
/// The election process handles:
22+
/// - Recording votes from nodes
23+
/// - Tracking vote counts for each topology candidate
24+
/// - Managing revotes (nodes changing their vote)
25+
/// - Determining when a winner has been elected
26+
package struct ValkeyTopologyElection {
27+
/// Represents a candidate in the topology election, tracking votes and thresholds.
28+
///
29+
/// Each candidate corresponds to a specific cluster description and maintains
30+
/// count of the votes it has received and how many votes it needs to win.
31+
private struct Candidate {
32+
/// The cluster configuration this candidate represents.
33+
var description: ValkeyClusterDescription
34+
35+
/// The number of votes needed for this candidate to win the election.
36+
/// Calculated as a simple majority of the total nodes in the cluster.
37+
var needed: Int
38+
39+
/// The number of votes this candidate has received so far.
40+
var received: Int
41+
42+
init(description: ValkeyClusterDescription) {
43+
self.description = description
44+
// Calculate the needed votes as a simple majority of all nodes across all shards
45+
self.needed = description.shards.reduce(0) { $0 + $1.nodes.count } / 2 + 1
46+
self.received = 0
47+
}
48+
49+
/// Adds a vote for this candidate and checks if it has reached the winning threshold.
50+
///
51+
/// - Returns: `true` if this candidate has received enough votes to win, `false` otherwise
52+
mutating func addVote() -> Bool {
53+
self.received += 1
54+
return self.received >= self.needed
55+
}
56+
}
57+
58+
/// Provides metrics about the current state of the election process.
59+
///
60+
/// This structure encapsulates information about a specific topology candidate,
61+
/// including how many votes it has received and how many it needs to win.
62+
package struct VoteMetrics {
63+
/// The total number of topology configurations being considered in this election.
64+
package var candidateCount: Int
65+
66+
/// The specific topology candidate these metrics refer to.
67+
package var candidate: ValkeyTopologyCandidate
68+
69+
/// The number of votes this candidate has received so far.
70+
package var votesReceived: Int
71+
72+
/// The number of votes needed for this candidate to win the election.
73+
/// This is calculated as (total nodes / 2) + 1, representing a simple majority.
74+
package var votesNeeded: Int
75+
}
76+
77+
private var votes = [ValkeyNodeID: ValkeyTopologyCandidate]()
78+
private var results = [ValkeyTopologyCandidate: Candidate]()
79+
80+
/// The currently elected cluster configuration, if any.
81+
/// This is set to the first candidate that reaches the required vote threshold.
82+
package private(set) var winner: ValkeyClusterDescription?
83+
84+
package init() {}
85+
86+
/// Records a vote from a node for a specific cluster description.
87+
///
88+
/// This method handles the core voting logic:
89+
/// 1. If the node has voted before, its previous vote is removed
90+
/// 2. The new vote is recorded
91+
/// 3. If this vote causes a candidate to reach the required threshold, it becomes the winner
92+
///
93+
/// - Parameters:
94+
/// - description: The cluster configuration the node is voting for
95+
/// - voter: The ID of the node casting the vote
96+
///
97+
/// - Returns: Metrics about the current state of the election after recording this vote
98+
///
99+
/// - Throws: ``ValkeyClusterError`` if the provided cluster description cannot be converted to a valid topology candidate
100+
package mutating func voteReceived(
101+
for description: ValkeyClusterDescription,
102+
from voter: ValkeyNodeID
103+
) throws(ValkeyClusterError) -> VoteMetrics {
104+
// 1. check that the voter hasn't voted before.
105+
// - if it has voted before, remove its earlier vote.
106+
107+
let topologyCandidate = try ValkeyTopologyCandidate(description)
108+
109+
if let previousVote = self.votes[voter] {
110+
self.results[previousVote]!.received -= 1
111+
}
112+
113+
self.votes[voter] = topologyCandidate
114+
if self.results[topologyCandidate, default: .init(description: description)].addVote() {
115+
if self.winner == nil {
116+
self.winner = description
117+
}
118+
}
119+
120+
return VoteMetrics(
121+
candidateCount: self.results.count,
122+
candidate: topologyCandidate,
123+
votesReceived: self.results[topologyCandidate]!.received,
124+
votesNeeded: self.results[topologyCandidate]!.needed
125+
)
126+
}
127+
}

Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public struct ValkeyClusterDescription: Hashable, Sendable, RESPTokenDecodable {
6565
case replica
6666
}
6767

68-
private var base: Base
68+
private(set) var base: Base
6969

7070
init(base: Base) {
7171
self.base = base

0 commit comments

Comments
 (0)