Skip to content

Commit 72d5452

Browse files
authored
fix(queue): replace semaphore with sync.Cond to eliminate worker serialization (#9)
The current implementation uses a size-1 semaphore to ensure a single worker reads from q.items.Front() at a given time. The problem is that after the worker realizes it has to wait until `plannedToStartWorkAt`, it continues to hold the semaphore while sleeping. Other workers block until the sleeping worker wakes up, processes its task, and releases. The q.items list acts as a FIFO queue and keeps the invariant that elements are ordered by how soon they are to be processed, given the plannedToStartWorkAt value. While a worker sleeps holding the semaphore, elements that should be scheduled immediately cannot be processed even though the other workers might have nothing else to do. When we consider that the default max backoff is 1000s, we can have a worker holding the semaphore for as much as 16 minutes. This patch removes the semaphore and replaces it with a sync.Cond. We'll c.Wait in two situations: there's nothing to do (no elements to process), so we wait until we get a Signal from insert; or we got an element but have to wait for the plannedToStartWorkAt timer. To avoid a timer mutex race, we prefer Signals when we know an item is ready (insert) or the timer fired. Broadcast is used for cancellations and shutdowns. Signed-off-by: Juliana Oliveira <juliana@fly.io>
1 parent db3c70e commit 72d5452

File tree

1 file changed

+84
-67
lines changed

1 file changed

+84
-67
lines changed

internal/queue/queue.go

Lines changed: 84 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
pkgerrors "github.com/pkg/errors"
2525
"github.com/virtual-kubelet/virtual-kubelet/log"
2626
"github.com/virtual-kubelet/virtual-kubelet/trace"
27-
"golang.org/x/sync/semaphore"
2827
"k8s.io/apimachinery/pkg/util/wait"
2928
"k8s.io/client-go/util/workqueue"
3029
"k8s.io/utils/clock"
@@ -47,6 +46,7 @@ type Queue struct {
4746
clock clock.Clock
4847
// lock protects running, and the items list / map
4948
lock sync.Mutex
49+
cond *sync.Cond
5050
running bool
5151
name string
5252
handler ItemHandler
@@ -58,12 +58,11 @@ type Queue struct {
5858
itemsInQueue map[string]*list.Element
5959
// itemsBeingProcessed is a map of (string) key -> item once it has been moved
6060
itemsBeingProcessed map[string]*queueItem
61-
// Wait for next semaphore is an exclusive (1 item) lock that is taken every time items is checked to see if there
62-
// is an item in queue for work
63-
waitForNextItemSemaphore *semaphore.Weighted
6461

65-
// wakeup
66-
wakeupCh chan struct{}
62+
// timerLeaderActive is true when a worker is waiting on a timer for a delayed item.
63+
// Only one worker should wait on the timer; others wait on cond for a signal.
64+
// This avoids a thundering herd on timers.
65+
timerLeaderActive bool
6766

6867
retryFunc ShouldRetryFunc
6968
}
@@ -94,18 +93,18 @@ func New(ratelimiter workqueue.TypedRateLimiter[any], name string, handler ItemH
9493
if retryFunc == nil {
9594
retryFunc = DefaultRetryFunc
9695
}
97-
return &Queue{
98-
clock: clock.RealClock{},
99-
name: name,
100-
ratelimiter: ratelimiter,
101-
items: list.New(),
102-
itemsBeingProcessed: make(map[string]*queueItem),
103-
itemsInQueue: make(map[string]*list.Element),
104-
handler: handler,
105-
wakeupCh: make(chan struct{}, 1),
106-
waitForNextItemSemaphore: semaphore.NewWeighted(1),
107-
retryFunc: retryFunc,
96+
q := &Queue{
97+
clock: clock.RealClock{},
98+
name: name,
99+
ratelimiter: ratelimiter,
100+
items: list.New(),
101+
itemsBeingProcessed: make(map[string]*queueItem),
102+
itemsInQueue: make(map[string]*list.Element),
103+
handler: handler,
104+
retryFunc: retryFunc,
108105
}
106+
q.cond = sync.NewCond(&q.lock)
107+
return q
109108
}
110109

111110
// Enqueue enqueues the key in a rate limited fashion
@@ -179,12 +178,7 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *t
179178
ctx = span.WithField(ctx, "delay", delay.String())
180179
}
181180

182-
defer func() {
183-
select {
184-
case q.wakeupCh <- struct{}{}:
185-
default:
186-
}
187-
}()
181+
defer q.cond.Signal()
188182

189183
// First see if the item is already being processed
190184
if item, ok := q.itemsBeingProcessed[key]; ok {
@@ -340,6 +334,8 @@ func (q *Queue) Run(ctx context.Context, workers int) {
340334
}
341335
defer group.Wait()
342336
<-ctx.Done()
337+
// Wake all workers for cancellation
338+
q.cond.Broadcast()
343339
}
344340

345341
func (q *Queue) worker(ctx context.Context, i int) {
@@ -355,58 +351,79 @@ func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
355351
ctx, span := trace.StartSpan(ctx, "getNextItem")
356352
defer span.End()
357353

358-
ctx, acqSpan := trace.StartSpan(ctx, "acquireNextItemSemaphore")
359-
if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil {
360-
acqSpan.SetStatus(err)
361-
acqSpan.End()
362-
return nil, err
363-
}
364-
acqSpan.SetStatus(nil)
365-
acqSpan.End()
366-
defer q.waitForNextItemSemaphore.Release(1)
354+
// Goroutine broadcasts on context cancellation so workers blocked
355+
// on cond.Wait() can exit. Exits when ctx is done OR done is closed.
356+
done := make(chan struct{})
357+
defer close(done)
358+
go func() {
359+
select {
360+
case <-ctx.Done():
361+
q.cond.Broadcast()
362+
case <-done:
363+
}
364+
}()
365+
366+
q.lock.Lock()
367+
defer q.lock.Unlock()
367368

368369
for {
369-
q.lock.Lock()
370+
select {
371+
case <-ctx.Done():
372+
span.SetStatus(ctx.Err())
373+
return nil, ctx.Err()
374+
default:
375+
}
376+
370377
element := q.items.Front()
371378
if element == nil {
372-
// Wait for the next item
373-
q.lock.Unlock()
379+
_, waitSpan := trace.StartSpan(ctx, "waitForItem")
380+
q.cond.Wait()
381+
waitSpan.End()
382+
continue
383+
}
384+
385+
qi := element.Value.(*queueItem)
386+
timeUntilProcessing := time.Until(qi.plannedToStartWorkAt)
387+
388+
// Do we need to sleep? If not, let's party.
389+
if timeUntilProcessing <= 0 {
390+
q.itemsBeingProcessed[qi.key] = qi
391+
q.items.Remove(element)
392+
delete(q.itemsInQueue, qi.key)
393+
span.SetStatus(nil)
394+
return qi, nil
395+
}
396+
397+
// Item is delayed. Only one worker waits on the timer.
398+
// Other workers just wait for a signal.
399+
if q.timerLeaderActive {
400+
_, waitSpan := trace.StartSpan(ctx, "waitForReady")
401+
q.cond.Wait()
402+
waitSpan.End()
403+
continue
404+
}
405+
406+
// Now I'm the leader
407+
q.timerLeaderActive = true
408+
timer := q.clock.NewTimer(timeUntilProcessing)
409+
timerDone := make(chan struct{})
410+
go func() {
374411
select {
375412
case <-ctx.Done():
376-
span.SetStatus(nil)
377-
return nil, ctx.Err()
378-
case <-q.wakeupCh:
379-
}
380-
} else {
381-
qi := element.Value.(*queueItem)
382-
timeUntilProcessing := time.Until(qi.plannedToStartWorkAt)
383-
384-
// Do we need to sleep? If not, let's party.
385-
if timeUntilProcessing <= 0 {
386-
q.itemsBeingProcessed[qi.key] = qi
387-
q.items.Remove(element)
388-
delete(q.itemsInQueue, qi.key)
389-
q.lock.Unlock()
390-
span.SetStatus(nil)
391-
return qi, nil
413+
case <-timer.C():
414+
case <-timerDone:
415+
return
392416
}
393417

394-
q.lock.Unlock()
395-
if err := func() error {
396-
timer := q.clock.NewTimer(timeUntilProcessing)
397-
defer timer.Stop()
398-
select {
399-
case <-timer.C():
400-
case <-ctx.Done():
401-
return ctx.Err()
402-
case <-q.wakeupCh:
403-
}
404-
return nil
405-
}(); err != nil {
406-
span.SetStatus(err)
407-
return nil, err
408-
}
409-
}
418+
q.cond.Broadcast()
419+
}()
420+
421+
_, waitSpan := trace.StartSpan(ctx, "waitForReadyAsLeader")
422+
q.cond.Wait()
423+
waitSpan.End()
424+
timer.Stop()
425+
close(timerDone)
426+
q.timerLeaderActive = false
410427
}
411428
}
412429

0 commit comments

Comments
 (0)