Skip to content

Commit be8d32c

Browse files
authored
Re-organise pipeline execute commands to use same internal function (#239)
1 parent ba1f99f commit be8d32c

File tree

1 file changed

+78
-67
lines changed

1 file changed

+78
-67
lines changed

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 78 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -225,87 +225,60 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
225225
public func execute<each Command: ValkeyCommand>(
226226
_ commands: repeat each Command
227227
) async -> sending (repeat Result<(each Command).Response, any Error>) {
228-
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, any Error>, to: Response.Type) -> Result<Response, any Error> {
229-
result.flatMap {
230-
do {
231-
return try .success(Response(fromRESP: $0))
232-
} catch {
233-
return .failure(error)
234-
}
235-
}
236-
}
237-
let requestID = Self.requestIDGenerator.next()
238228
// this currently allocates a promise for every command. We could collapse this down to one promise
239-
var mpromises: [EventLoopPromise<RESPToken>] = []
229+
var promises: [EventLoopPromise<RESPToken>] = []
240230
var encoder = ValkeyCommandEncoder()
241231
for command in repeat each commands {
242232
command.encode(into: &encoder)
243-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
233+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
244234
}
245-
let outBuffer = encoder.buffer
246-
let promises = mpromises
247-
return await withTaskCancellationHandler {
248-
if Task.isCancelled {
249-
for promise in mpromises {
250-
promise.fail(ValkeyClientError(.cancelled))
251-
}
252-
} else {
253-
// write directly to channel handler
254-
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
255-
}
235+
return await _execute(
236+
buffer: encoder.buffer,
237+
promises: promises,
238+
valkeyPromises: promises.map { .nio($0) }
239+
) { promises in
256240
// get response from channel handler
257241
var index = AutoIncrementingInteger()
258-
return await (repeat convert(promises[index.next()].futureResult._result(), to: (each Command).Response.self))
259-
} onCancel: {
260-
self.cancel(requestID: requestID)
242+
return await (repeat promises[index.next()].futureResult._result().convertFromRESP(to: (each Command).Response.self))
261243
}
262244
}
263245

264246
/// Pipeline a series of commands to Valkey connection
265247
///
266248
/// Once all the responses for the commands have been received the function returns
267-
/// an array of RESPToken Results, one for each command.
249+
/// a parameter pack of Results, one for each command.
268250
///
269-
/// This is an alternative version of the pipelining function ``ValkeyConnection/execute(_:)->(_,_)``
270-
/// that allows for a collection of ValkeyCommands. It provides more flexibility but is
271-
/// slightly more expensive to run and the command responses are returned as ``RESPToken``
272-
/// instead of the response type for the command.
251+
/// This is an alternative version of the pipeline function ``ValkeyConnection/execute(_:)->(_,_)``
252+
/// that allows for a collection of ValkeyCommands. It provides more flexibility but the command
253+
/// responses are returned as ``RESPToken`` instead of the response type for the command.
273254
///
274255
/// - Parameter commands: Collection of ValkeyCommands
275256
/// - Returns: Array holding the RESPToken responses of all the commands
276257
@inlinable
277258
public func execute(
278259
_ commands: some Collection<any ValkeyCommand>
279260
) async -> sending [Result<RESPToken, any Error>] {
280-
let requestID = Self.requestIDGenerator.next()
281261
// this currently allocates a promise for every command. We could collapse this down to one promise
282-
var mpromises: [EventLoopPromise<RESPToken>] = []
283-
mpromises.reserveCapacity(commands.count)
262+
var promises: [EventLoopPromise<RESPToken>] = []
263+
promises.reserveCapacity(commands.count)
284264
var encoder = ValkeyCommandEncoder()
285265
for command in commands {
286266
command.encode(into: &encoder)
287-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
267+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
288268
}
289-
let outBuffer = encoder.buffer
290-
let promises = mpromises
291-
return await withTaskCancellationHandler {
292-
if Task.isCancelled {
293-
for promise in mpromises {
294-
promise.fail(ValkeyClientError(.cancelled))
295-
}
296-
} else {
297-
// write directly to channel handler
298-
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
299-
}
269+
let count = commands.count
270+
return await _execute(
271+
buffer: encoder.buffer,
272+
promises: promises,
273+
valkeyPromises: promises.map { .nio($0) }
274+
) { promises in
300275
// get response from channel handler
301276
var results: [Result<RESPToken, any Error>] = .init()
302-
results.reserveCapacity(commands.count)
277+
results.reserveCapacity(count)
303278
for promise in promises {
304279
await results.append(promise.futureResult._result())
305280
}
306281
return results
307-
} onCancel: {
308-
self.cancel(requestID: requestID)
309282
}
310283
}
311284

@@ -322,37 +295,62 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
322295
@usableFromInline
323296
func executeWithAsk(
324297
_ commands: some Collection<any ValkeyCommand>
325-
) async -> sending [Result<RESPToken, any Error>] {
326-
let requestID = Self.requestIDGenerator.next()
298+
) async -> [Result<RESPToken, any Error>] {
327299
// this currently allocates a promise for every command. We could collapse this down to one promise
328-
var mpromises: [EventLoopPromise<RESPToken>] = []
329-
mpromises.reserveCapacity(commands.count)
300+
var promises: [EventLoopPromise<RESPToken>] = []
301+
promises.reserveCapacity(commands.count)
302+
var valkeyPromises: [ValkeyPromise<RESPToken>] = []
303+
valkeyPromises.reserveCapacity(commands.count * 2)
330304
var encoder = ValkeyCommandEncoder()
331305
for command in commands {
332306
ASKING().encode(into: &encoder)
333307
command.encode(into: &encoder)
334-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
308+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
309+
valkeyPromises.append(.forget)
310+
valkeyPromises.append(.nio(promises.last!))
335311
}
336-
let outBuffer = encoder.buffer
337-
let promises = mpromises
312+
313+
let count = commands.count
314+
return await _execute(
315+
buffer: encoder.buffer,
316+
promises: promises,
317+
valkeyPromises: valkeyPromises
318+
) { promises in
319+
// get response from channel handler
320+
var results: [Result<RESPToken, Error>] = .init()
321+
results.reserveCapacity(count)
322+
for promise in promises {
323+
await results.append(promise.futureResult._result())
324+
}
325+
return results
326+
}
327+
}
328+
329+
/// Execute stream of commands written into buffer
330+
///
331+
/// The function is provided with an array of EventLoopPromises for the responses of commands
332+
/// we care about and an array of valkey promises one for each command
333+
@inlinable
334+
func _execute<Value>(
335+
buffer: ByteBuffer,
336+
promises: [EventLoopPromise<RESPToken>],
337+
valkeyPromises: [ValkeyPromise<RESPToken>],
338+
processResults: sending ([EventLoopPromise<RESPToken>]) async -> sending Value
339+
) async -> Value {
340+
let requestID = Self.requestIDGenerator.next()
338341
return await withTaskCancellationHandler {
339342
if Task.isCancelled {
340-
for promise in mpromises {
343+
for promise in promises {
341344
promise.fail(ValkeyClientError(.cancelled))
342345
}
343346
} else {
344347
// write directly to channel handler
345348
self.channelHandler.write(
346-
request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.flatMap { [.forget, .nio($0)] }, id: requestID)
349+
request: ValkeyRequest.multiple(buffer: buffer, promises: valkeyPromises, id: requestID)
347350
)
348351
}
349-
// get response from channel handler
350-
var results: [Result<RESPToken, any Error>] = .init()
351-
results.reserveCapacity(commands.count)
352-
for promise in promises {
353-
await results.append(promise.futureResult._result())
354-
}
355-
return results
352+
353+
return await processResults(promises)
356354
} onCancel: {
357355
self.cancel(requestID: requestID)
358356
}
@@ -539,8 +537,8 @@ struct AutoIncrementingInteger {
539537
var value: Int = 0
540538

541539
@inlinable
542-
init() {
543-
self.value = 0
540+
init(_ value: Int = 0) {
541+
self.value = value
544542
}
545543

546544
@inlinable
@@ -566,3 +564,16 @@ extension ValkeyClientError {
566564
}
567565
}
568566
#endif
567+
568+
extension Result where Success == RESPToken, Failure == any Error {
569+
@usableFromInline
570+
func convertFromRESP<Response: RESPTokenDecodable>(to: Response.Type) -> Result<Response, Error> {
571+
self.flatMap {
572+
do {
573+
return try .success(Response(fromRESP: $0))
574+
} catch {
575+
return .failure(error)
576+
}
577+
}
578+
}
579+
}

0 commit comments

Comments
 (0)