Skip to content

Commit 2939660

Browse files
committed
Rewrite all ValkeyConnection pipeline commands to share code
Signed-off-by: Adam Fowler <[email protected]>
1 parent 8efe39b commit 2939660

File tree

4 files changed

+219
-98
lines changed

4 files changed

+219
-98
lines changed

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 120 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -225,30 +225,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
225225
public func execute<each Command: ValkeyCommand>(
226226
_ commands: repeat each Command
227227
) async -> sending (repeat Result<(each Command).Response, Error>) {
228-
let requestID = Self.requestIDGenerator.next()
229228
// this currently allocates a promise for every command. We could collapse this down to one promise
230-
var mpromises: [EventLoopPromise<RESPToken>] = []
229+
var promises: [EventLoopPromise<RESPToken>] = []
231230
var encoder = ValkeyCommandEncoder()
232231
for command in repeat each commands {
233232
command.encode(into: &encoder)
234-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
233+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
235234
}
236-
let outBuffer = encoder.buffer
237-
let promises = mpromises
238-
return await withTaskCancellationHandler {
239-
if Task.isCancelled {
240-
for promise in mpromises {
241-
promise.fail(ValkeyClientError(.cancelled))
242-
}
243-
} else {
244-
// write directly to channel handler
245-
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
246-
}
235+
return await _execute(
236+
buffer: encoder.buffer,
237+
promises: promises,
238+
valkeyPromises: promises.map { .nio($0) }
239+
) { promises in
247240
// get response from channel handler
248241
var index = AutoIncrementingInteger()
249242
return await (repeat promises[index.next()].futureResult._result().convertFromRESP(to: (each Command).Response.self))
250-
} onCancel: {
251-
self.cancel(requestID: requestID)
252243
}
253244
}
254245

@@ -263,38 +254,30 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
263254
public func transaction<each Command: ValkeyCommand>(
264255
_ commands: repeat each Command
265256
) async throws -> sending (repeat Result<(each Command).Response, Error>) {
266-
let requestID = Self.requestIDGenerator.next()
267-
// this currently allocates a promise for every command. We could collapse this down to one promise
268-
var mpromises: [EventLoopPromise<RESPToken>] = []
269257
var encoder = ValkeyCommandEncoder()
258+
var count = 0
270259
MULTI().encode(into: &encoder)
271-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
272260
for command in repeat each commands {
261+
count += 1
273262
command.encode(into: &encoder)
274-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
275263
}
276264
EXEC().encode(into: &encoder)
277-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
278-
279-
let outBuffer = encoder.buffer
280-
let promises = mpromises
281-
return try await withTaskCancellationHandler {
282-
if Task.isCancelled {
283-
for promise in mpromises {
284-
promise.fail(ValkeyClientError(.cancelled))
285-
}
286-
} else {
287-
// write directly to channel handler
288-
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
289-
}
265+
let promise = channel.eventLoop.makePromise(of: RESPToken.self)
266+
return try await _execute(
267+
buffer: encoder.buffer,
268+
promises: CollectionOfOne(promise),
269+
valkeyPromises: .init(repeating: .forget, count: count + 1) + [.nio(promise)]
270+
) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), Error> in
290271
// get response from channel handler
291-
guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
292-
throw ValkeyClientError(.transactionAborted)
272+
do {
273+
guard let responses = try await promises[0].futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
274+
return .failure(ValkeyClientError(.transactionAborted))
275+
}
276+
return .success(responses.decodeElementResults())
277+
} catch {
278+
return .failure(error)
293279
}
294-
return responses.decodeElementResults()
295-
} onCancel: {
296-
self.cancel(requestID: requestID)
297-
}
280+
}.get()
298281
}
299282

300283
/// Pipeline a series of commands to Valkey connection
@@ -312,39 +295,76 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
312295
@inlinable
313296
public func execute(
314297
_ commands: some Collection<any ValkeyCommand>
315-
) async -> sending [Result<RESPToken, Error>] {
316-
let requestID = Self.requestIDGenerator.next()
298+
) async -> [Result<RESPToken, Error>] {
317299
// this currently allocates a promise for every command. We could collapse this down to one promise
318-
var mpromises: [EventLoopPromise<RESPToken>] = []
319-
mpromises.reserveCapacity(commands.count)
300+
var promises: [EventLoopPromise<RESPToken>] = []
301+
promises.reserveCapacity(commands.count)
320302
var encoder = ValkeyCommandEncoder()
321303
for command in commands {
322304
command.encode(into: &encoder)
323-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
305+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
324306
}
325-
let outBuffer = encoder.buffer
326-
let promises = mpromises
327-
return await withTaskCancellationHandler {
328-
if Task.isCancelled {
329-
for promise in mpromises {
330-
promise.fail(ValkeyClientError(.cancelled))
331-
}
332-
} else {
333-
// write directly to channel handler
334-
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID))
335-
}
307+
let count = commands.count
308+
return await _execute(
309+
buffer: encoder.buffer,
310+
promises: promises,
311+
valkeyPromises: promises.map { .nio($0) }
312+
) { promises in
336313
// get response from channel handler
337314
var results: [Result<RESPToken, Error>] = .init()
338-
results.reserveCapacity(commands.count)
315+
results.reserveCapacity(count)
339316
for promise in promises {
340317
await results.append(promise.futureResult._result())
341318
}
342319
return results
343-
} onCancel: {
344-
self.cancel(requestID: requestID)
345320
}
346321
}
347322

323+
/// Pipeline a series of commands to Valkey connection
324+
///
325+
/// Once all the responses for the commands have been received the function returns
326+
/// a parameter pack of Results, one for each command.
327+
///
328+
/// - Parameter commands: Parameter pack of ValkeyCommands
329+
/// - Returns: Parameter pack holding the responses of all the commands
330+
@inlinable
331+
public func transaction(
332+
_ commands: some Collection<any ValkeyCommand>
333+
) async throws -> [Result<RESPToken, Error>] {
334+
var encoder = ValkeyCommandEncoder()
335+
var count = 0
336+
MULTI().encode(into: &encoder)
337+
for command in commands {
338+
count += 1
339+
command.encode(into: &encoder)
340+
}
341+
EXEC().encode(into: &encoder)
342+
let promise = channel.eventLoop.makePromise(of: RESPToken.self)
343+
return try await _execute(
344+
buffer: encoder.buffer,
345+
promises: CollectionOfOne(promise),
346+
valkeyPromises: .init(repeating: .forget, count: count + 1) + [.nio(promise)]
347+
) { promises -> sending Result<[Result<RESPToken, Error>], Error> in
348+
do {
349+
guard let responses = try await promises[0].futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else {
350+
return .failure(ValkeyClientError(.transactionAborted))
351+
}
352+
return .success(
353+
responses.map {
354+
switch $0.identifier {
355+
case .simpleError, .bulkError:
356+
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
357+
default:
358+
.success($0)
359+
}
360+
}
361+
)
362+
} catch {
363+
return .failure(error)
364+
}
365+
}.get()
366+
}
367+
348368
/// Pipeline a series of commands to Valkey connection and precede each command with an ASKING
349369
/// command
350370
///
@@ -358,37 +378,62 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
358378
@inlinable
359379
func executeWithAsk(
360380
_ commands: some Collection<any ValkeyCommand>
361-
) async -> sending [Result<RESPToken, Error>] {
362-
let requestID = Self.requestIDGenerator.next()
381+
) async -> [Result<RESPToken, Error>] {
363382
// this currently allocates a promise for every command. We could collapse this down to one promise
364-
var mpromises: [EventLoopPromise<RESPToken>] = []
365-
mpromises.reserveCapacity(commands.count)
383+
var promises: [EventLoopPromise<RESPToken>] = []
384+
promises.reserveCapacity(commands.count)
385+
var valkeyPromises: [ValkeyPromise<RESPToken>] = []
386+
valkeyPromises.reserveCapacity(commands.count * 2)
366387
var encoder = ValkeyCommandEncoder()
367388
for command in commands {
368389
ASKING().encode(into: &encoder)
369390
command.encode(into: &encoder)
370-
mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self))
391+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
392+
valkeyPromises.append(.forget)
393+
valkeyPromises.append(.nio(promises.last!))
371394
}
372-
let outBuffer = encoder.buffer
373-
let promises = mpromises
395+
396+
let count = commands.count
397+
return await _execute(
398+
buffer: encoder.buffer,
399+
promises: promises,
400+
valkeyPromises: valkeyPromises
401+
) { promises in
402+
// get response from channel handler
403+
var results: [Result<RESPToken, Error>] = .init()
404+
results.reserveCapacity(count)
405+
for promise in promises {
406+
await results.append(promise.futureResult._result())
407+
}
408+
return results
409+
}
410+
}
411+
412+
/// Execute stream of commands written into buffer
413+
///
414+
/// The function is provided with an array of EventLoopPromises for the responses of commands
415+
/// we care about and an array of valkey promises one for each command
416+
@inlinable
417+
func _execute<Value, Promises: Collection & Sendable>(
418+
buffer: ByteBuffer,
419+
promises: Promises,
420+
valkeyPromises: [ValkeyPromise<RESPToken>],
421+
processResults: sending (Promises) async -> Value
422+
) async -> Value where Promises.Element == EventLoopPromise<RESPToken> {
423+
let requestID = Self.requestIDGenerator.next()
374424
return await withTaskCancellationHandler {
375425
if Task.isCancelled {
376-
for promise in mpromises {
426+
for promise in promises {
377427
promise.fail(ValkeyClientError(.cancelled))
378428
}
379429
} else {
380430
// write directly to channel handler
381431
self.channelHandler.write(
382-
request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.flatMap { [.forget, .nio($0)] }, id: requestID)
432+
request: ValkeyRequest.multiple(buffer: buffer, promises: valkeyPromises, id: requestID)
383433
)
384434
}
385-
// get response from channel handler
386-
var results: [Result<RESPToken, Error>] = .init()
387-
results.reserveCapacity(commands.count)
388-
for promise in promises {
389-
await results.append(promise.futureResult._result())
390-
}
391-
return results
435+
436+
return await processResults(promises)
392437
} onCancel: {
393438
self.cancel(requestID: requestID)
394439
}

Sources/Valkey/RESP/RESPTokenDecodable.swift

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,6 @@ extension RESPToken: RESPTokenDecodable {
2222
try Value(fromRESP: self)
2323
}
2424

25-
/// Convert RESP3Token to a Result containing the type to convert to or any error found while converting
26-
///
27-
/// This function also checks for RESP error types and returns them if found
28-
///
29-
/// - Parameter type: Type to convert to
30-
/// - Returns: Result containing either the Value or an error
31-
@usableFromInline
32-
func decodeResult<Value: RESPTokenDecodable>(as type: Value.Type = Value.self) -> Result<Value, Error> {
33-
switch self.identifier {
34-
case .simpleError, .bulkError:
35-
return .failure(ValkeyClientError(.commandError, message: self.errorString.map { Swift.String(buffer: $0) }))
36-
default:
37-
do {
38-
return try .success(Value(fromRESP: self))
39-
} catch {
40-
return .failure(error)
41-
}
42-
}
43-
}
44-
4525
@inlinable
4626
public init(fromRESP token: RESPToken) throws {
4727
self = token
@@ -359,9 +339,12 @@ extension RESPToken.Array: RESPTokenDecodable {
359339
return try (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self))
360340
}
361341

362-
/// Convert RESP3Token Array to a tuple of values
342+
/// Convert RESPToken Array to a tuple of values.
343+
///
344+
/// RESP error tokens are converted into Result.failure. This is used by the transaction
345+
/// code to convert the array response from EXEC into a parameter pack of Results
346+
///
363347
/// - Parameter as: Tuple of types to convert to
364-
/// - Throws: RESPDecodeError
365348
/// - Returns: Tuple of decoded values
366349
@inlinable
367350
public func decodeElementResults<each Value: RESPTokenDecodable>(
@@ -370,7 +353,16 @@ extension RESPToken.Array: RESPTokenDecodable {
370353
func decodeOptionalRESPToken<T: RESPTokenDecodable>(_ token: RESPToken?, as: T.Type) -> Result<T, Error> {
371354
switch token {
372355
case .some(let value):
373-
return value.decodeResult(as: T.self)
356+
switch value.identifier {
357+
case .simpleError, .bulkError:
358+
return .failure(ValkeyClientError(.commandError, message: value.errorString.map { Swift.String(buffer: $0) }))
359+
default:
360+
do {
361+
return try .success(T(fromRESP: value))
362+
} catch {
363+
return .failure(error)
364+
}
365+
}
374366
case .none:
375367
// TODO: Fixup error when we have a decoding error
376368
return .failure(RESPParsingError(code: .unexpectedType, buffer: token?.base ?? .init()))

Tests/IntegrationTests/ClientIntegrationTests.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,11 @@ struct ClientIntegratedTests {
283283
INCR(key),
284284
GET(key)
285285
)
286+
let lpushError = #expect(throws: ValkeyClientError.self) {
287+
_ = try responses.0.get()
288+
}
289+
#expect(lpushError?.errorCode == .commandError)
290+
#expect(lpushError?.message?.hasPrefix("WRONGTYPE") == true)
286291
let result = try responses.2.get().map { String(buffer: $0) }
287292
#expect(result == "101")
288293
}

0 commit comments

Comments
 (0)