Skip to content

Commit 94b2fa0

Browse files
authored
use a dispatch ticker to dispatch requests periodically in ShardProcessor… (#1850)
* use a dispatch ticker to dispatch requests periodically in ShardProcessor, preventing high CPU usage Signed-off-by: Hang Yin <[email protected]> * fix test Signed-off-by: Hang Yin <[email protected]> --------- Signed-off-by: Hang Yin <[email protected]>
1 parent 58a7e6c commit 94b2fa0

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"runtime"
2423
"sync"
2524
"sync/atomic"
2625
"time"
@@ -165,16 +164,19 @@ func (sp *ShardProcessor) Run(ctx context.Context) {
165164
sp.wg.Add(1)
166165
go sp.runCleanupSweep(ctx)
167166

167+
// Create a ticker for periodic dispatch attempts to avoid tight loops
168+
dispatchTicker := sp.clock.NewTicker(time.Millisecond)
169+
defer dispatchTicker.Stop()
170+
168171
// This is the main worker loop. It continuously processes incoming requests and dispatches queued requests until the
169172
// context is cancelled. The `select` statement has three cases:
170173
//
171174
// 1. Context Cancellation: The highest priority is shutting down. If the context's `Done` channel is closed, the
172175
// loop will drain all queues and exit. This is the primary exit condition.
173176
// 2. New Item Arrival: If an item is available on `enqueueChan`, it will be processed. This ensures that the
174177
// processor is responsive to new work.
175-
// 3. Default (Dispatch): If neither of the above cases is ready, the `default` case executes, ensuring the loop is
176-
// non-blocking. It continuously attempts to dispatch items from the existing backlog, preventing starvation and
177-
// ensuring queues are drained.
178+
// 3. Dispatch Ticker: Periodically triggers a dispatch cycle to attempt to dispatch items from existing queues,
179+
// ensuring that queued work is processed even when no new items arrive.
178180
for {
179181
select {
180182
case <-ctx.Done():
@@ -193,14 +195,9 @@ func (sp *ShardProcessor) Run(ctx context.Context) {
193195
continue
194196
}
195197
sp.enqueue(item)
196-
sp.dispatchCycle(ctx)
197-
default:
198-
// If no new items are arriving, continuously try to dispatch from the backlog.
199-
if !sp.dispatchCycle(ctx) {
200-
// If no work was done, yield to the scheduler to prevent a tight, busy-loop when idle, while still allowing for
201-
// immediate rescheduling.
202-
runtime.Gosched()
203-
}
198+
sp.dispatchCycle(ctx) // Process immediately when an item arrives
199+
case <-dispatchTicker.C():
200+
sp.dispatchCycle(ctx) // Periodically attempt to dispatch from queues
204201
}
205202
}
206203
}

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -480,11 +480,23 @@ func TestShardProcessor(t *testing.T) {
480480
defer h.Stop()
481481
h.Go()
482482

483-
// Wait for the dispatch cycle to select our item and pause inside our mock `RemoveFunc`.
484-
select {
485-
case <-itemIsBeingDispatched:
486-
case <-time.After(testWaitTimeout):
487-
t.Fatal("Timed out waiting for item to be dispatched")
483+
// Advance the test clock in small increments until the item is being dispatched or timeout
484+
// This is a more reliable way to ensure the processor has started and run the dispatch cycle
485+
timeout := time.After(testWaitTimeout)
486+
ticker := time.NewTicker(1 * time.Millisecond)
487+
defer ticker.Stop()
488+
489+
dispatched := false
490+
for !dispatched {
491+
select {
492+
case <-itemIsBeingDispatched:
493+
dispatched = true
494+
case <-timeout:
495+
t.Fatal("Timed out waiting for item to be dispatched")
496+
case <-ticker.C:
497+
// Advance the test clock to trigger the dispatch ticker
498+
h.clock.Step(1 * time.Millisecond)
499+
}
488500
}
489501

490502
// 3. The dispatch goroutine is now paused. We can now safely win the "race" by running cleanup logic.

0 commit comments

Comments
 (0)