Skip to content

Commit 3417164

Browse files
committed
fast workers are fast :D
1 parent aabfdaa commit 3417164

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

hitless/pool_hook.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,20 @@ func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool
151151
internal.Logger.Printf(ctx, "Failed to queue handoff: %v", err)
152152
return false, true, nil // Don't pool, remove connection, no error to caller
153153
}
154+
155+
// Check if handoff was already processed by a worker before we can mark it as queued
156+
if !conn.ShouldHandoff() {
157+
// Handoff was already processed - this is normal and the connection should be pooled
158+
return true, false, nil
159+
}
160+
154161
if err := conn.MarkQueuedForHandoff(); err != nil {
155-
// If marking fails, remove the connection instead
162+
// If marking fails, check if handoff was processed in the meantime
163+
if !conn.ShouldHandoff() {
164+
// Handoff was processed - this is normal, pool the connection
165+
return true, false, nil
166+
}
167+
// Other error - remove the connection
156168
return false, true, nil
157169
}
158170
return true, false, nil
@@ -281,10 +293,12 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error {
281293
return nil
282294
default:
283295
// Queue is full - log and attempt scaling
296+
queueLen := len(ph.handoffQueue)
297+
queueCap := cap(ph.handoffQueue)
284298
if ph.config != nil && ph.config.LogLevel >= 1 { // Warning level
285299
internal.Logger.Printf(context.Background(),
286300
"hitless: handoff queue is full (%d/%d), attempting timeout queuing and scaling workers",
287-
len(ph.handoffQueue), cap(ph.handoffQueue))
301+
queueLen, queueCap)
288302
}
289303
}
290304
}

hitless/pool_hook_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -735,22 +735,22 @@ func TestConnectionHook(t *testing.T) {
735735
if err != nil {
736736
t.Errorf("Failed to queue handoff %d: %v", i, err)
737737
}
738+
738739
if !shouldPool || shouldRemove {
739-
t.Errorf("Connection %d should be pooled after handoff", i)
740+
t.Errorf("Connection %d should be pooled after handoff (shouldPool=%v, shouldRemove=%v)",
741+
i, shouldPool, shouldRemove)
740742
}
741743
}
742744

743745
// Verify queue capacity remains static (the main purpose of this test)
744746
finalCapacity := cap(processor.handoffQueue)
747+
745748
if finalCapacity != 50 {
746749
t.Errorf("Queue capacity should remain static at 50, got %d", finalCapacity)
747750
}
748751

749752
// Note: We don't check queue size here because workers process items quickly
750753
// The important thing is that the capacity remains static regardless of pool size
751-
currentQueueSize := len(processor.handoffQueue)
752-
t.Logf("Static queue test completed - Capacity: %d, Current size: %d",
753-
finalCapacity, currentQueueSize)
754754
})
755755

756756
t.Run("ConnectionRemovalOnHandoffFailure", func(t *testing.T) {

0 commit comments

Comments
 (0)