@@ -2,6 +2,7 @@ package push
22
33import (
44 "context"
5+ "time"
56
67 "github.com/redis/go-redis/v9/internal"
78 "github.com/redis/go-redis/v9/internal/proto"
@@ -51,8 +52,23 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
5152 if rd == nil {
5253 return nil
5354 }
55+ conn := handlerCtx .Conn
56+ if conn == nil {
57+ return nil
58+ }
59+ netConn := handlerCtx .Conn .GetNetConn ()
60+ if netConn == nil {
61+ return nil
62+ }
5463
5564 for {
65+ // Set a short read deadline to check for available data
66+ // otherwise we may block on Peek if there is no data available
67+ err := netConn .SetReadDeadline (time .Now ().Add (1 ))
68+ if err != nil {
69+ return err
70+ }
71+
5672 // Check if there's data available to read
5773 replyType , err := rd .PeekReplyType ()
5874 if err != nil {
@@ -104,7 +120,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
104120 }
105121 }
106122
107- return nil
123+ return netConn . SetReadDeadline (time. Time {})
108124}
109125
110126// VoidProcessor discards all push notifications without processing them
@@ -133,12 +149,26 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
133149// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
134150// are only available in RESP3 and this processor is used for RESP2 connections.
135151// This avoids unnecessary buffer scanning overhead.
136- func (v * VoidProcessor ) ProcessPendingNotifications (_ context.Context , _ NotificationHandlerContext , rd * proto.Reader ) error {
152+ func (v * VoidProcessor ) ProcessPendingNotifications (_ context.Context , handlerCtx NotificationHandlerContext , rd * proto.Reader ) error {
137153 // read and discard all push notifications
138154 if rd == nil {
139155 return nil
140156 }
157+ conn := handlerCtx .Conn
158+ if conn == nil {
159+ return nil
160+ }
161+ netConn := handlerCtx .Conn .GetNetConn ()
162+ if netConn == nil {
163+ return nil
164+ }
141165 for {
166+ // Set a short read deadline to check for available data
167+ err := netConn .SetReadDeadline (time .Now ().Add (1 ))
168+ if err != nil {
169+ return err
170+ }
171+ // Check if there's data available to read
142172 replyType , err := rd .PeekReplyType ()
143173 if err != nil {
144174 // No more data available or error reading
@@ -166,7 +196,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
166196 return nil
167197 }
168198 }
169- return nil
199+ return netConn . SetReadDeadline (time. Time {})
170200}
171201
172202// willHandleNotificationInClient checks if a notification type should be ignored by the push notification
0 commit comments