@@ -204,7 +204,8 @@ extension RedisPubSubHandler {
204204 /// - receiver: The closure that receives any future pub/sub messages.
205205 /// - subscribeHandler: An optional closure to invoke when the subscription first becomes active.
206206 /// - unsubscribeHandler: An optional closure to invoke when the subscription becomes inactive.
207- /// - Returns: A `NIO.EventLoopFuture` that resolves the number of subscriptions the client has after the subscription has been added.
207+ /// - Returns: A `NIO.EventLoopFuture` that resolves the number of subscriptions the client has after the
208+ /// subscription has been added.
208209 public func addSubscription(
209210 for target: RedisSubscriptionTarget ,
210211 messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver ,
@@ -228,7 +229,8 @@ extension RedisPubSubHandler {
228229 case let . error( e) : return self . eventLoop. makeFailedFuture ( e)
229230
230231 case . default:
231- // go through all the target patterns/names and update the map with the new receiver if it's already registered
232+ // go through all the target patterns/names and update the map with the new receiver
233+ // if it's already registered
232234 // if it was a new registration, not an update, we keep that name to send to Redis
233235 // we do this so that we save on data transfer bandwidth
234236
@@ -312,7 +314,8 @@ extension RedisPubSubHandler {
312314 let pendingSubscriptions : [ ( String , EventLoopPromise < Int > ) ] = targets. map {
313315 ( self . prefixKey ( $0, with: keyPrefix) , self . eventLoop. makePromise ( ) )
314316 }
315- // add the subscription change handler to the appropriate queue for each individual subscription target
317+ // add the subscription change handler to the appropriate queue
318+ // for each individual subscription target
316319 for (key, value) in pendingSubscriptions {
317320 self [ keyPath: pendingQueue] . updateValue ( value, forKey: key)
318321 }
@@ -324,19 +327,23 @@ extension RedisPubSubHandler {
324327 on: self . eventLoop
325328 )
326329 . flatMapThrowing { ( results) -> Int in
330+ /*
331+ if we have no success cases, we will still
332+ have at least one response that we can rely on the 'get'
333+ method to throw the error for us, rather than unwrapping it
334+ ourselves
335+ */
336+
337+ let latestSubscriptionCount = results
338+ . lazy
339+ . reversed ( )
340+ . compactMap ( { try ? $0. get ( ) } )
341+ . first
342+
327343 // trust the last success response as the most current count
328- guard
329- let latestSubscriptionCount = results
330- . lazy
331- // reverse to save time-complexity,
332- // as we just need the last (first) successful value
333- . reversed ( )
334- . compactMap ( { try ? $0. get ( ) } )
335- . first
336- // if we have no success cases, we will still have at least
337- // one response that we can rely on the 'get' method to
338- // throw the error for us, rather than unwrapping it ourselves
339- else { return try results. first!. get ( ) }
344+ guard let latestSubscriptionCount else {
345+ return try results. first!. get ( )
346+ }
340347
341348 return latestSubscriptionCount
342349 }
@@ -396,7 +403,8 @@ extension RedisPubSubHandler: ChannelInboundHandler {
396403 // if it is, we handle it here
397404
398405 // Redis defines the format as [messageKeyword: String, channelName: String, message: RESPValue]
399- // unless the messageType is 'pmessage', in which case it's [messageKeyword, pattern: String, channelName, message]
406+ // unless the messageType is 'pmessage', in which case it's [messageKeyword, pattern: String,
407+ // channelName, message]
400408
401409 // these guards extract some of the basic details of a pubsub message
402410 guard
@@ -428,7 +436,8 @@ extension RedisPubSubHandler: ChannelInboundHandler {
428436 case " pmessage " :
429437 self . handleMessage (
430438 message,
431- from: . init( array [ 2 ] . string!) , // the channel name is stored as the 3rd element in the array in 'pmessage' streams
439+ from: . init( array [ 2 ] . string!) ,
440+ // the channel name is stored as the 3rd element in the array in 'pmessage' streams
432441 withSubscriptionKey: channelOrPattern,
433442 keyPrefix: kSubscriptionKeyPrefixPattern
434443 )
0 commit comments