Skip to content

Commit b34f827

Browse files
committed
separate worker from pool hook
1 parent 73ff273 commit b34f827

File tree

3 files changed

+442
-376
lines changed

3 files changed

+442
-376
lines changed

hitless/handoff_worker.go

Lines changed: 394 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,394 @@
1+
package hitless
2+
3+
import (
4+
"context"
5+
"net"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/redis/go-redis/v9/internal"
11+
"github.com/redis/go-redis/v9/internal/pool"
12+
)
13+
14+
// handoffWorkerManager manages background workers and queue for connection handoffs
15+
type handoffWorkerManager struct {
16+
// Event-driven handoff support
17+
handoffQueue chan HandoffRequest // Queue for handoff requests
18+
shutdown chan struct{} // Shutdown signal
19+
shutdownOnce sync.Once // Ensure clean shutdown
20+
workerWg sync.WaitGroup // Track worker goroutines
21+
22+
// On-demand worker management
23+
maxWorkers int
24+
activeWorkers atomic.Int32
25+
workerTimeout time.Duration // How long workers wait for work before exiting
26+
workersScaling atomic.Bool
27+
28+
// Simple state tracking
29+
pending sync.Map // map[uint64]int64 (connID -> seqID)
30+
31+
// Configuration for the hitless upgrade
32+
config *Config
33+
34+
// Pool hook reference for handoff processing
35+
poolHook *PoolHook
36+
}
37+
38+
// newHandoffWorkerManager creates a new handoff worker manager
39+
func newHandoffWorkerManager(config *Config, poolHook *PoolHook) *handoffWorkerManager {
40+
return &handoffWorkerManager{
41+
handoffQueue: make(chan HandoffRequest, config.HandoffQueueSize),
42+
shutdown: make(chan struct{}),
43+
maxWorkers: config.MaxWorkers,
44+
activeWorkers: atomic.Int32{}, // Start with no workers - create on demand
45+
workerTimeout: 15 * time.Second, // Workers exit after 15s of inactivity
46+
config: config,
47+
poolHook: poolHook,
48+
}
49+
}
50+
51+
// getCurrentWorkers returns the current number of active workers (for testing)
52+
func (hwm *handoffWorkerManager) getCurrentWorkers() int {
53+
return int(hwm.activeWorkers.Load())
54+
}
55+
56+
// getPendingMap returns the pending map for testing purposes
57+
func (hwm *handoffWorkerManager) getPendingMap() *sync.Map {
58+
return &hwm.pending
59+
}
60+
61+
// getMaxWorkers returns the max workers for testing purposes
62+
func (hwm *handoffWorkerManager) getMaxWorkers() int {
63+
return hwm.maxWorkers
64+
}
65+
66+
// getHandoffQueue returns the handoff queue for testing purposes
67+
func (hwm *handoffWorkerManager) getHandoffQueue() chan HandoffRequest {
68+
return hwm.handoffQueue
69+
}
70+
71+
// isHandoffPending returns true if the given connection has a pending handoff
72+
func (hwm *handoffWorkerManager) isHandoffPending(conn *pool.Conn) bool {
73+
_, pending := hwm.pending.Load(conn.GetID())
74+
return pending
75+
}
76+
77+
// ensureWorkerAvailable ensures at least one worker is available to process requests
78+
// Creates a new worker if needed and under the max limit
79+
func (hwm *handoffWorkerManager) ensureWorkerAvailable() {
80+
select {
81+
case <-hwm.shutdown:
82+
return
83+
default:
84+
if hwm.workersScaling.CompareAndSwap(false, true) {
85+
defer hwm.workersScaling.Store(false)
86+
// Check if we need a new worker
87+
currentWorkers := hwm.activeWorkers.Load()
88+
workersWas := currentWorkers
89+
for currentWorkers <= int32(hwm.maxWorkers) {
90+
hwm.workerWg.Add(1)
91+
go hwm.onDemandWorker()
92+
currentWorkers++
93+
}
94+
// workersWas is always <= currentWorkers
95+
// currentWorkers will be maxWorkers, but if we have a worker that was closed
96+
// while we were creating new workers, just add the difference between
97+
// the currentWorkers and the number of workers we observed initially (i.e. the number of workers we created)
98+
hwm.activeWorkers.Add(currentWorkers - workersWas)
99+
}
100+
}
101+
}
102+
103+
// onDemandWorker processes handoff requests and exits when idle
104+
func (hwm *handoffWorkerManager) onDemandWorker() {
105+
defer func() {
106+
// Decrement active worker count when exiting
107+
hwm.activeWorkers.Add(-1)
108+
hwm.workerWg.Done()
109+
}()
110+
111+
for {
112+
select {
113+
case <-hwm.shutdown:
114+
return
115+
case <-time.After(hwm.workerTimeout):
116+
// Worker has been idle for too long, exit to save resources
117+
if hwm.config != nil && hwm.config.LogLevel.InfoOrAbove() { // Debug level
118+
internal.Logger.Printf(context.Background(),
119+
"hitless: worker exiting due to inactivity timeout (%v)", hwm.workerTimeout)
120+
}
121+
return
122+
case request := <-hwm.handoffQueue:
123+
// Check for shutdown before processing
124+
select {
125+
case <-hwm.shutdown:
126+
// Clean up the request before exiting
127+
hwm.pending.Delete(request.ConnID)
128+
return
129+
default:
130+
// Process the request
131+
hwm.processHandoffRequest(request)
132+
}
133+
}
134+
}
135+
}
136+
137+
// processHandoffRequest processes a single handoff request
138+
func (hwm *handoffWorkerManager) processHandoffRequest(request HandoffRequest) {
139+
// Remove from pending map
140+
defer hwm.pending.Delete(request.Conn.GetID())
141+
internal.Logger.Printf(context.Background(), "hitless: conn[%d] Processing handoff request start", request.Conn.GetID())
142+
143+
// Create a context with handoff timeout from config
144+
handoffTimeout := 15 * time.Second // Default timeout
145+
if hwm.config != nil && hwm.config.HandoffTimeout > 0 {
146+
handoffTimeout = hwm.config.HandoffTimeout
147+
}
148+
ctx, cancel := context.WithTimeout(context.Background(), handoffTimeout)
149+
defer cancel()
150+
151+
// Create a context that also respects the shutdown signal
152+
shutdownCtx, shutdownCancel := context.WithCancel(ctx)
153+
defer shutdownCancel()
154+
155+
// Monitor shutdown signal in a separate goroutine
156+
go func() {
157+
select {
158+
case <-hwm.shutdown:
159+
shutdownCancel()
160+
case <-shutdownCtx.Done():
161+
}
162+
}()
163+
164+
// Perform the handoff with cancellable context
165+
shouldRetry, err := hwm.performConnectionHandoff(shutdownCtx, request.Conn)
166+
minRetryBackoff := 500 * time.Millisecond
167+
if err != nil {
168+
if shouldRetry {
169+
now := time.Now()
170+
deadline, ok := shutdownCtx.Deadline()
171+
thirdOfTimeout := handoffTimeout / 3
172+
if !ok || deadline.Before(now) {
173+
// wait half the timeout before retrying if no deadline or deadline has passed
174+
deadline = now.Add(thirdOfTimeout)
175+
}
176+
afterTime := deadline.Sub(now)
177+
if afterTime < minRetryBackoff {
178+
afterTime = minRetryBackoff
179+
}
180+
181+
internal.Logger.Printf(context.Background(), "Handoff failed for conn[%d] WILL RETRY After %v: %v", request.ConnID, afterTime, err)
182+
time.AfterFunc(afterTime, func() {
183+
if err := hwm.queueHandoff(request.Conn); err != nil {
184+
internal.Logger.Printf(context.Background(), "can't queue handoff for retry: %v", err)
185+
hwm.closeConnFromRequest(context.Background(), request, err)
186+
}
187+
})
188+
return
189+
} else {
190+
go hwm.closeConnFromRequest(ctx, request, err)
191+
}
192+
193+
// Clear handoff state if not returned for retry
194+
seqID := request.Conn.GetMovingSeqID()
195+
connID := request.Conn.GetID()
196+
if hwm.poolHook.hitlessManager != nil {
197+
hwm.poolHook.hitlessManager.UntrackOperationWithConnID(seqID, connID)
198+
}
199+
}
200+
}
201+
202+
// queueHandoff queues a handoff request for processing
203+
// if err is returned, connection will be removed from pool
204+
func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error {
205+
// Create handoff request
206+
request := HandoffRequest{
207+
Conn: conn,
208+
ConnID: conn.GetID(),
209+
Endpoint: conn.GetHandoffEndpoint(),
210+
SeqID: conn.GetMovingSeqID(),
211+
Pool: hwm.poolHook.pool, // Include pool for connection removal on failure
212+
}
213+
214+
select {
215+
// priority to shutdown
216+
case <-hwm.shutdown:
217+
return ErrShutdown
218+
default:
219+
select {
220+
case <-hwm.shutdown:
221+
return ErrShutdown
222+
case hwm.handoffQueue <- request:
223+
// Store in pending map
224+
hwm.pending.Store(request.ConnID, request.SeqID)
225+
// Ensure we have a worker to process this request
226+
hwm.ensureWorkerAvailable()
227+
return nil
228+
default:
229+
select {
230+
case <-hwm.shutdown:
231+
return ErrShutdown
232+
case hwm.handoffQueue <- request:
233+
// Store in pending map
234+
hwm.pending.Store(request.ConnID, request.SeqID)
235+
// Ensure we have a worker to process this request
236+
hwm.ensureWorkerAvailable()
237+
return nil
238+
case <-time.After(100 * time.Millisecond): // give workers a chance to process
239+
// Queue is full - log and attempt scaling
240+
queueLen := len(hwm.handoffQueue)
241+
queueCap := cap(hwm.handoffQueue)
242+
if hwm.config != nil && hwm.config.LogLevel.WarnOrAbove() { // Warning level
243+
internal.Logger.Printf(context.Background(),
244+
"hitless: handoff queue is full (%d/%d), cannot queue new handoff requests - consider increasing HandoffQueueSize or MaxWorkers in configuration",
245+
queueLen, queueCap)
246+
}
247+
}
248+
}
249+
}
250+
251+
// Ensure we have workers available to handle the load
252+
hwm.ensureWorkerAvailable()
253+
return ErrHandoffQueueFull
254+
}
255+
256+
// shutdownWorkers gracefully shuts down the worker manager, waiting for workers to complete
257+
func (hwm *handoffWorkerManager) shutdownWorkers(ctx context.Context) error {
258+
hwm.shutdownOnce.Do(func() {
259+
close(hwm.shutdown)
260+
// workers will exit when they finish their current request
261+
})
262+
263+
// Wait for workers to complete
264+
done := make(chan struct{})
265+
go func() {
266+
hwm.workerWg.Wait()
267+
close(done)
268+
}()
269+
270+
select {
271+
case <-done:
272+
return nil
273+
case <-ctx.Done():
274+
return ctx.Err()
275+
}
276+
}
277+
278+
// performConnectionHandoff performs the actual connection handoff
279+
// When error is returned, the connection handoff should be retried if err is not ErrMaxHandoffRetriesReached
280+
func (hwm *handoffWorkerManager) performConnectionHandoff(ctx context.Context, conn *pool.Conn) (shouldRetry bool, err error) {
281+
// Clear handoff state after successful handoff
282+
connID := conn.GetID()
283+
284+
newEndpoint := conn.GetHandoffEndpoint()
285+
if newEndpoint == "" {
286+
return false, ErrConnectionInvalidHandoffState
287+
}
288+
289+
retries := conn.IncrementAndGetHandoffRetries(1)
290+
internal.Logger.Printf(ctx, "hitless: conn[%d] Retry %d: Performing handoff to %s(was %s)", conn.GetID(), retries, newEndpoint, conn.RemoteAddr().String())
291+
maxRetries := 3 // Default fallback
292+
if hwm.config != nil {
293+
maxRetries = hwm.config.MaxHandoffRetries
294+
}
295+
296+
if retries > maxRetries {
297+
if hwm.config != nil && hwm.config.LogLevel.WarnOrAbove() { // Warning level
298+
internal.Logger.Printf(ctx,
299+
"hitless: reached max retries (%d) for handoff of conn[%d] to %s",
300+
maxRetries, conn.GetID(), conn.GetHandoffEndpoint())
301+
}
302+
// won't retry on ErrMaxHandoffRetriesReached
303+
return false, ErrMaxHandoffRetriesReached
304+
}
305+
306+
// Create endpoint-specific dialer
307+
endpointDialer := hwm.createEndpointDialer(newEndpoint)
308+
309+
// Create new connection to the new endpoint
310+
newNetConn, err := endpointDialer(ctx)
311+
if err != nil {
312+
internal.Logger.Printf(ctx, "hitless: conn[%d] Failed to dial new endpoint %s: %v", conn.GetID(), newEndpoint, err)
313+
// hitless: will retry
314+
// Maybe a network error - retry after a delay
315+
return true, err
316+
}
317+
318+
// Get the old connection
319+
oldConn := conn.GetNetConn()
320+
321+
// Apply relaxed timeout to the new connection for the configured post-handoff duration
322+
// This gives the new connection more time to handle operations during cluster transition
323+
// Setting this here (before initing the connection) ensures that the connection is going
324+
// to use the relaxed timeout for the first operation (auth/ACL select)
325+
if hwm.config != nil && hwm.config.PostHandoffRelaxedDuration > 0 {
326+
relaxedTimeout := hwm.config.RelaxedTimeout
327+
// Set relaxed timeout with deadline - no background goroutine needed
328+
deadline := time.Now().Add(hwm.config.PostHandoffRelaxedDuration)
329+
conn.SetRelaxedTimeoutWithDeadline(relaxedTimeout, relaxedTimeout, deadline)
330+
331+
if hwm.config.LogLevel.InfoOrAbove() {
332+
internal.Logger.Printf(context.Background(),
333+
"hitless: conn[%d] applied post-handoff relaxed timeout (%v) until %v",
334+
connID, relaxedTimeout, deadline.Format("15:04:05.000"))
335+
}
336+
}
337+
338+
// Replace the connection and execute initialization
339+
err = conn.SetNetConnAndInitConn(ctx, newNetConn)
340+
if err != nil {
341+
// hitless: won't retry
342+
// Initialization failed - remove the connection
343+
return false, err
344+
}
345+
defer func() {
346+
if oldConn != nil {
347+
oldConn.Close()
348+
}
349+
}()
350+
351+
conn.ClearHandoffState()
352+
internal.Logger.Printf(ctx, "hitless: conn[%d] Handoff to %s successful", conn.GetID(), newEndpoint)
353+
354+
return false, nil
355+
}
356+
357+
// createEndpointDialer creates a dialer function that connects to a specific endpoint
358+
func (hwm *handoffWorkerManager) createEndpointDialer(endpoint string) func(context.Context) (net.Conn, error) {
359+
return func(ctx context.Context) (net.Conn, error) {
360+
// Parse endpoint to extract host and port
361+
host, port, err := net.SplitHostPort(endpoint)
362+
if err != nil {
363+
// If no port specified, assume default Redis port
364+
host = endpoint
365+
if port == "" {
366+
port = "6379"
367+
}
368+
}
369+
370+
// Use the base dialer to connect to the new endpoint
371+
return hwm.poolHook.baseDialer(ctx, hwm.poolHook.network, net.JoinHostPort(host, port))
372+
}
373+
}
374+
375+
// closeConnFromRequest closes the connection and logs the reason
376+
func (hwm *handoffWorkerManager) closeConnFromRequest(ctx context.Context, request HandoffRequest, err error) {
377+
pooler := request.Pool
378+
conn := request.Conn
379+
if pooler != nil {
380+
pooler.Remove(ctx, conn, err)
381+
if hwm.config != nil && hwm.config.LogLevel.WarnOrAbove() { // Warning level
382+
internal.Logger.Printf(ctx,
383+
"hitless: removed conn[%d] from pool due to max handoff retries reached: %v",
384+
conn.GetID(), err)
385+
}
386+
} else {
387+
conn.Close()
388+
if hwm.config != nil && hwm.config.LogLevel.WarnOrAbove() { // Warning level
389+
internal.Logger.Printf(ctx,
390+
"hitless: no pool provided for conn[%d], cannot remove due to handoff initialization failure: %v",
391+
conn.GetID(), err)
392+
}
393+
}
394+
}

0 commit comments

Comments
 (0)