@@ -2,6 +2,7 @@ package push
22
33import (
44 "context"
5+ "time"
56
67 "github.com/redis/go-redis/v9/internal"
78 "github.com/redis/go-redis/v9/internal/proto"
@@ -51,8 +52,19 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
5152 if rd == nil {
5253 return nil
5354 }
55+ conn := handlerCtx .Conn
56+ if conn == nil {
57+ return nil
58+ }
59+ netConn := handlerCtx .Conn .GetNetConn ()
60+ if netConn == nil {
61+ return nil
62+ }
5463
5564 for {
65+ // Set a short read deadline to check for available data
66+ // otherwise we may block on Peek if there is no data available
67+ netConn .SetReadDeadline (time .Now ().Add (1 ))
5668 // Check if there's data available to read
5769 replyType , err := rd .PeekReplyType ()
5870 if err != nil {
@@ -104,6 +116,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
104116 }
105117 }
106118
119+ netConn .SetReadDeadline (time.Time {})
107120 return nil
108121}
109122
@@ -133,12 +146,23 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
133146// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
134147// are only available in RESP3 and this processor is used for RESP2 connections.
135148// This avoids unnecessary buffer scanning overhead.
136- func (v * VoidProcessor ) ProcessPendingNotifications (_ context.Context , _ NotificationHandlerContext , rd * proto.Reader ) error {
149+ func (v * VoidProcessor ) ProcessPendingNotifications (_ context.Context , handlerCtx NotificationHandlerContext , rd * proto.Reader ) error {
137150 // read and discard all push notifications
138151 if rd == nil {
139152 return nil
140153 }
154+ conn := handlerCtx .Conn
155+ if conn == nil {
156+ return nil
157+ }
158+ netConn := handlerCtx .Conn .GetNetConn ()
159+ if netConn == nil {
160+ return nil
161+ }
141162 for {
163+ // Set a short read deadline to check for available data
164+ netConn .SetReadDeadline (time .Now ().Add (1 ))
165+ // Check if there's data available to read
142166 replyType , err := rd .PeekReplyType ()
143167 if err != nil {
144168 // No more data available or error reading
@@ -166,6 +190,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
166190 return nil
167191 }
168192 }
193+ netConn .SetReadDeadline (time.Time {})
169194 return nil
170195}
171196
@@ -174,7 +199,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
174199func willHandleNotificationInClient (notificationType string ) bool {
175200 switch notificationType {
176201 // Pub/Sub notifications - handled by pub/sub system
177- case "message" , // Regular pub/sub message
202+ case "message" , // Regular pub/sub message
178203 "pmessage" , // Pattern pub/sub message
179204 "subscribe" , // Subscription confirmation
180205 "unsubscribe" , // Unsubscription confirmation
0 commit comments