@@ -258,28 +258,53 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
258
258
public func transaction< each Command : ValkeyCommand > (
259
259
_ commands: repeat each Command
260
260
) async throws -> sending ( repeat Result < ( each Command ) . Response, Error > ) {
261
+ func replaceSuccessWithError< Response: RESPTokenDecodable > (
262
+ response: Response . Type ,
263
+ result: Result < RESPToken , Error > ,
264
+ error: any Error
265
+ ) -> Result < Response , Error > {
266
+ switch result {
267
+ case . failure( let error) :
268
+ return . failure( error)
269
+ case . success:
270
+ return . failure( error)
271
+ }
272
+ }
261
273
var encoder = ValkeyCommandEncoder ( )
262
- var count = 0
274
+ var promises : [ EventLoopPromise < RESPToken > ] = [ ]
263
275
MULTI ( ) . encode ( into: & encoder)
276
+ promises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
264
277
for command in repeat each commands {
265
- count += 1
266
278
command. encode ( into: & encoder)
279
+ promises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
267
280
}
268
281
EXEC ( ) . encode ( into: & encoder)
269
- let promise = channel. eventLoop. makePromise ( of: RESPToken . self)
282
+ promises . append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
270
283
return try await _execute (
271
284
buffer: encoder. buffer,
272
- promises: CollectionOfOne ( promise ) ,
273
- valkeyPromises: . init ( repeating : . forget , count : count + 1 ) + [ . nio( promise ) ]
285
+ promises: promises ,
286
+ valkeyPromises: promises . map { . nio( $0 ) }
274
287
) { promises -> sending Result< ( repeat Result < ( each Command ) . Response, Error > ) , Error > in
275
288
// get response from channel handler
276
289
do {
277
- guard let responses = try await promises [ 0 ] . futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
290
+ guard let responses = try await promises. last! . futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
278
291
return . failure( ValkeyClientError ( . transactionAborted) )
279
292
}
280
293
return . success( responses. decodeElementResults ( ) )
281
294
} catch {
282
- return . failure( error)
295
+ // we received an error while running the EXEC command. We return an
296
+ // array of results all with errors. If the queuing of a command already
297
+ // generated an error then we use that error, otherwise we use the error
298
+ // that we have just caught
299
+ var index = AutoIncrementingInteger ( 1 )
300
+ return await . success(
301
+ ( repeat
302
+ replaceSuccessWithError(
303
+ response: ( each Command) . Response. self,
304
+ result: promises [ index. next ( ) ] . futureResult. _result ( ) ,
305
+ error: error
306
+ ) )
307
+ )
283
308
}
284
309
} . get ( )
285
310
}
@@ -339,21 +364,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
339
364
_ commands: some Collection < any ValkeyCommand >
340
365
) async throws -> [ Result < RESPToken , Error > ] {
341
366
var encoder = ValkeyCommandEncoder ( )
342
- var count = 0
367
+ var promises : [ EventLoopPromise < RESPToken > ] = [ ]
343
368
MULTI ( ) . encode ( into: & encoder)
369
+ promises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
344
370
for command in commands {
345
- count += 1
346
371
command. encode ( into: & encoder)
372
+ promises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
347
373
}
348
374
EXEC ( ) . encode ( into: & encoder)
349
- let promise = channel. eventLoop. makePromise ( of: RESPToken . self)
375
+ promises . append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
350
376
return try await _execute (
351
377
buffer: encoder. buffer,
352
- promises: CollectionOfOne ( promise ) ,
353
- valkeyPromises: . init ( repeating : . forget , count : count + 1 ) + [ . nio( promise ) ]
378
+ promises: promises ,
379
+ valkeyPromises: promises . map { . nio( $0 ) }
354
380
) { promises -> sending Result< [ Result < RESPToken , Error > ] , Error > in
355
381
do {
356
- guard let responses = try await promises [ 0 ] . futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
382
+ guard let responses = try await promises. last! . futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
357
383
return . failure( ValkeyClientError ( . transactionAborted) )
358
384
}
359
385
return . success(
@@ -367,7 +393,22 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
367
393
}
368
394
)
369
395
} catch {
370
- return . failure( error)
396
+ // we received an error while running the EXEC command. We return an
397
+ // array of results all with errors. If the queuing of a command already
398
+ // generated an error then we use that error, otherwise we use the error
399
+ // that we have just caught
400
+ var results : [ Result < RESPToken , Error > ] = . init( )
401
+ results. reserveCapacity ( promises. count - 2 )
402
+ for promise in promises [ 1 ..< ( promises. count - 1 ) ] {
403
+ let result = await promise. futureResult. _result ( )
404
+ switch result {
405
+ case . failure:
406
+ results. append ( result)
407
+ case . success:
408
+ results. append ( . failure( error) )
409
+ }
410
+ }
411
+ return . success( results)
371
412
}
372
413
} . get ( )
373
414
}
@@ -421,12 +462,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
421
462
/// The function is provided with an array of EventLoopPromises for the responses of commands
422
463
/// we care about and an array of valkey promises one for each command
423
464
@inlinable
424
- func _execute< Value, Promises : Collection & Sendable > (
465
+ func _execute< Value> (
425
466
buffer: ByteBuffer ,
426
- promises: Promises ,
467
+ promises: [ EventLoopPromise < RESPToken > ] ,
427
468
valkeyPromises: [ ValkeyPromise < RESPToken > ] ,
428
- processResults: sending ( Promises ) async -> Value
429
- ) async -> Value where Promises . Element == EventLoopPromise < RESPToken > {
469
+ processResults: sending ( [ EventLoopPromise < RESPToken > ] ) async -> Value
470
+ ) async -> Value {
430
471
let requestID = Self . requestIDGenerator. next ( )
431
472
return await withTaskCancellationHandler {
432
473
if Task . isCancelled {
0 commit comments