Skip to content

Commit 6b6ebc0

Browse files
authored
Custom responses for Stream and List commands (#103)
* Custom LMPOP response * Add LMPOP tests * Remove call to debugDescription * Add generic parameter to return value * Add custom responses for XRANGE, XREAD, XREADGROUP and XREVRANGE * XAUTOCLAIM * XPENDING * XCLAIM * Stream values are not required to be strings * Make decodeKeyValue safer * Fix decodeKeyValueElements crash on Ubuntu * Add RESPToken.Array.asMap() Also fixes crash on Ubuntu * Add testArrayAsMap * Use Response type without generic parameters for custom responses * remove empty line * Add @available where needed * Make RESPToken.Array.asMap internal
1 parent f19180d commit 6b6ebc0

18 files changed

+776
-198
lines changed

Sources/Valkey/Commands/ConnectionCommands.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ extension ValkeyConnectionProtocol {
988988
/// - Complexity: O(1)
989989
/// - Returns: [String]: The given string
990990
@inlinable
991-
public func echo<Message: RESPStringRenderable>(message: Message) async throws -> ECHO.Response {
991+
public func echo<Message: RESPStringRenderable>(message: Message) async throws -> RESPToken {
992992
try await send(command: ECHO(message: message))
993993
}
994994

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift project
4+
//
5+
// Copyright (c) 2025 the valkey-swift authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
extension LMPOP {
16+
/// - Returns: One of the following
17+
/// * [Null]: If no element could be popped.
18+
/// * [Array]: List key from which elements were popped.
19+
public struct OptionalResponse: RESPTokenDecodable, Sendable {
20+
public let key: ValkeyKey
21+
public let values: RESPToken.Array
22+
23+
public init(fromRESP token: RESPToken) throws {
24+
switch token.value {
25+
case .array(let array):
26+
(self.key, self.values) = try array.decodeElements()
27+
default:
28+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
29+
}
30+
}
31+
}
32+
public typealias Response = OptionalResponse?
33+
}
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the valkey-swift project
4+
//
5+
// Copyright (c) 2025 the valkey-swift authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
public struct XREADMessage: RESPTokenDecodable, Sendable {
16+
public let id: String
17+
public let fields: [(key: String, value: RESPToken)]
18+
19+
public init(fromRESP token: RESPToken) throws {
20+
switch token.value {
21+
case .array(let array):
22+
let (id, values) = try array.decodeElements(as: (String, RESPToken.Array).self)
23+
let keyValuePairs = try values.asMap().map { try ($0.key.decode(as: String.self), $0.value) }
24+
self.id = id
25+
self.fields = keyValuePairs
26+
default:
27+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
28+
}
29+
}
30+
}
31+
32+
public struct XREADGroupMessage: RESPTokenDecodable, Sendable {
33+
public let id: String
34+
public let fields: [(key: String, value: RESPToken)]?
35+
36+
public init(fromRESP token: RESPToken) throws {
37+
switch token.value {
38+
case .array(let array):
39+
let (id, values) = try array.decodeElements(as: (String, RESPToken.Array?).self)
40+
let keyValuePairs = try values.map { try $0.asMap().map { try ($0.key.decode(as: String.self), $0.value) } }
41+
self.id = id
42+
self.fields = keyValuePairs
43+
default:
44+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
45+
}
46+
}
47+
}
48+
49+
public struct XREADStreams<Message>: RESPTokenDecodable, Sendable where Message: RESPTokenDecodable & Sendable {
50+
public struct Stream: Sendable {
51+
public let key: ValkeyKey
52+
public let messages: [Message]
53+
}
54+
55+
public let streams: [Stream]
56+
57+
public init(fromRESP token: RESPToken) throws {
58+
switch token.value {
59+
case .map(let map):
60+
self.streams = try map.map {
61+
let key = try $0.key.decode(as: ValkeyKey.self)
62+
let messages = try $0.value.decode(as: [Message].self)
63+
return Stream(key: key, messages: messages)
64+
}
65+
default:
66+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
67+
}
68+
}
69+
}
70+
71+
public struct XAUTOCLAIMResponse: RESPTokenDecodable, Sendable {
72+
public let streamID: String
73+
public let messsages: [XREADMessage]
74+
public let deletedMessages: [String]
75+
76+
public init(fromRESP token: RESPToken) throws {
77+
switch token.value {
78+
case .array(let array):
79+
(self.streamID, self.messsages, self.deletedMessages) = try array.decodeElements()
80+
default:
81+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
82+
}
83+
}
84+
}
85+
extension XAUTOCLAIM {
86+
public typealias Response = XAUTOCLAIMResponse
87+
}
88+
89+
public enum XCLAIMResponse: RESPTokenDecodable, Sendable {
90+
case none
91+
case messages([XREADMessage])
92+
case ids([String])
93+
94+
public init(fromRESP token: RESPToken) throws {
95+
switch token.value {
96+
case .array(let array):
97+
if array.count == 0 {
98+
self = .none
99+
return
100+
}
101+
do {
102+
self = try .messages(array.decode())
103+
} catch {
104+
self = try .ids(array.decode())
105+
}
106+
default:
107+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
108+
}
109+
}
110+
}
111+
112+
extension XCLAIM {
113+
public typealias Response = XCLAIMResponse
114+
}
115+
116+
public enum XPENDINGResponse: RESPTokenDecodable, Sendable {
117+
public struct Standard: RESPTokenDecodable, Sendable {
118+
public struct Consumer: RESPTokenDecodable, Sendable {
119+
public let consumer: String
120+
public let count: String
121+
122+
public init(fromRESP token: RESPToken) throws {
123+
switch token.value {
124+
case .array(let array):
125+
(self.consumer, self.count) = try array.decodeElements()
126+
default:
127+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
128+
}
129+
}
130+
}
131+
public let pendingMessageCount: Int
132+
public let minimumID: String
133+
public let maximumID: String
134+
public let consumers: [Consumer]
135+
136+
public init(fromRESP token: RESPToken) throws {
137+
switch token.value {
138+
case .array(let array):
139+
(self.pendingMessageCount, self.minimumID, self.maximumID, self.consumers) = try array.decodeElements()
140+
default:
141+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
142+
}
143+
}
144+
}
145+
public struct Extended: RESPTokenDecodable, Sendable {
146+
struct PendingMessage: RESPTokenDecodable, Sendable {
147+
public let id: String
148+
public let consumer: String
149+
public let millisecondsSinceDelivered: Int
150+
public let numberOfTimesDelivered: Int
151+
152+
public init(fromRESP token: RESPToken) throws {
153+
switch token.value {
154+
case .array(let array):
155+
(self.id, self.consumer, self.millisecondsSinceDelivered, self.numberOfTimesDelivered) = try array.decodeElements()
156+
default:
157+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
158+
}
159+
}
160+
}
161+
let messages: [PendingMessage]
162+
163+
public init(fromRESP token: RESPToken) throws {
164+
switch token.value {
165+
case .array(let array):
166+
self.messages = try array.decode(as: [PendingMessage].self)
167+
default:
168+
throw RESPParsingError(code: .unexpectedType, buffer: token.base)
169+
}
170+
}
171+
}
172+
173+
case standard(Standard)
174+
case extended(Extended)
175+
176+
public init(fromRESP token: RESPToken) throws {
177+
do {
178+
self = try .standard(.init(fromRESP: token))
179+
} catch {
180+
self = try .extended(.init(fromRESP: token))
181+
}
182+
}
183+
}
184+
extension XPENDING {
185+
public typealias Response = XPENDINGResponse
186+
}
187+
188+
public typealias XRANGEResponse = [XREADMessage]
189+
extension XRANGE {
190+
public typealias Response = XRANGEResponse
191+
}
192+
193+
public typealias XREADResponse = XREADStreams<XREADMessage>?
194+
extension XREAD {
195+
public typealias Response = XREADResponse
196+
}
197+
198+
public typealias XREADGROUPResponse = XREADStreams<XREADGroupMessage>?
199+
extension XREADGROUP {
200+
public typealias Response = XREADGROUPResponse
201+
}
202+
203+
public typealias XREVRANGEResponse = [XREADMessage]
204+
extension XREVRANGE {
205+
public typealias Response = XREVRANGEResponse
206+
}

Sources/Valkey/Commands/GeoCommands.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ extension ValkeyConnectionProtocol {
10421042
/// - Complexity: O(N+log(M)) where N is the number of elements inside the bounding box of the circular area delimited by center and radius and M is the number of items inside the index.
10431043
/// - Returns: Array of matched members information.
10441044
@inlinable
1045-
public func georadiusbymember<Member: RESPStringRenderable>(key: ValkeyKey, member: Member, radius: Double, unit: GEORADIUSBYMEMBER<Member>.Unit, withcoord: Bool = false, withdist: Bool = false, withhash: Bool = false, countBlock: GEORADIUSBYMEMBER<Member>.CountBlock? = nil, order: GEORADIUSBYMEMBER<Member>.Order? = nil, store: GEORADIUSBYMEMBER<Member>.Store? = nil) async throws -> GEORADIUSBYMEMBER.Response {
1045+
public func georadiusbymember<Member: RESPStringRenderable>(key: ValkeyKey, member: Member, radius: Double, unit: GEORADIUSBYMEMBER<Member>.Unit, withcoord: Bool = false, withdist: Bool = false, withhash: Bool = false, countBlock: GEORADIUSBYMEMBER<Member>.CountBlock? = nil, order: GEORADIUSBYMEMBER<Member>.Order? = nil, store: GEORADIUSBYMEMBER<Member>.Store? = nil) async throws -> RESPToken {
10461046
try await send(command: GEORADIUSBYMEMBER(key: key, member: member, radius: radius, unit: unit, withcoord: withcoord, withdist: withdist, withhash: withhash, countBlock: countBlock, order: order, store: store))
10471047
}
10481048

Sources/Valkey/Commands/HashCommands.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ extension ValkeyConnectionProtocol {
455455
/// - Complexity: O(1)
456456
/// - Returns: [String]: The value of the field after the increment operation.
457457
@inlinable
458-
public func hincrbyfloat<Field: RESPStringRenderable>(key: ValkeyKey, field: Field, increment: Double) async throws -> HINCRBYFLOAT.Response {
458+
public func hincrbyfloat<Field: RESPStringRenderable>(key: ValkeyKey, field: Field, increment: Double) async throws -> RESPToken {
459459
try await send(command: HINCRBYFLOAT(key: key, field: field, increment: increment))
460460
}
461461

Sources/Valkey/Commands/ListCommands.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,6 @@ public struct LMPOP: ValkeyCommand {
326326
}
327327
}
328328
}
329-
public typealias Response = RESPToken.Array?
330-
331329
public var key: [ValkeyKey]
332330
public var `where`: Where
333331
public var count: Int?
@@ -714,7 +712,7 @@ extension ValkeyConnectionProtocol {
714712
/// * [Null]: If no element could be popped.
715713
/// * [Array]: List key from which elements were popped.
716714
@inlinable
717-
public func lmpop(key: [ValkeyKey], `where`: LMPOP.Where, count: Int? = nil) async throws -> RESPToken.Array? {
715+
public func lmpop(key: [ValkeyKey], `where`: LMPOP.Where, count: Int? = nil) async throws -> LMPOP.Response {
718716
try await send(command: LMPOP(key: key, where: `where`, count: count))
719717
}
720718

Sources/Valkey/Commands/ScriptingCommands.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ extension ValkeyConnectionProtocol {
422422
/// - Complexity: Depends on the script that is executed.
423423
/// - Returns: Return value depends on the script that is executed
424424
@inlinable
425-
public func eval<Script: RESPStringRenderable>(script: Script, key: [ValkeyKey] = [], arg: [String] = []) async throws -> EVAL.Response {
425+
public func eval<Script: RESPStringRenderable>(script: Script, key: [ValkeyKey] = [], arg: [String] = []) async throws -> RESPToken {
426426
try await send(command: EVAL(script: script, key: key, arg: arg))
427427
}
428428

@@ -433,7 +433,7 @@ extension ValkeyConnectionProtocol {
433433
/// - Complexity: Depends on the script that is executed.
434434
/// - Returns: Return value depends on the script that is executed
435435
@inlinable
436-
public func evalsha<Sha1: RESPStringRenderable>(sha1: Sha1, key: [ValkeyKey] = [], arg: [String] = []) async throws -> EVALSHA.Response {
436+
public func evalsha<Sha1: RESPStringRenderable>(sha1: Sha1, key: [ValkeyKey] = [], arg: [String] = []) async throws -> RESPToken {
437437
try await send(command: EVALSHA(sha1: sha1, key: key, arg: arg))
438438
}
439439

@@ -444,7 +444,7 @@ extension ValkeyConnectionProtocol {
444444
/// - Complexity: Depends on the script that is executed.
445445
/// - Returns: Return value depends on the script that is executed
446446
@inlinable
447-
public func evalshaRo<Sha1: RESPStringRenderable>(sha1: Sha1, key: [ValkeyKey] = [], arg: [String] = []) async throws -> EVALSHARO.Response {
447+
public func evalshaRo<Sha1: RESPStringRenderable>(sha1: Sha1, key: [ValkeyKey] = [], arg: [String] = []) async throws -> RESPToken {
448448
try await send(command: EVALSHARO(sha1: sha1, key: key, arg: arg))
449449
}
450450

@@ -455,7 +455,7 @@ extension ValkeyConnectionProtocol {
455455
/// - Complexity: Depends on the script that is executed.
456456
/// - Returns: Return value depends on the script that is executed
457457
@inlinable
458-
public func evalRo<Script: RESPStringRenderable>(script: Script, key: [ValkeyKey] = [], arg: [String] = []) async throws -> EVALRO.Response {
458+
public func evalRo<Script: RESPStringRenderable>(script: Script, key: [ValkeyKey] = [], arg: [String] = []) async throws -> RESPToken {
459459
try await send(command: EVALRO(script: script, key: key, arg: arg))
460460
}
461461

@@ -466,7 +466,7 @@ extension ValkeyConnectionProtocol {
466466
/// - Complexity: Depends on the function that is executed.
467467
/// - Returns: Return value depends on the function that is executed
468468
@inlinable
469-
public func fcall<Function: RESPStringRenderable>(function: Function, key: [ValkeyKey] = [], arg: [String] = []) async throws -> FCALL.Response {
469+
public func fcall<Function: RESPStringRenderable>(function: Function, key: [ValkeyKey] = [], arg: [String] = []) async throws -> RESPToken {
470470
try await send(command: FCALL(function: function, key: key, arg: arg))
471471
}
472472

@@ -477,7 +477,7 @@ extension ValkeyConnectionProtocol {
477477
/// - Complexity: Depends on the function that is executed.
478478
/// - Returns: Return value depends on the function that is executed
479479
@inlinable
480-
public func fcallRo<Function: RESPStringRenderable>(function: Function, key: [ValkeyKey] = [], arg: [String] = []) async throws -> FCALLRO.Response {
480+
public func fcallRo<Function: RESPStringRenderable>(function: Function, key: [ValkeyKey] = [], arg: [String] = []) async throws -> RESPToken {
481481
try await send(command: FCALLRO(function: function, key: key, arg: arg))
482482
}
483483

@@ -550,7 +550,7 @@ extension ValkeyConnectionProtocol {
550550
/// - Complexity: O(1) (considering compilation time is redundant)
551551
/// - Returns: [String]: The library name that was loaded
552552
@inlinable
553-
public func functionLoad<FunctionCode: RESPStringRenderable>(replace: Bool = false, functionCode: FunctionCode) async throws -> FUNCTION.LOAD.Response {
553+
public func functionLoad<FunctionCode: RESPStringRenderable>(replace: Bool = false, functionCode: FunctionCode) async throws -> RESPToken {
554554
try await send(command: FUNCTION.LOAD(replace: replace, functionCode: functionCode))
555555
}
556556

@@ -635,7 +635,7 @@ extension ValkeyConnectionProtocol {
635635
/// - Complexity: O(N) with N being the length in bytes of the script body.
636636
/// - Returns: [String]: The SHA1 digest of the script added into the script cache
637637
@inlinable
638-
public func scriptLoad<Script: RESPStringRenderable>(script: Script) async throws -> SCRIPT.LOAD.Response {
638+
public func scriptLoad<Script: RESPStringRenderable>(script: Script) async throws -> RESPToken {
639639
try await send(command: SCRIPT.LOAD(script: script))
640640
}
641641

@@ -646,7 +646,7 @@ extension ValkeyConnectionProtocol {
646646
/// - Complexity: O(1).
647647
/// - Returns: [String]: Lua script if sha1 hash exists in script cache.
648648
@inlinable
649-
public func scriptShow<Sha1: RESPStringRenderable>(sha1: Sha1) async throws -> SCRIPT.SHOW.Response {
649+
public func scriptShow<Sha1: RESPStringRenderable>(sha1: Sha1) async throws -> RESPToken {
650650
try await send(command: SCRIPT.SHOW(sha1: sha1))
651651
}
652652

Sources/Valkey/Commands/SentinelCommands.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ extension ValkeyConnectionProtocol {
493493
/// - Available: 2.8.4
494494
/// - Returns: [String]: Returns OK if the current Sentinel configuration is able to reach the quorum needed to failover a primary, and the majority needed to authorize the failover.
495495
@inlinable
496-
public func sentinelCkquorum<PrimaryName: RESPStringRenderable>(primaryName: PrimaryName) async throws -> SENTINEL.CKQUORUM.Response {
496+
public func sentinelCkquorum<PrimaryName: RESPStringRenderable>(primaryName: PrimaryName) async throws -> RESPToken {
497497
try await send(command: SENTINEL.CKQUORUM(primaryName: primaryName))
498498
}
499499

Sources/Valkey/Commands/ServerCommands.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1837,7 +1837,7 @@ extension ValkeyConnectionProtocol {
18371837
/// - Complexity: O(1)
18381838
/// - Returns: [String]: Latency graph
18391839
@inlinable
1840-
public func latencyGraph<Event: RESPStringRenderable>(event: Event) async throws -> LATENCY.GRAPH.Response {
1840+
public func latencyGraph<Event: RESPStringRenderable>(event: Event) async throws -> RESPToken {
18411841
try await send(command: LATENCY.GRAPH(event: event))
18421842
}
18431843

@@ -2039,7 +2039,7 @@ extension ValkeyConnectionProtocol {
20392039
/// - Documentation: [PSYNC](https:/valkey.io/commands/psync)
20402040
/// - Available: 2.8.0
20412041
@inlinable
2042-
public func psync<Replicationid: RESPStringRenderable>(replicationid: Replicationid, offset: Int) async throws -> PSYNC.Response {
2042+
public func psync<Replicationid: RESPStringRenderable>(replicationid: Replicationid, offset: Int) async throws -> RESPToken {
20432043
try await send(command: PSYNC(replicationid: replicationid, offset: offset))
20442044
}
20452045

0 commit comments

Comments
 (0)