Skip to content

Commit 4da00e6

Browse files
committed
Rewrite transaction code, to make it easier to move to client
Signed-off-by: Adam Fowler <[email protected]>
1 parent 52bb794 commit 4da00e6

File tree

4 files changed

+79
-368
lines changed

4 files changed

+79
-368
lines changed

Sources/Valkey/Connection/ValkeyConnection+transactions.swift

Lines changed: 0 additions & 288 deletions
This file was deleted.

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -225,26 +225,60 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
225225
public func execute<each Command: ValkeyCommand>(
226226
_ commands: repeat each Command
227227
) async -> sending (repeat Result<(each Command).Response, Error>) {
228-
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
229-
result.flatMap {
230-
do {
231-
return try .success(Response(fromRESP: $0))
232-
} catch {
233-
return .failure(error)
228+
let requestID = Self.requestIDGenerator.next()
229+
// this currently allocates a promise for every command. We could collapse this down to one promise
230+
var mpromises: [EventLoopPromise<RESPToken>] = []
231+
var encoder = ValkeyCommandEncoder()
232+
for command in repeat each commands {
233+
command.encode(into: &encoder)
234+
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
235+
}
236+
let outBuffer = encoder.buffer
237+
let promises = mpromises
238+
return await withTaskCancellationHandler {
239+
if Task.isCancelled {
240+
for promise in mpromises {
241+
promise.fail(ValkeyClientError(.cancelled))
234242
}
243+
} else {
244+
// write directly to channel handler
245+
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
235246
}
247+
// get response from channel handler
248+
var index = AutoIncrementingInteger()
249+
return await (repeat promises[index.next()].futureResult._result().convertFromRESP(to: (each Command).Response.self))
250+
} onCancel: {
251+
self.cancel(requestID: requestID)
236252
}
253+
}
254+
255+
/// Pipeline a series of commands to Valkey connection
256+
///
257+
/// Once all the responses for the commands have been received the function returns
258+
/// a parameter pack of Results, one for each command.
259+
///
260+
/// - Parameter commands: Parameter pack of ValkeyCommands
261+
/// - Returns: Parameter pack holding the responses of all the commands
262+
@inlinable
263+
public func transaction<each Command: ValkeyCommand>(
264+
_ commands: repeat each Command
265+
) async throws -> sending (repeat Result<(each Command).Response, Error>) {
237266
let requestID = Self.requestIDGenerator.next()
238267
// this currently allocates a promise for every command. We could collapse this down to one promise
239268
var mpromises: [EventLoopPromise<RESPToken>] = []
240269
var encoder = ValkeyCommandEncoder()
270+
MULTI().encode(into: &encoder)
271+
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
241272
for command in repeat each commands {
242273
command.encode(into: &encoder)
243274
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
244275
}
276+
EXEC().encode(into: &encoder)
277+
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
278+
245279
let outBuffer = encoder.buffer
246280
let promises = mpromises
247-
return await withTaskCancellationHandler {
281+
return try await withTaskCancellationHandler {
248282
if Task.isCancelled {
249283
for promise in mpromises {
250284
promise.fail(ValkeyClientError(.cancelled))
@@ -254,8 +288,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
254288
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
255289
}
256290
// get response from channel handler
257-
var index = AutoIncrementingInteger()
258-
return await (repeat convert(promises[index.next()].futureResult._result(), to: (each Command).Response.self))
291+
guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
292+
throw ValkeyClientError(.transactionAborted)
293+
}
294+
return responses.decodeElementResults()
259295
} onCancel: {
260296
self.cancel(requestID: requestID)
261297
}
@@ -539,8 +575,8 @@ struct AutoIncrementingInteger {
539575
var value: Int = 0
540576

541577
@inlinable
542-
init() {
543-
self.value = 0
578+
init(_ value: Int = 0) {
579+
self.value = value
544580
}
545581

546582
@inlinable
@@ -566,3 +602,16 @@ extension ValkeyClientError {
566602
}
567603
}
568604
#endif
605+
606+
extension Result where Success == RESPToken, Failure == any Error {
607+
@usableFromInline
608+
func convertFromRESP<Response: RESPTokenDecodable>(to: Response.Type) -> Result<Response, Error> {
609+
self.flatMap {
610+
do {
611+
return try .success(Response(fromRESP: $0))
612+
} catch {
613+
return .failure(error)
614+
}
615+
}
616+
}
617+
}

0 commit comments

Comments
 (0)