Skip to content

Commit ed0caa6

Browse files
committed
Copy errors from queuing into returned results
Signed-off-by: Adam Fowler <[email protected]>
1 parent 0d8bc49 commit ed0caa6

File tree

1 file changed

+59
-18
lines changed

1 file changed

+59
-18
lines changed

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -258,28 +258,53 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
258258
public func transaction<each Command: ValkeyCommand>(
259259
_ commands: repeat each Command
260260
) async throws -> sending (repeat Result<(each Command).Response, Error>) {
261+
func replaceSuccessWithError<Response: RESPTokenDecodable>(
262+
response: Response.Type,
263+
result: Result<RESPToken, Error>,
264+
error: any Error
265+
) -> Result<Response, Error> {
266+
switch result {
267+
case .failure(let error):
268+
return .failure(error)
269+
case .success:
270+
return .failure(error)
271+
}
272+
}
261273
var encoder = ValkeyCommandEncoder()
262-
var count = 0
274+
var promises: [EventLoopPromise<RESPToken>] = []
263275
MULTI().encode(into: &encoder)
276+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
264277
for command in repeat each commands {
265-
count += 1
266278
command.encode(into: &encoder)
279+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
267280
}
268281
EXEC().encode(into: &encoder)
269-
let promise = channel.eventLoop.makePromise(of: RESPToken.self)
282+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
270283
return try await _execute(
271284
buffer: encoder.buffer,
272-
promises: CollectionOfOne(promise),
273-
valkeyPromises: .init(repeating: .forget, count: count + 1) + [.nio(promise)]
285+
promises: promises,
286+
valkeyPromises: promises.map { .nio($0) }
274287
) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), Error> in
275288
// get response from channel handler
276289
do {
277-
guard let responses = try await promises[0].futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
290+
guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
278291
return .failure(ValkeyClientError(.transactionAborted))
279292
}
280293
return .success(responses.decodeElementResults())
281294
} catch {
282-
return .failure(error)
295+
// we received an error while running the EXEC command. We return an
296+
// array of results all with errors. If the queuing of a command already
297+
// generated an error then we use that error, otherwise we use the error
298+
// that we have just caught
299+
var index = AutoIncrementingInteger(1)
300+
return await .success(
301+
(repeat
302+
replaceSuccessWithError(
303+
response: (each Command).Response.self,
304+
result: promises[index.next()].futureResult._result(),
305+
error: error
306+
))
307+
)
283308
}
284309
}.get()
285310
}
@@ -339,21 +364,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
339364
_ commands: some Collection<any ValkeyCommand>
340365
) async throws -> [Result<RESPToken, Error>] {
341366
var encoder = ValkeyCommandEncoder()
342-
var count = 0
367+
var promises: [EventLoopPromise<RESPToken>] = []
343368
MULTI().encode(into: &encoder)
369+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
344370
for command in commands {
345-
count += 1
346371
command.encode(into: &encoder)
372+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
347373
}
348374
EXEC().encode(into: &encoder)
349-
let promise = channel.eventLoop.makePromise(of: RESPToken.self)
375+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
350376
return try await _execute(
351377
buffer: encoder.buffer,
352-
promises: CollectionOfOne(promise),
353-
valkeyPromises: .init(repeating: .forget, count: count + 1) + [.nio(promise)]
378+
promises: promises,
379+
valkeyPromises: promises.map { .nio($0) }
354380
) { promises -> sending Result<[Result<RESPToken, Error>], Error> in
355381
do {
356-
guard let responses = try await promises[0].futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
382+
guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
357383
return .failure(ValkeyClientError(.transactionAborted))
358384
}
359385
return .success(
@@ -367,7 +393,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
367393
}
368394
)
369395
} catch {
370-
return .failure(error)
396+
// we received an error while running the EXEC command. We return an
397+
// array of results all with errors. If the queuing of a command already
398+
// generated an error then we use that error, otherwise we use the error
399+
// that we have just caught
400+
var results: [Result<RESPToken, Error>] = .init()
401+
results.reserveCapacity(promises.count - 2)
402+
for promise in promises[1..<(promises.count - 1)] {
403+
let result = await promise.futureResult._result()
404+
switch result {
405+
case .failure:
406+
results.append(result)
407+
case .success:
408+
results.append(.failure(error))
409+
}
410+
}
411+
return .success(results)
371412
}
372413
}.get()
373414
}
@@ -421,12 +462,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
421462
/// The function is provided with an array of EventLoopPromises for the responses of commands
422463
/// we care about and an array of valkey promises one for each command
423464
@inlinable
424-
func _execute<Value, Promises: Collection & Sendable>(
465+
func _execute<Value>(
425466
buffer: ByteBuffer,
426-
promises: Promises,
467+
promises: [EventLoopPromise<RESPToken>],
427468
valkeyPromises: [ValkeyPromise<RESPToken>],
428-
processResults: sending (Promises) async -> Value
429-
) async -> Value where Promises.Element == EventLoopPromise<RESPToken> {
469+
processResults: sending ([EventLoopPromise<RESPToken>]) async -> Value
470+
) async -> Value {
430471
let requestID = Self.requestIDGenerator.next()
431472
return await withTaskCancellationHandler {
432473
if Task.isCancelled {

0 commit comments

Comments
 (0)