Skip to content

Commit 8e99b94

Browse files
authored
Add existential execute to ValkeyClientProtocol (#235)
* Add existential execute to ValkeyClientProtocol Signed-off-by: Adam Fowler <[email protected]> * remove `@inlinable` from protocol Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent 4df9a1d commit 8e99b94

File tree

4 files changed

+70
-33
lines changed

4 files changed

+70
-33
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public final class ValkeyClusterClient: Sendable {
207207
@inlinable
208208
public func execute<each Command: ValkeyCommand>(
209209
_ commands: repeat each Command
210-
) async throws -> sending (repeat Result<(each Command).Response, Error>) {
210+
) async -> sending (repeat Result<(each Command).Response, Error>) {
211211
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
212212
result.flatMap {
213213
do {
@@ -217,7 +217,7 @@ public final class ValkeyClusterClient: Sendable {
217217
}
218218
}
219219
}
220-
let results = try await self.execute([any ValkeyCommand](commands: repeat each commands))
220+
let results = await self.execute([any ValkeyCommand](commands: repeat each commands))
221221
var index = AutoIncrementingInteger()
222222
return (repeat convert(results[index.next()], to: (each Command).Response.self))
223223
}
@@ -259,40 +259,44 @@ public final class ValkeyClusterClient: Sendable {
259259
@inlinable
260260
public func execute(
261261
_ commands: [any ValkeyCommand]
262-
) async throws -> sending [Result<RESPToken, Error>] {
262+
) async -> sending [Result<RESPToken, Error>] {
263263
guard commands.count > 0 else { return [] }
264264
// get a list of nodes and the commands that should be run on them
265-
let nodes = try await self.splitCommandsAcrossNodes(commands: commands)
266-
// if this list has one element, then just run the pipeline on that single node
267-
if nodes.count == 1 {
268-
do {
269-
return try await self.execute(node: nodes[nodes.startIndex].node, commands: commands)
270-
} catch {
271-
return .init(repeating: .failure(error), count: commands.count)
265+
do {
266+
let nodes = try await self.splitCommandsAcrossNodes(commands: commands)
267+
// if this list has one element, then just run the pipeline on that single node
268+
if nodes.count == 1 {
269+
do {
270+
return try await self.execute(node: nodes[nodes.startIndex].node, commands: commands)
271+
} catch {
272+
return .init(repeating: .failure(error), count: commands.count)
273+
}
272274
}
273-
}
274-
return await withTaskGroup(of: NodePipelineResult.self) { group in
275-
// run generated pipelines concurrently
276-
for node in nodes {
277-
let indices = node.commandIndices
278-
group.addTask {
279-
do {
280-
let results = try await self.execute(node: node.node, commands: IndexedSubCollection(commands, indices: indices))
281-
return .init(indices: indices, results: results)
282-
} catch {
283-
return NodePipelineResult(indices: indices, results: .init(repeating: .failure(error), count: indices.count))
275+
return await withTaskGroup(of: NodePipelineResult.self) { group in
276+
// run generated pipelines concurrently
277+
for node in nodes {
278+
let indices = node.commandIndices
279+
group.addTask {
280+
do {
281+
let results = try await self.execute(node: node.node, commands: IndexedSubCollection(commands, indices: indices))
282+
return .init(indices: indices, results: results)
283+
} catch {
284+
return NodePipelineResult(indices: indices, results: .init(repeating: .failure(error), count: indices.count))
285+
}
284286
}
285287
}
286-
}
287-
var results = [Result<RESPToken, Error>](repeating: .failure(ValkeyClusterError.pipelinedResultNotReturned), count: commands.count)
288-
// get results for each node
289-
while let taskResult = await group.next() {
290-
precondition(taskResult.indices.count == taskResult.results.count)
291-
for index in 0..<taskResult.indices.count {
292-
results[taskResult.indices[index]] = taskResult.results[index]
288+
var results = [Result<RESPToken, Error>](repeating: .failure(ValkeyClusterError.pipelinedResultNotReturned), count: commands.count)
289+
// get results for each node
290+
while let taskResult = await group.next() {
291+
precondition(taskResult.indices.count == taskResult.results.count)
292+
for index in 0..<taskResult.indices.count {
293+
results[taskResult.indices[index]] = taskResult.results[index]
294+
}
293295
}
296+
return results
294297
}
295-
return results
298+
} catch {
299+
return .init(repeating: .failure(error), count: commands.count)
296300
}
297301
}
298302

Sources/Valkey/ValkeyClientProtocol.swift

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,23 @@
77
//
88
/// A type that provides the ability to send a Valkey command and get a response.
99
@available(valkeySwift 1.0, *)
10-
public protocol ValkeyClientProtocol {
10+
public protocol ValkeyClientProtocol: Sendable {
1111
/// Send RESP command to Valkey connection
1212
/// - Parameter command: ValkeyCommand structure
1313
/// - Returns: The command response as defined in the ValkeyCommand
1414
func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response
15+
16+
/// Pipeline a series of commands to Valkey connection
17+
///
18+
/// Once all the responses for the commands have been received the function returns
19+
/// an array of RESPToken Results, one for each command.
20+
///
21+
/// This is an alternative version of the pipelining function ``ValkeyClient/execute(_:)->(_,_)``
22+
/// that allows for a collection of ValkeyCommands. It provides more flexibility but
23+
/// is more expensive to run and the command responses are returned as ``RESPToken``
24+
/// instead of the response type for the command.
25+
///
26+
/// - Parameter commands: Collection of ValkeyCommands
27+
/// - Returns: Array holding the RESPToken responses of all the commands
28+
func execute(_ commands: [any ValkeyCommand]) async -> sending [Result<RESPToken, Error>]
1529
}

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ struct ClusterIntegrationTests {
615615
var commands: [any ValkeyCommand] = .init()
616616
commands.append(SET(key, value: "cluster pipeline test"))
617617
commands.append(GET(key))
618-
let results = try await client.execute(commands)
618+
let results = await client.execute(commands)
619619
let response = try results[1].get().decode(as: String.self)
620620
#expect(response == "cluster pipeline test")
621621
}
@@ -638,7 +638,7 @@ struct ClusterIntegrationTests {
638638
commands.append(GET(key))
639639
commands.append(DEL(keys: [key]))
640640
}
641-
let results = try await client.execute(commands)
641+
let results = await client.execute(commands)
642642
let response = try results[1].get().decode(as: String.self)
643643
#expect(response == "0")
644644
let response2 = try results[7].get().decode(as: String.self)
@@ -655,7 +655,7 @@ struct ClusterIntegrationTests {
655655
let firstNodePort = clusterFirstNodePort ?? 6379
656656
try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) {
657657
client in
658-
let results = try await client.execute(
658+
let results = await client.execute(
659659
SET("test1", value: "1"),
660660
GET("test1"),
661661
DEL(keys: ["test1"]),

Tests/IntegrationTests/ClientIntegrationTests.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,25 @@ struct ClientIntegratedTests {
203203
}
204204
}
205205

206+
@Test
207+
@available(valkeySwift 1.0, *)
208+
func testPipelinedProtocolSetGet() async throws {
209+
@Sendable func setGet(_ client: some ValkeyClientProtocol, key: ValkeyKey) async throws {
210+
var commands: [any ValkeyCommand] = []
211+
commands.append(SET(key, value: "Pipelined Hello"))
212+
commands.append(GET(key))
213+
let responses = await client.execute(commands)
214+
try #expect(responses[1].get().decode(as: String.self) == "Pipelined Hello")
215+
}
216+
var logger = Logger(label: "Valkey")
217+
logger.logLevel = .debug
218+
try await withValkeyConnection(.hostname(valkeyHostname, port: 6379), logger: logger) { connection in
219+
try await withKey(connection: connection) { key in
220+
try await setGet(connection, key: key)
221+
}
222+
}
223+
}
224+
206225
@Test
207226
@available(valkeySwift 1.0, *)
208227
func testPipelinedSetGetClient() async throws {

0 commit comments

Comments
 (0)