Skip to content

Commit b75dccf

Browse files
authored
Merge branch 'main' into client-subscribe
2 parents f61c56b + 8911589 commit b75dccf

24 files changed

+338
-89
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -157,21 +157,38 @@ public final class ValkeyClusterClient: Sendable {
157157
try await self.client(for: hashSlots)
158158
}
159159

160-
while true {
160+
while !Task.isCancelled {
161161
do {
162-
let respToken = try await self.retryingSend(
163-
clientSelector: clientSelector,
164-
command: command,
165-
logger: logger
166-
)
167-
return try Command.Response(fromRESP: respToken)
162+
let client = try await clientSelector()
163+
return try await client.send(command: command)
164+
} catch ValkeyClusterError.noNodeToTalkTo {
165+
// TODO: Rerun node discovery!
168166
} catch let error as ValkeyClientError where error.errorCode == .commandError {
169167
guard let errorMessage = error.message, let movedError = ValkeyMovedError(errorMessage) else {
170168
throw error
171169
}
172170
clientSelector = { try await self.client(for: movedError) }
173171
}
174172
}
173+
throw CancellationError()
174+
}
175+
176+
/// Get connection from cluster and run operation using connection
177+
///
178+
/// - Parameters:
179+
/// - keys: Keys affected by operation. This is used to choose the cluster node
180+
/// - isolation: Actor isolation
181+
/// - operation: Closure handling Valkey connection
182+
/// - Returns: Value returned by closure
183+
@inlinable
184+
public func withConnection<Value>(
185+
forKeys keys: some Collection<ValkeyKey>,
186+
isolation: isolated (any Actor)? = #isolation,
187+
operation: (ValkeyConnection) async throws -> sending Value
188+
) async throws -> Value {
189+
let hashSlots = keys.map { HashSlot(key: $0) }
190+
let client = try await self.client(for: hashSlots)
191+
return try await client.withConnection(isolation: isolation, operation: operation)
175192
}
176193

177194
/// Starts running the cluster client.
@@ -455,39 +472,6 @@ public final class ValkeyClusterClient: Sendable {
455472
throw ValkeyClusterError.clusterIsMissingSlotAssignment
456473
}
457474

458-
/// Sends a command to the Valkey cluster with automatic retries and error handling.
459-
///
460-
/// This internal method handles:
461-
/// - Client selection strategy using the provided selector function
462-
/// - Automatic retries for certain types of failures
463-
/// - Task cancellation
464-
///
465-
/// - Parameters:
466-
/// - clientSelector: A function that resolves to the appropriate client for this command.
467-
/// - command: The command to send to the cluster.
468-
/// - logger: The logger to use for diagnostic information.
469-
/// - Returns: The raw RESP token response.
470-
/// - Throws:
471-
/// - `CancellationError` if the task is cancelled
472-
/// - Other errors if command execution fails after retries
473-
@inlinable
474-
func retryingSend(
475-
clientSelector: () async throws -> ValkeyClient,
476-
command: some ValkeyCommand,
477-
logger: Logger
478-
) async throws -> RESPToken {
479-
while !Task.isCancelled {
480-
do {
481-
let client = try await clientSelector()
482-
return try await client._send(command)
483-
} catch ValkeyClusterError.noNodeToTalkTo {
484-
// TODO: Rerun node discovery!
485-
}
486-
}
487-
488-
throw CancellationError()
489-
}
490-
491475
/// Generates a new unique request ID for tracking internal operations.
492476
///
493477
/// - Returns: A unique integer ID for tracking requests and waiters.

Sources/Valkey/Commands/BitmapCommands.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ extension ValkeyConnectionProtocol {
492492
/// * [Array]: The result of the subcommand at the same position
493493
/// * [Array]: In case OVERFLOW FAIL was given and overflows or underflows detected
494494
@inlinable
495+
@discardableResult
495496
public func bitfield(_ key: ValkeyKey, operations: [BITFIELD.Operation] = []) async throws -> RESPToken.Array {
496497
try await send(command: BITFIELD(key, operations: operations))
497498
}
@@ -514,6 +515,7 @@ extension ValkeyConnectionProtocol {
514515
/// - Complexity: O(N)
515516
/// - Response: [Integer]: The size of the string stored in the destination key, that is equal to the size of the longest input string.
516517
@inlinable
518+
@discardableResult
517519
public func bitop(operation: BITOP.Operation, destkey: ValkeyKey, keys: [ValkeyKey]) async throws -> Int {
518520
try await send(command: BITOP(operation: operation, destkey: destkey, keys: keys))
519521
}
@@ -551,6 +553,7 @@ extension ValkeyConnectionProtocol {
551553
/// - Complexity: O(1)
552554
/// - Response: The original bit value stored at offset.
553555
@inlinable
556+
@discardableResult
554557
public func setbit(_ key: ValkeyKey, offset: Int, value: Int) async throws -> Int {
555558
try await send(command: SETBIT(key, offset: offset, value: value))
556559
}

Sources/Valkey/Commands/ClusterCommands.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ extension ValkeyConnectionProtocol {
687687
/// * [String]: If the epoch was incremented.
688688
/// * [String]: If the node already has the greatest config epoch in the cluster.
689689
@inlinable
690+
@discardableResult
690691
public func clusterBumpepoch() async throws -> ByteBuffer {
691692
try await send(command: CLUSTER.BUMPEPOCH())
692693
}
@@ -698,6 +699,7 @@ extension ValkeyConnectionProtocol {
698699
/// - Complexity: O(N) where N is the number of failure reports
699700
/// - Response: [Integer]: The number of active failure reports for the node.
700701
@inlinable
702+
@discardableResult
701703
public func clusterCountFailureReports<NodeId: RESPStringRenderable>(nodeId: NodeId) async throws -> Int {
702704
try await send(command: CLUSTER.COUNTFAILUREREPORTS(nodeId: nodeId))
703705
}
@@ -709,6 +711,7 @@ extension ValkeyConnectionProtocol {
709711
/// - Complexity: O(1)
710712
/// - Response: [Integer]: The number of keys in the specified hash slot.
711713
@inlinable
714+
@discardableResult
712715
public func clusterCountkeysinslot(slot: Int) async throws -> Int {
713716
try await send(command: CLUSTER.COUNTKEYSINSLOT(slot: slot))
714717
}
@@ -770,6 +773,7 @@ extension ValkeyConnectionProtocol {
770773
/// - Complexity: O(N) where N is the number of requested keys
771774
/// - Response: [Array]: An array with up to count elements.
772775
@inlinable
776+
@discardableResult
773777
public func clusterGetkeysinslot(slot: Int, count: Int) async throws -> RESPToken.Array {
774778
try await send(command: CLUSTER.GETKEYSINSLOT(slot: slot, count: count))
775779
}
@@ -781,6 +785,7 @@ extension ValkeyConnectionProtocol {
781785
/// - Complexity: O(1)
782786
/// - Response: [Array]: Helpful text about subcommands.
783787
@inlinable
788+
@discardableResult
784789
public func clusterHelp() async throws -> RESPToken.Array {
785790
try await send(command: CLUSTER.HELP())
786791
}
@@ -792,6 +797,7 @@ extension ValkeyConnectionProtocol {
792797
/// - Complexity: O(1)
793798
/// - Response: [String]: A map between named fields and values in the form of <field>:<value> lines separated by newlines composed by the two bytes CRLF
794799
@inlinable
800+
@discardableResult
795801
public func clusterInfo() async throws -> ByteBuffer {
796802
try await send(command: CLUSTER.INFO())
797803
}
@@ -803,6 +809,7 @@ extension ValkeyConnectionProtocol {
803809
/// - Complexity: O(N) where N is the number of bytes in the key
804810
/// - Response: [Integer]: The hash slot number for the specified key
805811
@inlinable
812+
@discardableResult
806813
public func clusterKeyslot<Key: RESPStringRenderable>(_ key: Key) async throws -> Int {
807814
try await send(command: CLUSTER.KEYSLOT(key))
808815
}
@@ -814,6 +821,7 @@ extension ValkeyConnectionProtocol {
814821
/// - Complexity: O(N) where N is the total number of Cluster nodes
815822
/// - Response: [Array]: An array of cluster links and their attributes.
816823
@inlinable
824+
@discardableResult
817825
public func clusterLinks() async throws -> RESPToken.Array {
818826
try await send(command: CLUSTER.LINKS())
819827
}
@@ -837,6 +845,7 @@ extension ValkeyConnectionProtocol {
837845
/// - Complexity: O(1)
838846
/// - Response: [String]: The node id.
839847
@inlinable
848+
@discardableResult
840849
public func clusterMyid() async throws -> ByteBuffer {
841850
try await send(command: CLUSTER.MYID())
842851
}
@@ -848,6 +857,7 @@ extension ValkeyConnectionProtocol {
848857
/// - Complexity: O(1)
849858
/// - Response: [String]: The node's shard id.
850859
@inlinable
860+
@discardableResult
851861
public func clusterMyshardid() async throws -> ByteBuffer {
852862
try await send(command: CLUSTER.MYSHARDID())
853863
}
@@ -859,6 +869,7 @@ extension ValkeyConnectionProtocol {
859869
/// - Complexity: O(N) where N is the total number of Cluster nodes
860870
/// - Response: [String]: The serialized cluster configuration.
861871
@inlinable
872+
@discardableResult
862873
public func clusterNodes() async throws -> ByteBuffer {
863874
try await send(command: CLUSTER.NODES())
864875
}
@@ -870,6 +881,7 @@ extension ValkeyConnectionProtocol {
870881
/// - Complexity: O(N) where N is the number of replicas.
871882
/// - Response: [Array]: A list of replica nodes replicating from the specified primary node provided in the same format used by CLUSTER NODES.
872883
@inlinable
884+
@discardableResult
873885
public func clusterReplicas<NodeId: RESPStringRenderable>(nodeId: NodeId) async throws -> RESPToken.Array {
874886
try await send(command: CLUSTER.REPLICAS(nodeId: nodeId))
875887
}
@@ -933,6 +945,7 @@ extension ValkeyConnectionProtocol {
933945
/// - Complexity: O(N) where N is the total number of cluster nodes
934946
/// - Response: [Array]: A nested list of a map of hash ranges and shard nodes describing individual shards.
935947
@inlinable
948+
@discardableResult
936949
public func clusterShards() async throws -> CLUSTER.SHARDS.Response {
937950
try await send(command: CLUSTER.SHARDS())
938951
}
@@ -945,6 +958,7 @@ extension ValkeyConnectionProtocol {
945958
/// - Complexity: O(N) where N is the number of replicas.
946959
/// - Response: [Array]: A list of replica nodes replicating from the specified primary node provided in the same format used by CLUSTER NODES.
947960
@inlinable
961+
@discardableResult
948962
public func clusterSlaves<NodeId: RESPStringRenderable>(nodeId: NodeId) async throws -> RESPToken.Array {
949963
try await send(command: CLUSTER.SLAVES(nodeId: nodeId))
950964
}
@@ -956,6 +970,7 @@ extension ValkeyConnectionProtocol {
956970
/// - Complexity: O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.
957971
/// - Response: [Array]: Array of nested arrays, where the inner array element represents a slot and its respective usage statistics.
958972
@inlinable
973+
@discardableResult
959974
public func clusterSlotStats(filter: CLUSTER.SLOTSTATS.Filter) async throws -> RESPToken.Array {
960975
try await send(command: CLUSTER.SLOTSTATS(filter: filter))
961976
}
@@ -970,6 +985,7 @@ extension ValkeyConnectionProtocol {
970985
/// - Complexity: O(N) where N is the total number of Cluster nodes
971986
/// - Response: [Array]: Nested list of slot ranges with networking information.
972987
@inlinable
988+
@discardableResult
973989
public func clusterSlots() async throws -> RESPToken.Array {
974990
try await send(command: CLUSTER.SLOTS())
975991
}

Sources/Valkey/Commands/ConnectionCommands.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ extension ValkeyConnectionProtocol {
808808
/// - Available: 2.4.0
809809
/// - Complexity: Depends on subcommand.
810810
@inlinable
811+
@discardableResult
811812
public func client() async throws -> CLIENT.Response {
812813
try await send(command: CLIENT())
813814
}
@@ -841,6 +842,7 @@ extension ValkeyConnectionProtocol {
841842
/// * [String]: The connection name of the current connection
842843
/// * [Null]: Connection name was not set
843844
@inlinable
845+
@discardableResult
844846
public func clientGetname() async throws -> ByteBuffer? {
845847
try await send(command: CLIENT.GETNAME())
846848
}
@@ -855,6 +857,7 @@ extension ValkeyConnectionProtocol {
855857
/// * -1: Client tracking is not enabled.
856858
/// * [Integer]: ID of the client we are redirecting the notifications to.
857859
@inlinable
860+
@discardableResult
858861
public func clientGetredir() async throws -> Int {
859862
try await send(command: CLIENT.GETREDIR())
860863
}
@@ -866,6 +869,7 @@ extension ValkeyConnectionProtocol {
866869
/// - Complexity: O(1)
867870
/// - Response: [Array]: Helpful text about subcommands.
868871
@inlinable
872+
@discardableResult
869873
public func clientHelp() async throws -> RESPToken.Array {
870874
try await send(command: CLIENT.HELP())
871875
}
@@ -877,6 +881,7 @@ extension ValkeyConnectionProtocol {
877881
/// - Complexity: O(1)
878882
/// - Response: [Integer]: The id of the client
879883
@inlinable
884+
@discardableResult
880885
public func clientId() async throws -> Int {
881886
try await send(command: CLIENT.ID())
882887
}
@@ -898,6 +903,7 @@ extension ValkeyConnectionProtocol {
898903
/// - Complexity: O(1)
899904
/// - Response: [String]: A unique string, as described at the CLIENT LIST page, for the current client.
900905
@inlinable
906+
@discardableResult
901907
public func clientInfo() async throws -> ByteBuffer {
902908
try await send(command: CLIENT.INFO())
903909
}
@@ -920,6 +926,7 @@ extension ValkeyConnectionProtocol {
920926
/// * "OK": When called in 3 argument format.
921927
/// * [Integer]: When called in filter/value format, the number of clients killed.
922928
@inlinable
929+
@discardableResult
923930
public func clientKill(filter: CLIENT.KILL.Filter) async throws -> Int? {
924931
try await send(command: CLIENT.KILL(filter: filter))
925932
}
@@ -940,6 +947,7 @@ extension ValkeyConnectionProtocol {
940947
/// - Complexity: O(N) where N is the number of client connections
941948
/// - Response: [String]: Information and statistics about client connections
942949
@inlinable
950+
@discardableResult
943951
public func clientList(
944952
clientType: CLIENT.LIST.ClientType? = nil,
945953
clientIds: [Int] = [],
@@ -1052,6 +1060,7 @@ extension ValkeyConnectionProtocol {
10521060
/// - Available: 6.2.0
10531061
/// - Complexity: O(1)
10541062
@inlinable
1063+
@discardableResult
10551064
public func clientTrackinginfo() async throws -> RESPToken.Map {
10561065
try await send(command: CLIENT.TRACKINGINFO())
10571066
}
@@ -1065,6 +1074,7 @@ extension ValkeyConnectionProtocol {
10651074
/// * 0: If the client was unblocked successfully.
10661075
/// * 1: If the client wasn't unblocked.
10671076
@inlinable
1077+
@discardableResult
10681078
public func clientUnblock(clientId: Int, unblockType: CLIENT.UNBLOCK.UnblockType? = nil) async throws -> Int {
10691079
try await send(command: CLIENT.UNBLOCK(clientId: clientId, unblockType: unblockType))
10701080
}
@@ -1086,6 +1096,7 @@ extension ValkeyConnectionProtocol {
10861096
/// - Complexity: O(1)
10871097
/// - Response: [String]: The given string
10881098
@inlinable
1099+
@discardableResult
10891100
public func echo<Message: RESPStringRenderable>(message: Message) async throws -> ByteBuffer {
10901101
try await send(command: ECHO(message: message))
10911102
}
@@ -1099,6 +1110,7 @@ extension ValkeyConnectionProtocol {
10991110
/// * 6.2.0: `protover` made optional; when called without arguments the command reports the current connection's context.
11001111
/// - Complexity: O(1)
11011112
@inlinable
1113+
@discardableResult
11021114
public func hello(arguments: HELLO.Arguments? = nil) async throws -> RESPToken.Map {
11031115
try await send(command: HELLO(arguments: arguments))
11041116
}
@@ -1112,6 +1124,7 @@ extension ValkeyConnectionProtocol {
11121124
/// * "PONG": Default reply.
11131125
/// * [String]: Relay of given `message`.
11141126
@inlinable
1127+
@discardableResult
11151128
public func ping(message: String? = nil) async throws -> PING.Response {
11161129
try await send(command: PING(message: message))
11171130
}
@@ -1133,6 +1146,7 @@ extension ValkeyConnectionProtocol {
11331146
/// - Available: 6.2.0
11341147
/// - Complexity: O(1)
11351148
@inlinable
1149+
@discardableResult
11361150
public func reset() async throws -> String {
11371151
try await send(command: RESET())
11381152
}

0 commit comments

Comments
 (0)