@@ -3,7 +3,6 @@ package hitless
3
3
import (
4
4
"context"
5
5
"errors"
6
- "fmt"
7
6
"net"
8
7
"sync"
9
8
"time"
@@ -252,57 +251,6 @@ func (ph *PoolHook) scaleDownWorkers() {
252
251
}
253
252
}
254
253
255
- // queueHandoffWithTimeout attempts to queue a handoff request with timeout and scaling
256
- func (ph * PoolHook ) queueHandoffWithTimeout (request HandoffRequest , cn * pool.Conn ) {
257
- // First attempt - try immediate queuing
258
- select {
259
- case ph .handoffQueue <- request :
260
- return
261
- case <- ph .shutdown :
262
- ph .pending .Delete (cn .GetID ())
263
- return
264
- default :
265
- // Queue is full - log and attempt scaling
266
- if ph .config != nil && ph .config .LogLevel >= 1 { // Warning level
267
- internal .Logger .Printf (context .Background (),
268
- "hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers" ,
269
- len (ph .handoffQueue ), cap (ph .handoffQueue ))
270
- }
271
-
272
- // Scale up workers to handle the load
273
- ph .scaleUpWorkers ()
274
- }
275
-
276
- queueTimeout := 5 * time .Second // Default fallback
277
- if ph .config != nil {
278
- queueTimeout = ph .config .HandoffQueueTimeout
279
- }
280
-
281
- timeout := time .NewTimer (queueTimeout )
282
- defer timeout .Stop ()
283
-
284
- select {
285
- case ph .handoffQueue <- request :
286
- // Queued successfully after timeout
287
- if ph .config != nil && ph .config .LogLevel >= 2 { // Info level
288
- internal .Logger .Printf (context .Background (),
289
- "hitless: handoff queued successfully after scaling workers" )
290
- }
291
- return
292
- case <- timeout .C :
293
- // Timeout expired - drop the connection
294
- err := fmt .Errorf ("handoff queue timeout after %v" , queueTimeout )
295
- ph .pending .Delete (cn .GetID ())
296
- if ph .config != nil && ph .config .LogLevel >= 0 { // Error level
297
- internal .Logger .Printf (context .Background (), err .Error ())
298
- }
299
- return
300
- case <- ph .shutdown :
301
- ph .pending .Delete (cn .GetID ())
302
- return
303
- }
304
- }
305
-
306
254
// scheduleScaleDownCheck schedules a scale down check after a delay
307
255
// This is called after completing a handoff request to avoid expensive immediate checks
308
256
func (ph * PoolHook ) scheduleScaleDownCheck () {
@@ -439,11 +387,26 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
439
387
Pool : ph .pool , // Include pool for connection removal on failure
440
388
}
441
389
442
- // Store in pending map
443
- ph .pending .Store (request .ConnID , request .SeqID )
390
+ select {
391
+ case ph .handoffQueue <- request :
392
+ // Store in pending map
393
+ ph .pending .Store (request .ConnID , request .SeqID )
394
+ return nil
395
+ case <- ph .shutdown :
396
+ ph .pending .Delete (cn .GetID ())
397
+ return errors .New ("shutdown" )
398
+ default :
399
+ // Queue is full - log and attempt scaling
400
+ if ph .config != nil && ph .config .LogLevel >= 1 { // Warning level
401
+ internal .Logger .Printf (context .Background (),
402
+ "hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers" ,
403
+ len (ph .handoffQueue ), cap (ph .handoffQueue ))
404
+ }
444
405
445
- go ph .queueHandoffWithTimeout (request , conn )
446
- return nil
406
+ // Scale up workers to handle the load
407
+ go ph .scaleUpWorkers ()
408
+ }
409
+ return errors .New ("queue full" )
447
410
}
448
411
449
412
// performConnectionHandoffWithPool performs the actual connection handoff with pool for connection removal on failure
0 commit comments