Skip to content

Commit 00eb9b5

Browse files
committed
Unify PubSub Handler Signature
Motivation Right now the PubSub handlers are split into three separate closures, with the subscribe/unsubscribe handlers being optional. This won't play well with AsyncStream for being able to respond to all events that a PubSub subscription can cause. Additionally, the current structure is very verbose in code to maintain - but also adds complexity to developers who are first getting started to understand the lifecycle of PubSub events. Changes - Add: New `RedisPubSubEvent` enum that captures the subscribe, unsubscribe, and message lifecycle events - Add: New `RedisPubSubEventReceiver` that combines the previous 3 closure types - Add: Dedicated DocC Symbol Extension file for `RedisPubSubHandler` - Change: `RedisClient.subscribe` and `RedisClient.psubscribe` method signatures to only require a single unlabeled closure - Rename: `RedisUnsubscribeEventSource` to `RedisPubSubEvent.UnsubscribeEventSource` - Remove: `RedisSubscriptionMessageReceiver`, `RedisSubscriptionChangeDetails`, `RedisSubscribeHandler`, and `RedisUnsubscribeHandler` types Result Developers should have a much easier time getting started and understanding PubSub with assistance from the compiler with types to understand what they're being given and what's available to them as information to make more informed decisions in their app logic.
1 parent 459f2cc commit 00eb9b5

File tree

11 files changed

+296
-275
lines changed

11 files changed

+296
-275
lines changed

Sources/RediStack/ChannelHandlers/RedisPubSubHandler.swift

Lines changed: 57 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -14,58 +14,52 @@
1414

1515
import NIO
1616

17-
/// A closure receiver of individual Pub/Sub messages from Redis subscriptions to channels and patterns.
18-
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
19-
///
20-
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
21-
/// so as to not block further messages from being processed.
22-
/// - Parameters:
23-
/// - publisher: The name of the channel that published the message.
24-
/// - message: The message data that was received from the `publisher`.
25-
public typealias RedisSubscriptionMessageReceiver = (_ publisher: RedisChannelName, _ message: RESPValue) -> Void
26-
27-
/// The details of the subscription change.
28-
/// - Parameters:
29-
/// - subscriptionKey: The subscribed channel or pattern that had its subscription status changed.
30-
/// - currentSubscriptionCount: The current total number of subscriptions the connection has.
31-
public typealias RedisSubscriptionChangeDetails = (subscriptionKey: String, currentSubscriptionCount: Int)
32-
33-
/// A closure handler invoked for Pub/Sub subscribe commands.
34-
///
35-
/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
36-
/// even if it was done as a single PSUBSCRIBE or SUBSCRIBE command.
37-
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
38-
///
39-
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
40-
/// so as to not block further messages from being processed.
41-
/// - Parameter details: The details of the subscription.
42-
public typealias RedisSubscribeHandler = (_ details: RedisSubscriptionChangeDetails) -> Void
43-
44-
/// An enumeration of possible sources of Pub/Sub unsubscribe events.
45-
public enum RedisUnsubscribeEventSource {
46-
/// The client sent an unsubscribe command either as UNSUBSCRIBE or PUNSUBSCRIBE.
47-
case userInitiated
48-
/// The client encountered an error and had to unsubscribe.
49-
/// - Parameter _: The error the client encountered.
50-
case clientError(_ error: Error)
17+
/// The possible events that are received from Redis Pub/Sub channels.
18+
public enum RedisPubSubEvent {
19+
/// The available sources of Pub/Sub unsubscribe events.
20+
public enum UnsubscribeEventSource {
21+
/// The client sent an unsubscribe command either as UNSUBSCRIBE or PUNSUBSCRIBE.
22+
case userInitiated
23+
/// The client encountered an error and had to unsubscribe.
24+
/// - Parameter _: The error the client encountered.
25+
case clientError(_ error: Error)
26+
}
27+
28+
/// The connection has been subscribed to a channel.
29+
///
30+
/// This event should only be received once, before receiving messages.
31+
/// - Parameters:
32+
/// - key: The subscribed channel or pattern that was subscribed to.
33+
/// - currentSubscriptionCount: The current total number of subscriptions the connection has after subscribing.
34+
case subscribed(key: String, currentSubscriptionCount: Int)
35+
/// The connection has been unsubscribed from a channel.
36+
///
37+
/// This event should only be received once, after all messages received have been processed, with no further messages being received.
38+
/// - Parameters:
39+
/// - key: The subscribed channel or pattern that was unsubscribed from.
40+
/// - currentSubscriptionCount: The current total number of subscriptions the connection has after unsubscribing.
41+
/// - source: The source of the unsubscribe event.
42+
case unsubscribed(key: String, currentSubscriptionCount: Int, source: UnsubscribeEventSource)
43+
/// The connection has received a message on the given channel.
44+
///
45+
/// This event can be received an infinite number of times, until the connection has unsubscribed from the channel.
46+
/// - Parameters:
47+
/// - publisher: The name of the channel that published the message.
48+
/// - message: The message data that was received from the `publisher`.
49+
case message(publisher: RedisChannelName, message: RESPValue)
5150
}
5251

53-
/// A closure handler invoked for Pub/Sub unsubscribe commands.
52+
/// A closure receiver of individual Pub/Sub events from Redis subscriptions to channels and patterns.
53+
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message (the `EventLoop` of the `NIO.ChannelPipeline`).
5454
///
55-
/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
56-
/// even if it was done as a single PUNSUBSCRIBE or UNSUBSCRIBE command.
57-
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
58-
///
59-
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
60-
/// so as to not block further messages from being processed.
61-
/// - Parameters:
62-
/// - details: The details of the subscription.
63-
/// - source: The source of the unsubscribe event.
64-
public typealias RedisUnsubscribeHandler = (_ details: RedisSubscriptionChangeDetails, _ source: RedisUnsubscribeEventSource) -> Void
55+
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work
56+
/// be dispatched to another thread, so as to not block further messages from being processed.
57+
/// - Parameter event: The event that the connection is responding to.
58+
public typealias RedisPubSubEventReceiver = (_ event: RedisPubSubEvent) -> Void
6559

6660
/// A list of patterns or channels that a Pub/Sub subscription change is targetting.
6761
///
68-
/// See `RedisChannelName`, [PSUBSCRIBE](https://redis.io/commands/psubscribe) and [SUBSCRIBE](https://redis.io/commands/subscribe)
62+
/// See ``RedisChannelName`` or the Redis documentation on [PSUBSCRIBE](https://redis.io/commands/psubscribe) and [SUBSCRIBE](https://redis.io/commands/subscribe).
6963
///
7064
/// Use the `values` property to quickly access the underlying list of the target for any purpose that requires a the `String` values.
7165
public enum RedisSubscriptionTarget: Equatable, CustomDebugStringConvertible {
@@ -97,33 +91,6 @@ public enum RedisSubscriptionTarget: Equatable, CustomDebugStringConvertible {
9791
}
9892

9993
/// A channel handler that stores a map of closures and channel or pattern names subscribed to in Redis using Pub/Sub.
100-
///
101-
/// These `RedisPubSubMessageReceiver` closures are added and removed using methods directly on an instance of this handler.
102-
///
103-
/// When a receiver is added or removed, the handler will send the appropriate subscribe or unsubscribe message to Redis so that the connection
104-
/// reflects the local Channel state.
105-
///
106-
/// # ChannelInboundHandler
107-
/// This handler is designed to be placed _before_ a `RedisCommandHandler` so that it can intercept Pub/Sub messages and dispatch them to the appropriate
108-
/// receiver.
109-
///
110-
/// If a response is not in the Pub/Sub message format as specified by Redis, then it is treated as a normal Redis command response and sent further into
111-
/// the pipeline so that eventually a `RedisCommandHandler` can process it.
112-
///
113-
/// # ChannelOutboundHandler
114-
/// This handler is what is defined as a "transparent" `NIO.ChannelOutboundHandler` in that it does absolutely nothing except forward outgoing commands
115-
/// in the pipeline.
116-
///
117-
/// The reason why this handler needs to conform to this protocol at all, is that subscribe and unsubscribe commands are executed outside of a normal
118-
/// `NIO.Channel.write(_:)` cycle, as message receivers aren't command arguments and need to be stored.
119-
///
120-
/// All of this is outside the responsibility of the `RedisCommandHandler`,
121-
/// so the `RedisPubSubHandler` uses its own `NIO.ChannelHandlerContext` being before the command handler to short circuit the pipeline.
122-
///
123-
/// # RemovableChannelHandler
124-
/// As a connection can move in and out of "PubSub mode", this handler is can be added and removed from a `NIO.ChannelPipeline` as needed.
125-
///
126-
/// When the handler has received a `removeHandler(context:removalToken:)` request, it will remove itself immediately.
12794
public final class RedisPubSubHandler {
12895
private var state: State = .default
12996

@@ -172,8 +139,7 @@ extension RedisPubSubHandler {
172139

173140
guard let subscription = self.subscriptions[prefixedKey] else { return }
174141

175-
subscription.onSubscribe?((subscriptionKey, subscriptionCount))
176-
subscription.onSubscribe = nil // nil to free memory
142+
subscription.onEvent(.subscribed(key: subscriptionKey, currentSubscriptionCount: subscriptionCount))
177143
self.subscriptions[prefixedKey] = subscription
178144

179145
subscription.type.gauge.increment()
@@ -188,7 +154,11 @@ extension RedisPubSubHandler {
188154
let prefixedKey = self.prefixKey(subscriptionKey, with: keyPrefix)
189155
guard let subscription = self.subscriptions.removeValue(forKey: prefixedKey) else { return }
190156

191-
subscription.onUnsubscribe?((subscriptionKey, subscriptionCount), .userInitiated)
157+
subscription.onEvent(.unsubscribed(
158+
key: subscriptionKey,
159+
currentSubscriptionCount: subscriptionCount,
160+
source: .userInitiated
161+
))
192162
subscription.type.gauge.decrement()
193163

194164
switch self.pendingUnsubscribes.removeValue(forKey: prefixedKey) {
@@ -215,36 +185,27 @@ extension RedisPubSubHandler {
215185
keyPrefix: String
216186
) {
217187
guard let subscription = self.subscriptions[self.prefixKey(subscriptionKey, with: keyPrefix)] else { return }
218-
subscription.onMessage(channel, message)
188+
subscription.onEvent(.message(publisher: channel, message: message))
219189
RedisMetrics.subscriptionMessagesReceivedCount.increment()
220190
}
221191
}
222192

223193
// MARK: Subscription Management
224194

225195
extension RedisPubSubHandler {
226-
/// Registers the provided subscription message receiver to receive messages from the specified subscription target.
196+
/// Registers the provided subscription event handler to receive events from the specified subscription target.
227197
/// - Important: Any previously registered receiver will be replaced and not notified.
228198
/// - Parameters:
229199
/// - target: The channels or patterns that the receiver should receive messages for.
230-
/// - receiver: The closure that receives any future pub/sub messages.
231-
/// - subscribeHandler: An optional closure to invoke when the subscription first becomes active.
232-
/// - unsubscribeHandler: An optional closure to invoke when the subscription becomes inactive.
200+
/// - receiver: The closure that receives any future pub/sub events.
233201
/// - Returns: A `NIO.EventLoopFuture` that resolves the number of subscriptions the client has after the subscription has been added.
234202
public func addSubscription(
235203
for target: RedisSubscriptionTarget,
236-
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
237-
onSubscribe subscribeHandler: RedisSubscribeHandler?,
238-
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
204+
receiver: @escaping RedisPubSubEventReceiver
239205
) -> EventLoopFuture<Int> {
240206
guard self.eventLoop.inEventLoop else {
241207
return self.eventLoop.flatSubmit {
242-
return self.addSubscription(
243-
for: target,
244-
messageReceiver: receiver,
245-
onSubscribe: subscribeHandler,
246-
onUnsubscribe: unsubscribeHandler
247-
)
208+
return self.addSubscription(for: target, receiver: receiver)
248209
}
249210
}
250211

@@ -260,12 +221,7 @@ extension RedisPubSubHandler {
260221

261222
let newSubscriptionTargets = target.values
262223
.compactMap { (targetKey) -> String? in
263-
let subscription = Subscription(
264-
type: target.subscriptionType,
265-
messageReceiver: receiver,
266-
subscribeHandler: subscribeHandler,
267-
unsubscribeHandler: unsubscribeHandler
268-
)
224+
let subscription = Subscription(type: target.subscriptionType, eventReceiver: receiver)
269225
let prefixedKey = self.prefixKey(targetKey, with: target.keyPrefix)
270226
guard self.subscriptions.updateValue(subscription, forKey: prefixedKey) == nil else { return nil }
271227
return targetKey
@@ -507,8 +463,8 @@ extension RedisPubSubHandler: ChannelInboundHandler {
507463
let receivers = self.subscriptions
508464
self.subscriptions.removeAll()
509465
receivers.forEach {
510-
let source: RedisUnsubscribeEventSource = error.map { .clientError($0) } ?? .userInitiated
511-
$0.value.onUnsubscribe?(($0.key, 0), source)
466+
let source: RedisPubSubEvent.UnsubscribeEventSource = error.map { .clientError($0) } ?? .userInitiated
467+
$0.value.onEvent(.unsubscribed(key: $0.key, currentSubscriptionCount: 0, source: source))
512468
$0.value.type.gauge.decrement()
513469
}
514470
}
@@ -547,20 +503,11 @@ extension RedisPubSubHandler {
547503

548504
fileprivate final class Subscription {
549505
let type: SubscriptionType
550-
let onMessage: RedisSubscriptionMessageReceiver
551-
var onSubscribe: RedisSubscribeHandler? // will be set to nil after first call
552-
let onUnsubscribe: RedisUnsubscribeHandler?
506+
let onEvent: RedisPubSubEventReceiver
553507

554-
init(
555-
type: SubscriptionType,
556-
messageReceiver: @escaping RedisSubscriptionMessageReceiver,
557-
subscribeHandler: RedisSubscribeHandler?,
558-
unsubscribeHandler: RedisUnsubscribeHandler?
559-
) {
508+
init(type: SubscriptionType, eventReceiver: @escaping RedisPubSubEventReceiver) {
560509
self.type = type
561-
self.onMessage = messageReceiver
562-
self.onSubscribe = subscribeHandler
563-
self.onUnsubscribe = unsubscribeHandler
510+
self.onEvent = eventReceiver
564511
}
565512
}
566513

Sources/RediStack/Documentation.docc/RediStack.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ print(result) // Optional("some value")
4141
### Pub/Sub
4242

4343
- ``RedisChannelName``
44+
- ``RedisPubSubEventReceiver``
4445

4546
### Error Handling
4647

File renamed without changes.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# ``RediStack/RedisPubSubHandler``
2+
3+
``RedisPubSubEventReceiver`` closures are added and removed using methods directly on an instance of this handler.
4+
5+
When a receiver is added or removed, the handler will send the appropriate subscribe or unsubscribe message to Redis so that the connection
6+
reflects the local Channel state.
7+
8+
## ChannelInboundHandler
9+
This handler is designed to be placed _before_ a ``RedisCommandHandler`` so that it can intercept Pub/Sub messages and dispatch them to the appropriate
10+
receiver.
11+
12+
If a response is not in the Pub/Sub message format as specified by Redis, then it is treated as a normal Redis command response and sent further into
13+
the pipeline so that eventually a ``RedisCommandHandler`` can process it.
14+
15+
## ChannelOutboundHandler
16+
This handler is what is defined as a "transparent" `NIO.ChannelOutboundHandler` in that it does absolutely nothing except forward outgoing commands
17+
in the pipeline.
18+
19+
The reason why this handler needs to conform to this protocol at all, is that subscribe and unsubscribe commands are executed outside of a normal
20+
`NIO.Channel.write(_:)` cycle, as message receivers aren't command arguments and need to be stored.
21+
22+
All of this is outside the responsibility of the ``RedisCommandHandler``,
23+
so the ``RedisPubSubHandler`` uses its own `NIO.ChannelHandlerContext` being before the command handler to short circuit the pipeline.
24+
25+
## RemovableChannelHandler
26+
As a connection can move in and out of "PubSub mode", this handler can be added and removed from a `NIO.ChannelPipeline` as needed.
27+
28+
When the handler has received a `removeHandler(context:removalToken:)` request, it will remove itself immediately.
29+
30+
## Topics
31+
32+
### Managing Subscriptions
33+
34+
- ``RedisSubscriptionTarget``
35+
- ``addSubscription(for:receiver:)``
36+
- ``removeSubscription(for:)``
37+
38+
### Pub/Sub Events
39+
40+
- ``RedisPubSubEvent``
41+
- ``RedisPubSubEventReceiver``

0 commit comments

Comments
 (0)