@@ -225,30 +225,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
225
225
public func execute< each Command : ValkeyCommand > (
226
226
_ commands: repeat each Command
227
227
) async -> sending ( repeat Result < ( each Command ) . Response, Error > ) {
228
- let requestID = Self . requestIDGenerator. next ( )
229
228
// this currently allocates a promise for every command. We could collapse this down to one promise
230
- var mpromises : [ EventLoopPromise < RESPToken > ] = [ ]
229
+ var promises : [ EventLoopPromise < RESPToken > ] = [ ]
231
230
var encoder = ValkeyCommandEncoder ( )
232
231
for command in repeat each commands {
233
232
command. encode ( into: & encoder)
234
- mpromises . append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
233
+ promises . append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
235
234
}
236
- let outBuffer = encoder. buffer
237
- let promises = mpromises
238
- return await withTaskCancellationHandler {
239
- if Task . isCancelled {
240
- for promise in mpromises {
241
- promise. fail ( ValkeyClientError ( . cancelled) )
242
- }
243
- } else {
244
- // write directly to channel handler
245
- self . channelHandler. write ( request: ValkeyRequest . multiple ( buffer: outBuffer, promises: promises. map { . nio( $0) } , id: requestID) )
246
- }
235
+ return await _execute (
236
+ buffer: encoder. buffer,
237
+ promises: promises,
238
+ valkeyPromises: promises. map { . nio( $0) }
239
+ ) { promises in
247
240
// get response from channel handler
248
241
var index = AutoIncrementingInteger ( )
249
242
return await ( repeat promises[ index. next ( ) ] . futureResult. _result ( ) . convertFromRESP ( to: ( each Command) . Response. self) )
250
- } onCancel: {
251
- self . cancel ( requestID: requestID)
252
243
}
253
244
}
254
245
@@ -263,38 +254,30 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
263
254
public func transaction< each Command : ValkeyCommand > (
264
255
_ commands: repeat each Command
265
256
) async throws -> sending ( repeat Result < ( each Command ) . Response, Error > ) {
266
- let requestID = Self . requestIDGenerator. next ( )
267
- // this currently allocates a promise for every command. We could collapse this down to one promise
268
- var mpromises : [ EventLoopPromise < RESPToken > ] = [ ]
269
257
var encoder = ValkeyCommandEncoder ( )
258
+ var count = 0
270
259
MULTI ( ) . encode ( into: & encoder)
271
- mpromises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
272
260
for command in repeat each commands {
261
+ count += 1
273
262
command. encode ( into: & encoder)
274
- mpromises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
275
263
}
276
264
EXEC ( ) . encode ( into: & encoder)
277
- mpromises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
278
-
279
- let outBuffer = encoder. buffer
280
- let promises = mpromises
281
- return try await withTaskCancellationHandler {
282
- if Task . isCancelled {
283
- for promise in mpromises {
284
- promise. fail ( ValkeyClientError ( . cancelled) )
285
- }
286
- } else {
287
- // write directly to channel handler
288
- self . channelHandler. write ( request: ValkeyRequest . multiple ( buffer: outBuffer, promises: promises. map { . nio( $0) } , id: requestID) )
289
- }
265
+ let promise = channel. eventLoop. makePromise ( of: RESPToken . self)
266
+ return try await _execute (
267
+ buffer: encoder. buffer,
268
+ promises: CollectionOfOne ( promise) ,
269
+ valkeyPromises: . init( repeating: . forget, count: count + 1 ) + [ . nio( promise) ]
270
+ ) { promises -> sending Result< ( repeat Result < ( each Command ) . Response, Error > ) , Error > in
290
271
// get response from channel handler
291
- guard let responses = try await promises. last!. futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
292
- throw ValkeyClientError ( . transactionAborted)
272
+ do {
273
+ guard let responses = try await promises [ 0 ] . futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
274
+ return . failure( ValkeyClientError ( . transactionAborted) )
275
+ }
276
+ return . success( responses. decodeElementResults ( ) )
277
+ } catch {
278
+ return . failure( error)
293
279
}
294
- return responses. decodeElementResults ( )
295
- } onCancel: {
296
- self . cancel ( requestID: requestID)
297
- }
280
+ } . get ( )
298
281
}
299
282
300
283
/// Pipeline a series of commands to Valkey connection
@@ -312,39 +295,76 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
312
295
@inlinable
313
296
public func execute(
314
297
_ commands: some Collection < any ValkeyCommand >
315
- ) async -> sending [ Result < RESPToken , Error > ] {
316
- let requestID = Self . requestIDGenerator. next ( )
298
+ ) async -> [ Result < RESPToken , Error > ] {
317
299
// this currently allocates a promise for every command. We could collapse this down to one promise
318
- var mpromises : [ EventLoopPromise < RESPToken > ] = [ ]
319
- mpromises . reserveCapacity ( commands. count)
300
+ var promises : [ EventLoopPromise < RESPToken > ] = [ ]
301
+ promises . reserveCapacity ( commands. count)
320
302
var encoder = ValkeyCommandEncoder ( )
321
303
for command in commands {
322
304
command. encode ( into: & encoder)
323
- mpromises . append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
305
+ promises . append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
324
306
}
325
- let outBuffer = encoder. buffer
326
- let promises = mpromises
327
- return await withTaskCancellationHandler {
328
- if Task . isCancelled {
329
- for promise in mpromises {
330
- promise. fail ( ValkeyClientError ( . cancelled) )
331
- }
332
- } else {
333
- // write directly to channel handler
334
- self . channelHandler. write ( request: ValkeyRequest . multiple ( buffer: outBuffer, promises: promises. map { . nio( $0) } , id: requestID) )
335
- }
307
+ let count = commands. count
308
+ return await _execute (
309
+ buffer: encoder. buffer,
310
+ promises: promises,
311
+ valkeyPromises: promises. map { . nio( $0) }
312
+ ) { promises in
336
313
// get response from channel handler
337
314
var results : [ Result < RESPToken , Error > ] = . init( )
338
- results. reserveCapacity ( commands . count)
315
+ results. reserveCapacity ( count)
339
316
for promise in promises {
340
317
await results. append ( promise. futureResult. _result ( ) )
341
318
}
342
319
return results
343
- } onCancel: {
344
- self . cancel ( requestID: requestID)
345
320
}
346
321
}
347
322
323
+ /// Pipeline a series of commands to Valkey connection
324
+ ///
325
+ /// Once all the responses for the commands have been received the function returns
326
+ /// a parameter pack of Results, one for each command.
327
+ ///
328
+ /// - Parameter commands: Parameter pack of ValkeyCommands
329
+ /// - Returns: Parameter pack holding the responses of all the commands
330
+ @inlinable
331
+ public func transaction(
332
+ _ commands: some Collection < any ValkeyCommand >
333
+ ) async throws -> [ Result < RESPToken , Error > ] {
334
+ var encoder = ValkeyCommandEncoder ( )
335
+ var count = 0
336
+ MULTI ( ) . encode ( into: & encoder)
337
+ for command in commands {
338
+ count += 1
339
+ command. encode ( into: & encoder)
340
+ }
341
+ EXEC ( ) . encode ( into: & encoder)
342
+ let promise = channel. eventLoop. makePromise ( of: RESPToken . self)
343
+ return try await _execute (
344
+ buffer: encoder. buffer,
345
+ promises: CollectionOfOne ( promise) ,
346
+ valkeyPromises: . init( repeating: . forget, count: count + 1 ) + [ . nio( promise) ]
347
+ ) { promises -> sending Result< [ Result < RESPToken , Error > ] , Error > in
348
+ do {
349
+ guard let responses = try await promises [ 0 ] . futureResult. _result ( ) . convertFromRESP ( to: EXEC . Response. self) . get ( ) else {
350
+ return . failure( ValkeyClientError ( . transactionAborted) )
351
+ }
352
+ return . success(
353
+ responses. map {
354
+ switch $0. identifier {
355
+ case . simpleError, . bulkError:
356
+ . failure( ValkeyClientError ( . commandError, message: $0. errorString. map { Swift . String ( buffer: $0) } ) )
357
+ default :
358
+ . success( $0)
359
+ }
360
+ }
361
+ )
362
+ } catch {
363
+ return . failure( error)
364
+ }
365
+ } . get ( )
366
+ }
367
+
348
368
/// Pipeline a series of commands to Valkey connection and precede each command with an ASKING
349
369
/// command
350
370
///
@@ -358,37 +378,62 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
358
378
@usableFromInline
359
379
func executeWithAsk(
360
380
_ commands: some Collection < any ValkeyCommand >
361
- ) async -> sending [ Result < RESPToken , Error > ] {
362
- let requestID = Self . requestIDGenerator. next ( )
381
+ ) async -> [ Result < RESPToken , Error > ] {
363
382
// this currently allocates a promise for every command. We could collapse this down to one promise
364
- var mpromises : [ EventLoopPromise < RESPToken > ] = [ ]
365
- mpromises. reserveCapacity ( commands. count)
383
+ var promises : [ EventLoopPromise < RESPToken > ] = [ ]
384
+ promises. reserveCapacity ( commands. count)
385
+ var valkeyPromises : [ ValkeyPromise < RESPToken > ] = [ ]
386
+ valkeyPromises. reserveCapacity ( commands. count * 2 )
366
387
var encoder = ValkeyCommandEncoder ( )
367
388
for command in commands {
368
389
ASKING ( ) . encode ( into: & encoder)
369
390
command. encode ( into: & encoder)
370
- mpromises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
391
+ promises. append ( channel. eventLoop. makePromise ( of: RESPToken . self) )
392
+ valkeyPromises. append ( . forget)
393
+ valkeyPromises. append ( . nio( promises. last!) )
371
394
}
372
- let outBuffer = encoder. buffer
373
- let promises = mpromises
395
+
396
+ let count = commands. count
397
+ return await _execute (
398
+ buffer: encoder. buffer,
399
+ promises: promises,
400
+ valkeyPromises: valkeyPromises
401
+ ) { promises in
402
+ // get response from channel handler
403
+ var results : [ Result < RESPToken , Error > ] = . init( )
404
+ results. reserveCapacity ( count)
405
+ for promise in promises {
406
+ await results. append ( promise. futureResult. _result ( ) )
407
+ }
408
+ return results
409
+ }
410
+ }
411
+
412
+ /// Execute stream of commands written into buffer
413
+ ///
414
+ /// The function is provided with an array of EventLoopPromises for the responses of commands
415
+ /// we care about and an array of valkey promises one for each command
416
+ @inlinable
417
+ func _execute< Value, Promises: Collection & Sendable > (
418
+ buffer: ByteBuffer ,
419
+ promises: Promises ,
420
+ valkeyPromises: [ ValkeyPromise < RESPToken > ] ,
421
+ processResults: sending ( Promises ) async -> Value
422
+ ) async -> Value where Promises. Element == EventLoopPromise < RESPToken > {
423
+ let requestID = Self . requestIDGenerator. next ( )
374
424
return await withTaskCancellationHandler {
375
425
if Task . isCancelled {
376
- for promise in mpromises {
426
+ for promise in promises {
377
427
promise. fail ( ValkeyClientError ( . cancelled) )
378
428
}
379
429
} else {
380
430
// write directly to channel handler
381
431
self . channelHandler. write (
382
- request: ValkeyRequest . multiple ( buffer: outBuffer , promises: promises . flatMap { [ . forget , . nio ( $0 ) ] } , id: requestID)
432
+ request: ValkeyRequest . multiple ( buffer: buffer , promises: valkeyPromises , id: requestID)
383
433
)
384
434
}
385
- // get response from channel handler
386
- var results : [ Result < RESPToken , Error > ] = . init( )
387
- results. reserveCapacity ( commands. count)
388
- for promise in promises {
389
- await results. append ( promise. futureResult. _result ( ) )
390
- }
391
- return results
435
+
436
+ return await processResults ( promises)
392
437
} onCancel: {
393
438
self . cancel ( requestID: requestID)
394
439
}
0 commit comments