
Commit 69569ea

committed
still hunting this deadlock
1 parent 76bb698 commit 69569ea

2 files changed: +27 −64 lines

hitless/config.go

Lines changed: 8 additions & 8 deletions
@@ -88,16 +88,16 @@ type Config struct {
 
 	// MinWorkers is the minimum number of worker goroutines for processing handoff requests.
 	// The processor starts with this number of workers and scales down to this level when idle.
-	// If zero, defaults to max(1, PoolSize/25) to be proportional to connection pool size.
+	// If zero, defaults to max(1, PoolSize/50) to be proportional to connection pool size.
 	//
-	// Default: max(1, PoolSize/25)
+	// Default: max(1, PoolSize/50)
 	MinWorkers int
 
 	// MaxWorkers is the maximum number of worker goroutines for processing handoff requests.
 	// The processor will scale up to this number when under load.
-	// If zero, defaults to max(MinWorkers*4, PoolSize/5) to handle bursts effectively.
+	// If zero, defaults to max(MinWorkers*4, PoolSize/10) to handle bursts effectively.
 	//
-	// Default: max(MinWorkers*4, PoolSize/5)
+	// Default: max(MinWorkers*4, PoolSize/10)
 	MaxWorkers int
 
 	// HandoffQueueSize is the size of the buffered channel used to queue handoff requests.
@@ -517,14 +517,14 @@ func (c *Config) applyWorkerDefaults(poolSize int) {
 		poolSize = 10 * runtime.GOMAXPROCS(0)
 	}
 
-	// MinWorkers: max(1, poolSize/25) - conservative baseline
+	// MinWorkers: max(1, poolSize/50) - conservative baseline
 	if c.MinWorkers == 0 {
-		c.MinWorkers = util.Max(1, poolSize/25)
+		c.MinWorkers = util.Max(1, poolSize/50)
 	}
 
-	// MaxWorkers: max(MinWorkers*4, poolSize/5) - handle bursts effectively
+	// MaxWorkers: max(MinWorkers*4, poolSize/10) - handle bursts effectively
 	if c.MaxWorkers == 0 {
-		c.MaxWorkers = util.Max(c.MinWorkers*4, poolSize/5)
+		c.MaxWorkers = util.Max(c.MinWorkers*4, poolSize/10)
 	}
 
 	// Ensure MaxWorkers >= MinWorkers
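
For a sense of how the halved defaults land in practice, here is a minimal, standalone sketch of the arithmetic in applyWorkerDefaults. It is not the library's code: Go's built-in max (1.21+) stands in for util.Max, and the pool sizes are arbitrary examples.

package main

import "fmt"

// workerDefaults mirrors the arithmetic from applyWorkerDefaults above;
// the built-in max stands in for util.Max.
func workerDefaults(poolSize int) (minWorkers, maxWorkers int) {
	minWorkers = max(1, poolSize/50)            // previously poolSize/25
	maxWorkers = max(minWorkers*4, poolSize/10) // previously poolSize/5
	return minWorkers, maxWorkers
}

func main() {
	for _, size := range []int{10, 100, 1000} {
		minW, maxW := workerDefaults(size)
		fmt.Printf("poolSize=%d -> MinWorkers=%d, MaxWorkers=%d\n", size, minW, maxW)
	}
}

With a pool of 100 connections, for example, the defaults drop from 4 min / 20 max workers to 2 min / 10 max.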

hitless/pool_hook.go

Lines changed: 19 additions & 56 deletions
@@ -3,7 +3,6 @@ package hitless
 import (
 	"context"
 	"errors"
-	"fmt"
 	"net"
 	"sync"
 	"time"
@@ -252,57 +251,6 @@ func (ph *PoolHook) scaleDownWorkers() {
 	}
 }
 
-// queueHandoffWithTimeout attempts to queue a handoff request with timeout and scaling
-func (ph *PoolHook) queueHandoffWithTimeout(request HandoffRequest, cn *pool.Conn) {
-	// First attempt - try immediate queuing
-	select {
-	case ph.handoffQueue <- request:
-		return
-	case <-ph.shutdown:
-		ph.pending.Delete(cn.GetID())
-		return
-	default:
-		// Queue is full - log and attempt scaling
-		if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
-			internal.Logger.Printf(context.Background(),
-				"hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers",
-				len(ph.handoffQueue), cap(ph.handoffQueue))
-		}
-
-		// Scale up workers to handle the load
-		ph.scaleUpWorkers()
-	}
-
-	queueTimeout := 5 * time.Second // Default fallback
-	if ph.config != nil {
-		queueTimeout = ph.config.HandoffQueueTimeout
-	}
-
-	timeout := time.NewTimer(queueTimeout)
-	defer timeout.Stop()
-
-	select {
-	case ph.handoffQueue <- request:
-		// Queued successfully after timeout
-		if ph.config != nil && ph.config.LogLevel >= 2 { // Info level
-			internal.Logger.Printf(context.Background(),
-				"hitless: handoff queued successfully after scaling workers")
-		}
-		return
-	case <-timeout.C:
-		// Timeout expired - drop the connection
-		err := fmt.Errorf("handoff queue timeout after %v", queueTimeout)
-		ph.pending.Delete(cn.GetID())
-		if ph.config != nil && ph.config.LogLevel >= 0 { // Error level
-			internal.Logger.Printf(context.Background(), err.Error())
-		}
-		return
-	case <-ph.shutdown:
-		ph.pending.Delete(cn.GetID())
-		return
-	}
-}
-
 // scheduleScaleDownCheck schedules a scale down check after a delay
 // This is called after completing a handoff request to avoid expensive immediate checks
 func (ph *PoolHook) scheduleScaleDownCheck() {
@@ -439,11 +387,26 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
 		Pool: ph.pool, // Include pool for connection removal on failure
 	}
 
-	// Store in pending map
-	ph.pending.Store(request.ConnID, request.SeqID)
+	select {
+	case ph.handoffQueue <- request:
+		// Store in pending map
+		ph.pending.Store(request.ConnID, request.SeqID)
+		return nil
+	case <-ph.shutdown:
+		ph.pending.Delete(request.ConnID)
+		return errors.New("shutdown")
+	default:
+		// Queue is full - log and attempt scaling
+		if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
+			internal.Logger.Printf(context.Background(),
+				"hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers",
+				len(ph.handoffQueue), cap(ph.handoffQueue))
+		}
 
-	go ph.queueHandoffWithTimeout(request, conn)
-	return nil
+		// Scale up workers to handle the load
+		go ph.scaleUpWorkers()
+	}
+	return errors.New("queue full")
 }
 
 // performConnectionHandoffWithPool performs the actual connection handoff with pool for connection removal on failure
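
Net effect of the pool_hook.go change: the background queueHandoffWithTimeout goroutine (one per handoff, each waiting on a timer up to HandoffQueueTimeout) is gone, and queueHandoff now makes a single non-blocking attempt and returns an error when the queue is full or the hook is shutting down. Below is a minimal, standalone sketch of that fail-fast enqueue pattern; the types and channel names are simplified stand-ins, not the library's.

package main

import (
	"errors"
	"fmt"
)

// handoffRequest is a simplified stand-in for the hook's HandoffRequest.
type handoffRequest struct{ connID uint64 }

// tryEnqueue mirrors the shape of the new queueHandoff: one non-blocking
// send, then an immediate error instead of a goroutine waiting on a timer.
func tryEnqueue(queue chan handoffRequest, shutdown <-chan struct{}, req handoffRequest) error {
	select {
	case queue <- req:
		return nil // queued; only now is the request recorded as pending
	case <-shutdown:
		return errors.New("shutdown")
	default:
		// Queue full: the caller can kick off worker scale-up and surface the error.
		return errors.New("queue full")
	}
}

func main() {
	queue := make(chan handoffRequest, 1)
	shutdown := make(chan struct{})

	fmt.Println(tryEnqueue(queue, shutdown, handoffRequest{connID: 1})) // <nil>
	fmt.Println(tryEnqueue(queue, shutdown, handoffRequest{connID: 2})) // queue full
}

Also visible in the diff: the pending entry is now stored only after a successful enqueue rather than before the asynchronous attempt, so a full queue no longer leaves a stale pending record behind.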
