Skip to content

Commit 0a4f34d

Browse files
authored
Cluster command keys should all come from the one hashkey (#209)
Signed-off-by: Adam Fowler <[email protected]>
1 parent 64547be commit 0a4f34d

File tree

3 files changed

+21
-6
lines changed

3 files changed

+21
-6
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ public final class ValkeyClusterClient: Sendable {
143143
/// - Other errors if the command execution or parsing fails
144144
@inlinable
145145
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
146-
let hashSlots = command.keysAffected.map { HashSlot(key: $0) }
146+
let hashSlot = try self.hashSlot(for: command.keysAffected)
147147
var clientSelector: () async throws -> ValkeyNodeClient = {
148-
try await self.nodeClient(for: hashSlots)
148+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
149149
}
150150

151151
while !Task.isCancelled {
@@ -178,8 +178,8 @@ public final class ValkeyClusterClient: Sendable {
178178
isolation: isolated (any Actor)? = #isolation,
179179
operation: (ValkeyConnection) async throws -> sending Value
180180
) async throws -> Value {
181-
let hashSlots = keys.map { HashSlot(key: $0) }
182-
let node = try await self.nodeClient(for: hashSlots)
181+
let hashSlot = try self.hashSlot(for: keys)
182+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
183183
return try await node.withConnection(isolation: isolation, operation: operation)
184184
}
185185

@@ -234,6 +234,20 @@ public final class ValkeyClusterClient: Sendable {
234234

235235
// MARK: - Private methods -
236236

237+
/// Return HashSlot for collection of keys.
238+
///
239+
/// If collection is empty return `nil`
240+
/// If collection of keys use a variety of hash slot then throw an error
241+
@usableFromInline
242+
/* private */ func hashSlot(for keys: some Collection<ValkeyKey>) throws -> HashSlot? {
243+
guard let firstKey = keys.first else { return nil }
244+
let hashSlot = HashSlot(key: firstKey)
245+
for key in keys.dropFirst() {
246+
guard hashSlot == HashSlot(key: key) else { throw ValkeyClusterError.keysInCommandRequireMultipleHashSlots }
247+
}
248+
return hashSlot
249+
}
250+
237251
private func queueAction(_ action: RunAction) {
238252
self.actionStreamContinuation.yield(action)
239253
}
@@ -418,7 +432,7 @@ public final class ValkeyClusterClient: Sendable {
418432
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
419433
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
420434
@inlinable
421-
func nodeClient(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyNodeClient {
435+
func nodeClient(for slots: [HashSlot]) async throws -> ValkeyNodeClient {
422436
var retries = 0
423437
while retries < 3 {
424438
defer { retries += 1 }

Sources/Valkey/Cluster/ValkeyClusterError.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package enum ValkeyClusterError: Error {
1414
case noNodeToTalkTo
1515
case serverDiscoveryFailedNoKnownNode
1616
case keysInCommandRequireMultipleNodes
17+
case keysInCommandRequireMultipleHashSlots
1718
case clusterIsUnavailable
1819
case noConsensusReachedCircuitBreakerOpen
1920
case clusterHasNoNodes

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ struct ClusterIntegrationTests {
6666
let shard = try #require(
6767
cluster.shards.first { shard in
6868
let hashSlot = HashSlot(key: key)
69-
return shard.slots[0].lowerBound <= hashSlot && shard.slots[0].upperBound >= hashSlot
69+
return shard.slots.reduce(into: false) { $0 = $0 || ($1.lowerBound <= hashSlot && $1.upperBound >= hashSlot) }
7070
}
7171
)
7272
let replica = try #require(shard.nodes.first { $0.role == .replica })

0 commit comments

Comments
 (0)