|
6 | 6 | "strconv" |
7 | 7 | "time" |
8 | 8 |
|
| 9 | + "github.com/redis/go-redis/v9/internal" |
9 | 10 | "github.com/redis/go-redis/v9/internal/interfaces" |
10 | 11 | "github.com/redis/go-redis/v9/internal/pool" |
11 | 12 | "github.com/redis/go-redis/v9/push" |
@@ -109,26 +110,38 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus |
109 | 110 | return ErrInvalidNotification |
110 | 111 | } |
111 | 112 |
|
112 | | - // TODO(hitless): if newEndpoint is empty, we should schedule a handoff to the current endpoint in timeS/2 seconds |
| 113 | + deadline := time.Now().Add(time.Duration(timeS) * time.Second) |
| 114 | + // If newEndpoint is empty, we should schedule a handoff to the current endpoint in timeS/2 seconds |
| 115 | + if newEndpoint == "" || newEndpoint == internal.RedisNull { |
| 116 | + // same as current endpoint |
| 117 | + newEndpoint = snh.manager.options.GetAddr() |
| 118 | + // delay the handoff for timeS/2 seconds to the same endpoint |
| 119 | + // do this in a goroutine to avoid blocking the notification handler |
| 120 | + go func() { |
| 121 | + time.Sleep(time.Duration(timeS/2) * time.Second) |
| 122 | + snh.markConnForHandoff(poolConn, newEndpoint, seqID, deadline) |
| 123 | + }() |
| 124 | + return nil |
| 125 | + } |
| 126 | + |
| 127 | + return snh.markConnForHandoff(poolConn, newEndpoint, seqID, deadline) |
| 128 | +} |
113 | 129 |
|
114 | | - // Mark the connection for handoff |
115 | | - if err := poolConn.MarkForHandoff(newEndpoint, seqID); err != nil { |
| 130 | +func (snh *NotificationHandler) markConnForHandoff(conn *pool.Conn, newEndpoint string, seqID int64, deadline time.Time) error { |
| 131 | + if err := conn.MarkForHandoff(newEndpoint, seqID); err != nil { |
116 | 132 | // Connection is already marked for handoff, which is acceptable |
117 | 133 | // This can happen if multiple MOVING notifications are received for the same connection |
118 | 134 | return nil |
119 | 135 | } |
120 | | - |
121 | 136 | // Optionally track in hitless manager for monitoring/debugging |
122 | 137 | if snh.manager != nil { |
123 | | - connID := poolConn.GetID() |
124 | | - deadline := time.Now().Add(time.Duration(timeS) * time.Second) |
| 138 | + connID := conn.GetID() |
125 | 139 |
|
126 | 140 | // Track the operation (ignore errors since this is optional) |
127 | 141 | _ = snh.manager.TrackMovingOperationWithConnID(context.Background(), newEndpoint, deadline, seqID, connID) |
128 | 142 | } else { |
129 | 143 | return fmt.Errorf("hitless: manager not initialized") |
130 | 144 | } |
131 | | - |
132 | 145 | return nil |
133 | 146 | } |
134 | 147 |
|
|
0 commit comments