Skip to content

Commit 92940e3

Browse files
authored
Add subscribe functions to ValkeyClientProtocol (#237)
* Add subscribe/psubscribe to client protocol Signed-off-by: Adam Fowler <[email protected]> * Add option to disable generic strings in commands Signed-off-by: Adam Fowler <[email protected]> * Add ValkeySubscribeCommand protocol and use in ValkeyClientProtocol Signed-off-by: Adam Fowler <[email protected]> * comments Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]>
1 parent 8e99b94 commit 92940e3

9 files changed

+213
-304
lines changed

Sources/Valkey/Commands/PubsubCommands.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,12 @@ public struct SPUBLISH<Shardchannel: RESPStringRenderable, Message: RESPStringRe
196196

197197
/// Listens for messages published to shard channels.
198198
@_documentation(visibility: internal)
199-
public struct SSUBSCRIBE<Shardchannel: RESPStringRenderable>: ValkeyCommand {
199+
public struct SSUBSCRIBE: ValkeyCommand {
200200
@inlinable public static var name: String { "SSUBSCRIBE" }
201201

202-
public var shardchannels: [Shardchannel]
202+
public var shardchannels: [String]
203203

204-
@inlinable public init(shardchannels: [Shardchannel]) {
204+
@inlinable public init(shardchannels: [String]) {
205205
self.shardchannels = shardchannels
206206
}
207207

@@ -212,12 +212,12 @@ public struct SSUBSCRIBE<Shardchannel: RESPStringRenderable>: ValkeyCommand {
212212

213213
/// Listens for messages published to channels.
214214
@_documentation(visibility: internal)
215-
public struct SUBSCRIBE<Channel: RESPStringRenderable>: ValkeyCommand {
215+
public struct SUBSCRIBE: ValkeyCommand {
216216
@inlinable public static var name: String { "SUBSCRIBE" }
217217

218-
public var channels: [Channel]
218+
public var channels: [String]
219219

220-
@inlinable public init(channels: [Channel]) {
220+
@inlinable public init(channels: [String]) {
221221
self.channels = channels
222222
}
223223

Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift

Lines changed: 9 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -37,98 +37,6 @@ extension ValkeyClient {
3737
return try await operation(connection)
3838
}
3939

40-
/// Subscribe to list of channels and run closure with subscription
41-
///
42-
/// When the closure is exited the channels are automatically unsubscribed from.
43-
///
44-
/// When running subscribe from `ValkeyClient` a single connection is used for
45-
/// all subscriptions.
46-
///
47-
/// - Parameters:
48-
/// - channels: list of channels to subscribe to
49-
/// - isolation: Actor isolation
50-
/// - process: Closure that is called with subscription async sequence
51-
/// - Returns: Return value of closure
52-
@inlinable
53-
public func subscribe<Value>(
54-
to channels: String...,
55-
isolation: isolated (any Actor)? = #isolation,
56-
process: (ValkeySubscription) async throws -> sending Value
57-
) async throws -> sending Value {
58-
try await self.subscribe(to: channels, process: process)
59-
}
60-
61-
@inlinable
62-
/// Subscribe to list of channels and run closure with subscription
63-
///
64-
/// When the closure is exited the channels are automatically unsubscribed from.
65-
///
66-
/// When running subscribe from `ValkeyClient` a single connection is used for
67-
/// all subscriptions.
68-
///
69-
/// - Parameters:
70-
/// - channels: list of channels to subscribe to
71-
/// - isolation: Actor isolation
72-
/// - process: Closure that is called with subscription async sequence
73-
/// - Returns: Return value of closure
74-
public func subscribe<Value>(
75-
to channels: [String],
76-
isolation: isolated (any Actor)? = #isolation,
77-
process: (ValkeySubscription) async throws -> sending Value
78-
) async throws -> sending Value {
79-
try await self.subscribe(
80-
command: SUBSCRIBE(channels: channels),
81-
filters: channels.map { .channel($0) },
82-
process: process
83-
)
84-
}
85-
86-
/// Subscribe to list of channel patterns and run closure with subscription
87-
///
88-
/// When the closure is exited the patterns are automatically unsubscribed from.
89-
///
90-
/// When running subscribe from `ValkeyClient` a single connection is used for
91-
/// all subscriptions.
92-
///
93-
/// - Parameters:
94-
/// - patterns: list of channel patterns to subscribe to
95-
/// - isolation: Actor isolation
96-
/// - process: Closure that is called with subscription async sequence
97-
/// - Returns: Return value of closure
98-
@inlinable
99-
public func psubscribe<Value>(
100-
to patterns: String...,
101-
isolation: isolated (any Actor)? = #isolation,
102-
process: (ValkeySubscription) async throws -> sending Value
103-
) async throws -> sending Value {
104-
try await self.psubscribe(to: patterns, process: process)
105-
}
106-
107-
/// Subscribe to list of pattern matching channels and run closure with subscription
108-
///
109-
/// When the closure is exited the patterns are automatically unsubscribed from.
110-
///
111-
/// When running subscribe from `ValkeyClient` a single connection is used for
112-
/// all subscriptions.
113-
///
114-
/// - Parameters:
115-
/// - patterns: list of channel patterns to subscribe to
116-
/// - isolation: Actor isolation
117-
/// - process: Closure that is called with subscription async sequence
118-
/// - Returns: Return value of closure
119-
@inlinable
120-
public func psubscribe<Value>(
121-
to patterns: [String],
122-
isolation: isolated (any Actor)? = #isolation,
123-
process: (ValkeySubscription) async throws -> sending Value
124-
) async throws -> sending Value {
125-
try await self.subscribe(
126-
command: PSUBSCRIBE(patterns: patterns),
127-
filters: patterns.map { .pattern($0) },
128-
process: process
129-
)
130-
}
131-
13240
/// Subscribe to key invalidation channel required for client-side caching
13341
///
13442
/// See https://valkey.io/topics/client-side-caching/ for more details. The `process`
@@ -159,15 +67,20 @@ extension ValkeyClient {
15967
}
16068
}
16169

70+
/// Execute subscribe command and run closure using related ``ValkeySubscription``
71+
/// AsyncSequence
72+
///
73+
/// This should not be called directly, used the related commands
74+
/// ``ValkeyClient/subscribe(to:isolation:process:)`` or
75+
/// ``ValkeyClient/psubscribe(to:isolation:process:)``
16276
@inlinable
163-
func subscribe<Value>(
164-
command: some ValkeyCommand,
165-
filters: [ValkeySubscriptionFilter],
77+
public func _subscribe<Value>(
78+
command: some ValkeySubscribeCommand,
16679
isolation: isolated (any Actor)? = #isolation,
16780
process: (ValkeySubscription) async throws -> sending Value
16881
) async throws -> sending Value {
16982
try await self.withSubscriptionConnection { connection in
170-
try await connection.subscribe(command: command, filters: filters, process: process)
83+
try await connection._subscribe(command: command, isolation: isolation, process: process)
17184
}
17285
}
17386
}

Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift

Lines changed: 9 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -37,98 +37,6 @@ extension ValkeyClusterClient {
3737
return try await operation(connection)
3838
}
3939

40-
/// Subscribe to list of channels and run closure with subscription
41-
///
42-
/// When the closure is exited the channels are automatically unsubscribed from.
43-
///
44-
/// When running subscribe from `ValkeyClient` a single connection is used for
45-
/// all subscriptions.
46-
///
47-
/// - Parameters:
48-
/// - channels: list of channels to subscribe to
49-
/// - isolation: Actor isolation
50-
/// - process: Closure that is called with subscription async sequence
51-
/// - Returns: Return value of closure
52-
@inlinable
53-
public func subscribe<Value>(
54-
to channels: String...,
55-
isolation: isolated (any Actor)? = #isolation,
56-
process: (ValkeySubscription) async throws -> sending Value
57-
) async throws -> sending Value {
58-
try await self.subscribe(to: channels, process: process)
59-
}
60-
61-
@inlinable
62-
/// Subscribe to list of channels and run closure with subscription
63-
///
64-
/// When the closure is exited the channels are automatically unsubscribed from.
65-
///
66-
/// When running subscribe from `ValkeyClient` a single connection is used for
67-
/// all subscriptions.
68-
///
69-
/// - Parameters:
70-
/// - channels: list of channels to subscribe to
71-
/// - isolation: Actor isolation
72-
/// - process: Closure that is called with subscription async sequence
73-
/// - Returns: Return value of closure
74-
public func subscribe<Value>(
75-
to channels: [String],
76-
isolation: isolated (any Actor)? = #isolation,
77-
process: (ValkeySubscription) async throws -> sending Value
78-
) async throws -> sending Value {
79-
try await self.subscribe(
80-
command: SUBSCRIBE(channels: channels),
81-
filters: channels.map { .channel($0) },
82-
process: process
83-
)
84-
}
85-
86-
/// Subscribe to list of channel patterns and run closure with subscription
87-
///
88-
/// When the closure is exited the patterns are automatically unsubscribed from.
89-
///
90-
/// When running subscribe from `ValkeyClient` a single connection is used for
91-
/// all subscriptions.
92-
///
93-
/// - Parameters:
94-
/// - patterns: list of channel patterns to subscribe to
95-
/// - isolation: Actor isolation
96-
/// - process: Closure that is called with subscription async sequence
97-
/// - Returns: Return value of closure
98-
@inlinable
99-
public func psubscribe<Value>(
100-
to patterns: String...,
101-
isolation: isolated (any Actor)? = #isolation,
102-
process: (ValkeySubscription) async throws -> sending Value
103-
) async throws -> sending Value {
104-
try await self.psubscribe(to: patterns, process: process)
105-
}
106-
107-
/// Subscribe to list of pattern matching channels and run closure with subscription
108-
///
109-
/// When the closure is exited the patterns are automatically unsubscribed from.
110-
///
111-
/// When running subscribe from `ValkeyClient` a single connection is used for
112-
/// all subscriptions.
113-
///
114-
/// - Parameters:
115-
/// - patterns: list of channel patterns to subscribe to
116-
/// - isolation: Actor isolation
117-
/// - process: Closure that is called with subscription async sequence
118-
/// - Returns: Return value of closure
119-
@inlinable
120-
public func psubscribe<Value>(
121-
to patterns: [String],
122-
isolation: isolated (any Actor)? = #isolation,
123-
process: (ValkeySubscription) async throws -> sending Value
124-
) async throws -> sending Value {
125-
try await self.subscribe(
126-
command: PSUBSCRIBE(patterns: patterns),
127-
filters: patterns.map { .pattern($0) },
128-
process: process
129-
)
130-
}
131-
13240
/// Subscribe to key invalidation channel required for client-side caching
13341
///
13442
/// See https://valkey.io/topics/client-side-caching/ for more details. The `process`
@@ -159,15 +67,20 @@ extension ValkeyClusterClient {
15967
}
16068
}
16169

70+
/// Execute subscribe command and run closure using related ``ValkeySubscription``
71+
/// AsyncSequence
72+
///
73+
/// This should not be called directly, used the related commands
74+
/// ``ValkeyClusterClient/subscribe(to:isolation:process:)`` or
75+
/// ``ValkeyClusterClient/psubscribe(to:isolation:process:)``
16276
@inlinable
163-
func subscribe<Value>(
164-
command: some ValkeyCommand,
165-
filters: [ValkeySubscriptionFilter],
77+
public func _subscribe<Value>(
78+
command: some ValkeySubscribeCommand,
16679
isolation: isolated (any Actor)? = #isolation,
16780
process: (ValkeySubscription) async throws -> sending Value
16881
) async throws -> sending Value {
16982
try await self.withSubscriptionConnection { connection in
170-
try await connection.subscribe(command: command, filters: filters, process: process)
83+
try await connection._subscribe(command: command, isolation: isolation, process: process)
17184
}
17285
}
17386
}

0 commit comments

Comments
 (0)