Commit e3b4e8d

on demand workers
1 parent a4758c2 commit e3b4e8d

2 files changed: +67 -214 lines changed

hitless/pool_hook.go

Lines changed: 59 additions & 203 deletions
@@ -3,9 +3,9 @@ package hitless
 import (
    "context"
    "errors"
-   "fmt"
    "net"
    "sync"
+   "sync/atomic"
    "time"

    "github.com/redis/go-redis/v9/internal"
@@ -20,12 +20,11 @@ type HitlessManagerInterface interface {

// HandoffRequest represents a request to handoff a connection to a new endpoint
type HandoffRequest struct {
-   Conn *pool.Conn
-   ConnID uint64 // Unique connection identifier
-   Endpoint string
-   SeqID int64
-   StopWorkerRequest bool
-   Pool pool.Pooler // Pool to remove connection from on failure
+   Conn *pool.Conn
+   ConnID uint64 // Unique connection identifier
+   Endpoint string
+   SeqID int64
+   Pool pool.Pooler // Pool to remove connection from on failure
}

// PoolHook implements pool.PoolHook for Redis-specific connection handling
@@ -44,18 +43,13 @@ type PoolHook struct {
    shutdownOnce sync.Once // Ensure clean shutdown
    workerWg sync.WaitGroup // Track worker goroutines

-   // Dynamic worker scaling
-   minWorkers int
+   // On-demand worker management
    maxWorkers int
-   currentWorkers int
-   scalingMu sync.Mutex
-   scaleLevel int // 0=min, 1=max
+   activeWorkers int32 // Atomic counter for active workers
+   workerTimeout time.Duration // How long workers wait for work before exiting
+   scalingMu sync.Mutex // Protects worker creation
+

-   // Scale down optimization
-   scaleDownTimer *time.Timer
-   scaleDownMu sync.Mutex
-   lastCompletionTime time.Time
-   scaleDownDelay time.Duration

    // Simple state tracking
    pending sync.Map // map[uint64]int64 (connID -> seqID)
@@ -87,20 +81,16 @@ func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (n
        // handoffQueue is a buffered channel for queuing handoff requests
        handoffQueue: make(chan HandoffRequest, config.HandoffQueueSize),
        // shutdown is a channel for signaling shutdown
-       shutdown: make(chan struct{}),
-       minWorkers: config.MinWorkers,
-       maxWorkers: config.MaxWorkers,
-       // Start with minimum workers
-       currentWorkers: config.MinWorkers,
-       scaleLevel: 0, // Start at minimum
-       config: config,
+       shutdown: make(chan struct{}),
+       maxWorkers: config.MaxWorkers,
+       activeWorkers: 0, // Start with no workers - create on demand
+       workerTimeout: 30 * time.Second, // Workers exit after 30s of inactivity
+       config: config,
        // Hitless manager for operation completion tracking
        hitlessManager: hitlessManager,
-       scaleDownDelay: config.ScaleDownDelay,
    }

-   // Start worker goroutines at minimum level
-   ph.startWorkers(ph.minWorkers)
+   // No upfront worker creation - workers are created on demand

    return ph
}
@@ -110,18 +100,17 @@ func (ph *PoolHook) SetPool(pooler pool.Pooler) {
    ph.pool = pooler
}

-// GetCurrentWorkers returns the current number of workers (for testing)
+// GetCurrentWorkers returns the current number of active workers (for testing)
func (ph *PoolHook) GetCurrentWorkers() int {
-   ph.scalingMu.Lock()
-   defer ph.scalingMu.Unlock()
-   return ph.currentWorkers
+   return int(atomic.LoadInt32(&ph.activeWorkers))
}

-// GetScaleLevel returns the current scale level (for testing)
+// GetScaleLevel returns 1 if workers are active, 0 if none (for testing compatibility)
func (ph *PoolHook) GetScaleLevel() int {
-   ph.scalingMu.Lock()
-   defer ph.scalingMu.Unlock()
-   return ph.scaleLevel
+   if atomic.LoadInt32(&ph.activeWorkers) > 0 {
+       return 1
+   }
+   return 0
}

// IsHandoffPending returns true if the given connection has a pending handoff
@@ -177,195 +166,70 @@ func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool
    return true, false, nil
}

-// startWorkers starts the worker goroutines for processing handoff requests
-func (ph *PoolHook) startWorkers(count int) {
+// ensureWorkerAvailable ensures at least one worker is available to process requests
+// Creates a new worker if needed and under the max limit
+func (ph *PoolHook) ensureWorkerAvailable() {
    select {
    case <-ph.shutdown:
        return
    default:
-       hookID := fmt.Sprintf("%p", ph)
-       internal.Logger.Printf(context.Background(), "hitless: starting %d workers for hook %s", count, hookID)
-       for i := 0; i < count; i++ {
-           ph.workerWg.Add(1)
-           go ph.handoffWorker()
-       }
-   }
-}
-
-// scaleUpWorkers scales up workers when queue is full (single step: min → max)
-func (ph *PoolHook) scaleUpWorkers() {
-   ph.scalingMu.Lock()
-   defer ph.scalingMu.Unlock()
-
-   if ph.scaleLevel >= 1 {
-       return // Already at maximum scale
-   }
-
-   previousWorkers := ph.currentWorkers
-   targetWorkers := ph.maxWorkers
-
-   // Ensure we don't go below current workers
-   if targetWorkers <= ph.currentWorkers {
-       return
-   }
-
-   additionalWorkers := targetWorkers - ph.currentWorkers
-   ph.startWorkers(additionalWorkers)
-   ph.currentWorkers = targetWorkers
-   ph.scaleLevel = 1
-
-   if ph.config != nil && ph.config.LogLevel >= 2 { // Info level
-       internal.Logger.Printf(context.Background(),
-           "hitless: scaled up workers from %d to %d (max level) due to queue pressure",
-           previousWorkers, ph.currentWorkers)
-   }
-}
-
-// scaleDownWorkers returns to minimum worker count when queue is empty
-func (ph *PoolHook) scaleDownWorkers() {
-   ph.scalingMu.Lock()
-   defer ph.scalingMu.Unlock()
-
-   if ph.scaleLevel == 0 {
-       return // Already at minimum scale
-   }
-
-   // Send stop worker requests to excess workers
-   excessWorkers := ph.currentWorkers - ph.minWorkers
-   previousWorkers := ph.currentWorkers
-
-   for i := 0; i < excessWorkers; i++ {
-       stopRequest := HandoffRequest{
-           StopWorkerRequest: true,
-       }
-
-       // Try to send stop request without blocking
-       select {
-       case ph.handoffQueue <- stopRequest:
-           // Stop request sent successfully
-       default:
-           // Queue is full, worker will naturally exit when queue empties
-           break
+       // Check if we need a new worker
+       currentWorkers := atomic.LoadInt32(&ph.activeWorkers)
+       if currentWorkers < int32(ph.maxWorkers) {
+           // Try to create a new worker (atomic increment to prevent race)
+           if atomic.CompareAndSwapInt32(&ph.activeWorkers, currentWorkers, currentWorkers+1) {
+               ph.workerWg.Add(1)
+               go ph.onDemandWorker()
+           }
        }
    }
-
-   ph.currentWorkers = ph.minWorkers
-   ph.scaleLevel = 0
-
-   if ph.config != nil && ph.config.LogLevel >= 2 { // Info level
-       internal.Logger.Printf(context.Background(),
-           "hitless: scaling down workers from %d to %d (sent %d stop requests)",
-           previousWorkers, ph.minWorkers, excessWorkers)
-   }
}

-// scheduleScaleDownCheck schedules a scale down check after a delay
-// This is called after completing a handoff request to avoid expensive immediate checks
-func (ph *PoolHook) scheduleScaleDownCheck() {
-   ph.scaleDownMu.Lock()
-   defer ph.scaleDownMu.Unlock()

-   // Update last completion time
-   ph.lastCompletionTime = time.Now()

-   // If timer already exists, reset it
-   if ph.scaleDownTimer != nil {
-       ph.scaleDownTimer.Reset(ph.scaleDownDelay)
-       return
-   }
-
-   // Create new timer
-   ph.scaleDownTimer = time.AfterFunc(ph.scaleDownDelay, func() {
-       ph.performScaleDownCheck()
-   })
-}
-
-// performScaleDownCheck performs the actual scale down check
-// This runs in a background goroutine after the delay
-func (ph *PoolHook) performScaleDownCheck() {
-   ph.scaleDownMu.Lock()
-   defer ph.scaleDownMu.Unlock()

-   // Clear the timer since it has fired
-   ph.scaleDownTimer = nil

-   // Check if we should scale down
-   if ph.shouldScaleDown() {
-       ph.scaleDownWorkers()
-   }
-}

-// shouldScaleDown checks if conditions are met for scaling down
-// This is the expensive check that we want to minimize
-func (ph *PoolHook) shouldScaleDown() bool {
-   // Quick check: if we're already at minimum scale, no need to scale down
-   if ph.scaleLevel == 0 {
-       return false
-   }
-
-   // Quick check: if queue is not empty, don't scale down
-   if len(ph.handoffQueue) > 0 {
-       return false
-   }
-
-   // Expensive check: count pending handoffs
-   pendingCount := 0
-   ph.pending.Range(func(key, value interface{}) bool {
-       pendingCount++
-       return pendingCount < 5 // Early exit if we find several pending
-   })
-
-   // Only scale down if no pending handoffs
-   return pendingCount == 0
-}

-// handoffWorker processes handoff requests from the queue
-func (ph *PoolHook) handoffWorker() {
-   defer ph.workerWg.Done()

-   // Debug: Log worker start with PoolHook pointer for identification
-   hookID := fmt.Sprintf("%p", ph)
-   internal.Logger.Printf(context.Background(), "hitless: worker started for hook %s", hookID)
+// onDemandWorker processes handoff requests and exits when idle
+func (ph *PoolHook) onDemandWorker() {
+   defer func() {
+       // Decrement active worker count when exiting
+       atomic.AddInt32(&ph.activeWorkers, -1)
+       ph.workerWg.Done()
+   }()

    for {
        select {
        case request := <-ph.handoffQueue:
-           internal.Logger.Printf(context.Background(), "hitless: worker %s received request", hookID)
-           // Check if this is a stop worker request
-           if request.StopWorkerRequest {
-               if ph.config != nil && ph.config.LogLevel >= 2 { // Info level
-                   internal.Logger.Printf(context.Background(),
-                       "hitless: worker %s received stop request, exiting", hookID)
-               }
-               return // Exit this worker
-           }
-
            // Check for shutdown before processing
            select {
            case <-ph.shutdown:
                // Clean up the request before exiting
                ph.pending.Delete(request.ConnID)
-               internal.Logger.Printf(context.Background(), "hitless: worker %s exiting due to shutdown during request", hookID)
                return
            default:
-               // Continue with processing
-               internal.Logger.Printf(context.Background(), "hitless: worker %s processing request", hookID)
+               // Process the request
                ph.processHandoffRequest(request)
            }
+
+       case <-time.After(ph.workerTimeout):
+           // Worker has been idle for too long, exit to save resources
+           if ph.config != nil && ph.config.LogLevel >= 3 { // Debug level
+               internal.Logger.Printf(context.Background(),
+                   "hitless: worker exiting due to inactivity timeout (%v)", ph.workerTimeout)
+           }
+           return
+
        case <-ph.shutdown:
-           internal.Logger.Printf(context.Background(), "hitless: worker %s exiting due to shutdown", hookID)
            return
        }
    }
}

// processHandoffRequest processes a single handoff request
func (ph *PoolHook) processHandoffRequest(request HandoffRequest) {
-   // Safety check: ignore stop worker requests (should be handled in worker)
-   if request.StopWorkerRequest {
-       return
-   }
-
    // Remove from pending map
    defer ph.pending.Delete(request.Conn.GetID())

@@ -395,9 +259,8 @@ func (ph *PoolHook) processHandoffRequest(request HandoffRequest) {
        internal.Logger.Printf(context.Background(), "Handoff failed for connection WILL RETRY: %v", err)
    }

-   // Schedule a scale down check after completing this handoff request
-   // This avoids expensive immediate checks and prevents rapid scaling cycles
-   ph.scheduleScaleDownCheck()
+   // No need for scale down scheduling with on-demand workers
+   // Workers automatically exit when idle
}

// queueHandoff queues a handoff request for processing
@@ -423,6 +286,8 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
    case ph.handoffQueue <- request:
        // Store in pending map
        ph.pending.Store(request.ConnID, request.SeqID)
+       // Ensure we have a worker to process this request
+       ph.ensureWorkerAvailable()
        return nil
    default:
        // Queue is full - log and attempt scaling
@@ -434,8 +299,8 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
        }
    }

-   // Scale up workers to handle the load
-   go ph.scaleUpWorkers()
+   // Ensure we have workers available to handle the load
+   ph.ensureWorkerAvailable()
    return errors.New("queue full")
}

@@ -578,19 +443,10 @@ func (ph *PoolHook) createEndpointDialer(endpoint string) func(context.Context)

// Shutdown gracefully shuts down the processor, waiting for workers to complete
func (ph *PoolHook) Shutdown(ctx context.Context) error {
-   hookID := fmt.Sprintf("%p", ph)
-   internal.Logger.Printf(context.Background(), "hitless: Shutdown called for hook %s", hookID)
    ph.shutdownOnce.Do(func() {
-       internal.Logger.Printf(context.Background(), "hitless: closing shutdown channel for hook %s", hookID)
        close(ph.shutdown)

-       // Clean up scale down timer
-       ph.scaleDownMu.Lock()
-       if ph.scaleDownTimer != nil {
-           ph.scaleDownTimer.Stop()
-           ph.scaleDownTimer = nil
-       }
-       ph.scaleDownMu.Unlock()
+       // No timers to clean up with on-demand workers
    })

    // Wait for workers to complete
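
The heart of the change is an on-demand worker pattern: an atomic activeWorkers counter, CAS-guarded worker creation in ensureWorkerAvailable, and an idle timeout in onDemandWorker that lets each worker exit and decrement the counter, replacing the old min/max scaling and stop-request machinery. The snippet below is a minimal, self-contained sketch of that pattern only; the names (workerPool, jobs, submit) are illustrative and not part of go-redis, and details such as the CAS retry loop differ from the actual PoolHook code.

// Simplified sketch of the on-demand worker pattern; not go-redis code.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type workerPool struct {
	jobs          chan int
	maxWorkers    int32
	activeWorkers int32         // atomic counter, analogous to PoolHook.activeWorkers
	idleTimeout   time.Duration // analogous to PoolHook.workerTimeout
	wg            sync.WaitGroup
}

// submit enqueues a job and spawns a worker only if the worker
// count is still below the limit.
func (p *workerPool) submit(job int) {
	p.jobs <- job
	for {
		current := atomic.LoadInt32(&p.activeWorkers)
		if current >= p.maxWorkers {
			return // enough workers already running
		}
		// CAS prevents two submitters from claiming the same worker slot.
		if atomic.CompareAndSwapInt32(&p.activeWorkers, current, current+1) {
			p.wg.Add(1)
			go p.worker()
			return
		}
		// Lost the CAS race; re-read the counter and retry.
	}
}

// worker drains jobs and exits after idleTimeout of inactivity,
// decrementing the counter so a later submit can recreate it.
func (p *workerPool) worker() {
	defer func() {
		atomic.AddInt32(&p.activeWorkers, -1)
		p.wg.Done()
	}()
	for {
		select {
		case job := <-p.jobs:
			fmt.Println("processed job", job)
		case <-time.After(p.idleTimeout):
			return // idle: shrink back toward zero workers
		}
	}
}

func main() {
	p := &workerPool{
		jobs:        make(chan int, 8),
		maxWorkers:  3,
		idleTimeout: 200 * time.Millisecond,
	}
	for i := 0; i < 5; i++ {
		p.submit(i)
	}
	p.wg.Wait() // workers exit on their own once idle
	fmt.Println("active workers:", atomic.LoadInt32(&p.activeWorkers))
}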
