Skip to content

Commit 40cc429

Browse files
committed
queue: Use generic key type
1 parent c485b29 commit 40cc429

File tree

4 files changed

+55
-56
lines changed

4 files changed

+55
-56
lines changed

internal/queue/queue.go

Lines changed: 45 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -36,40 +36,40 @@ const (
3636
)
3737

3838
// ShouldRetryFunc is a mechanism to have a custom retry policy
39-
type ShouldRetryFunc func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error)
39+
type ShouldRetryFunc[K any] func(ctx context.Context, key K, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error)
4040

4141
// ItemHandler is a callback that handles a single key on the Queue
42-
type ItemHandler func(ctx context.Context, key string) error
42+
type ItemHandler[K any] func(ctx context.Context, key K) error
4343

4444
// Queue implements a wrapper around workqueue with native VK instrumentation
45-
type Queue struct {
45+
type Queue[K comparable] struct {
4646
// clock is used for testing
4747
clock clock.Clock
4848
// lock protects running, and the items list / map
4949
lock sync.Mutex
5050
running bool
5151
name string
52-
handler ItemHandler
52+
handler ItemHandler[K]
5353

5454
ratelimiter workqueue.TypedRateLimiter[any]
5555
// items are items that are marked dirty waiting for processing.
5656
items *list.List
57-
// itemInQueue is a map of (string) key -> item while it is in the items list
58-
itemsInQueue map[string]*list.Element
59-
// itemsBeingProcessed is a map of (string) key -> item once it has been moved
60-
itemsBeingProcessed map[string]*queueItem
57+
	// itemsInQueue is a map of key -> item while it is in the items list
58+
itemsInQueue map[K]*list.Element
59+
// itemsBeingProcessed is a map of key -> item once it has been moved
60+
itemsBeingProcessed map[K]*queueItem[K]
6161
// Wait for next semaphore is an exclusive (1 item) lock that is taken every time items is checked to see if there
6262
// is an item in queue for work
6363
waitForNextItemSemaphore *semaphore.Weighted
6464

6565
// wakeup
6666
wakeupCh chan struct{}
6767

68-
retryFunc ShouldRetryFunc
68+
retryFunc ShouldRetryFunc[K]
6969
}
7070

71-
type queueItem struct {
72-
key string
71+
type queueItem[K comparable] struct {
72+
key K
7373
plannedToStartWorkAt time.Time
7474
redirtiedAt time.Time
7575
redirtiedWithRatelimit bool
@@ -82,25 +82,25 @@ type queueItem struct {
8282
delayedViaRateLimit *time.Duration
8383
}
8484

85-
func (item *queueItem) String() string {
86-
return fmt.Sprintf("<plannedToStartWorkAt:%s key: %s>", item.plannedToStartWorkAt.String(), item.key)
85+
func (item *queueItem[K]) String() string {
86+
return fmt.Sprintf("<plannedToStartWorkAt:%s key: %+v>", item.plannedToStartWorkAt.String(), item.key)
8787
}
8888

8989
// New creates a queue
9090
//
9191
// It expects to get an item rate limiter, and a friendly name which is used in logs, and in the internal kubernetes
9292
// metrics. If retryFunc is nil, the default retry function is used.
93-
func New(ratelimiter workqueue.TypedRateLimiter[any], name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue {
93+
func New[K comparable](ratelimiter workqueue.TypedRateLimiter[any], name string, handler ItemHandler[K], retryFunc ShouldRetryFunc[K]) *Queue[K] {
9494
if retryFunc == nil {
9595
retryFunc = DefaultRetryFunc
9696
}
97-
return &Queue{
97+
return &Queue[K]{
9898
clock: clock.RealClock{},
9999
name: name,
100100
ratelimiter: ratelimiter,
101101
items: list.New(),
102-
itemsBeingProcessed: make(map[string]*queueItem),
103-
itemsInQueue: make(map[string]*list.Element),
102+
itemsBeingProcessed: make(map[K]*queueItem[K]),
103+
itemsInQueue: make(map[K]*list.Element),
104104
handler: handler,
105105
wakeupCh: make(chan struct{}, 1),
106106
waitForNextItemSemaphore: semaphore.NewWeighted(1),
@@ -109,23 +109,23 @@ func New(ratelimiter workqueue.TypedRateLimiter[any], name string, handler ItemH
109109
}
110110

111111
// Enqueue enqueues the key in a rate limited fashion
112-
func (q *Queue) Enqueue(ctx context.Context, key string) {
112+
func (q *Queue[K]) Enqueue(ctx context.Context, key K) {
113113
q.lock.Lock()
114114
defer q.lock.Unlock()
115115

116116
q.insert(ctx, key, true, nil)
117117
}
118118

119119
// EnqueueWithoutRateLimit enqueues the key without a rate limit
120-
func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string) {
120+
func (q *Queue[K]) EnqueueWithoutRateLimit(ctx context.Context, key K) {
121121
q.lock.Lock()
122122
defer q.lock.Unlock()
123123

124124
q.insert(ctx, key, false, nil)
125125
}
126126

127127
// Forget forgets the key
128-
func (q *Queue) Forget(ctx context.Context, key string) {
128+
func (q *Queue[K]) Forget(ctx context.Context, key K) {
129129
q.lock.Lock()
130130
defer q.lock.Unlock()
131131
ctx, span := trace.StartSpan(ctx, "Forget")
@@ -164,7 +164,7 @@ func durationDeref(duration *time.Duration, def time.Duration) time.Duration {
164164
// If ratelimit is specified, and delay is nil, then the ratelimiter's delay (return from When function) will be used
165165
// If ratelimit is specified, and the delay is non-nil, then the delay value will be used
166166
// If ratelimit is false, then only delay is used to schedule the work. If delay is nil, it will be considered 0.
167-
func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *time.Duration) *queueItem {
167+
func (q *Queue[K]) insert(ctx context.Context, key K, ratelimit bool, delay *time.Duration) *queueItem[K] {
168168
ctx, span := trace.StartSpan(ctx, "insert")
169169
defer span.End()
170170

@@ -205,15 +205,15 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *t
205205
// Is the item already in the queue?
206206
if item, ok := q.itemsInQueue[key]; ok {
207207
span.WithField(ctx, "status", "itemsInQueue")
208-
qi := item.Value.(*queueItem)
208+
qi := item.Value.(*queueItem[K])
209209
when := q.clock.Now().Add(durationDeref(delay, 0))
210210
q.adjustPosition(qi, item, when)
211211
return qi
212212
}
213213

214214
span.WithField(ctx, "status", "added")
215215
now := q.clock.Now()
216-
val := &queueItem{
216+
val := &queueItem[K]{
217217
key: key,
218218
plannedToStartWorkAt: now,
219219
originallyAdded: now,
@@ -233,7 +233,7 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *t
233233
}
234234

235235
for item := q.items.Back(); item != nil; item = item.Prev() {
236-
qi := item.Value.(*queueItem)
236+
qi := item.Value.(*queueItem[K])
237237
if qi.plannedToStartWorkAt.Before(val.plannedToStartWorkAt) {
238238
q.itemsInQueue[key] = q.items.InsertAfter(val, item)
239239
return val
@@ -244,15 +244,15 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *t
244244
return val
245245
}
246246

247-
func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.Time) {
247+
func (q *Queue[K]) adjustPosition(qi *queueItem[K], element *list.Element, when time.Time) {
248248
if when.After(qi.plannedToStartWorkAt) {
249249
// The item has already been delayed appropriately
250250
return
251251
}
252252

253253
qi.plannedToStartWorkAt = when
254254
for prev := element.Prev(); prev != nil; prev = prev.Prev() {
255-
item := prev.Value.(*queueItem)
255+
item := prev.Value.(*queueItem[K])
256256
// does this item plan to start work *before* the new time? If so add it
257257
if item.plannedToStartWorkAt.Before(when) {
258258
q.items.MoveAfter(element, prev)
@@ -264,7 +264,7 @@ func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.T
264264
}
265265

266266
// EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for this given delay period
267-
func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration) {
267+
func (q *Queue[K]) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key K, after time.Duration) {
268268
q.lock.Lock()
269269
defer q.lock.Unlock()
270270
q.insert(ctx, key, false, &after)
@@ -273,12 +273,12 @@ func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string
273273
// Empty returns if the queue has no items in it
274274
//
275275
// It should only be used for debugging.
276-
func (q *Queue) Empty() bool {
276+
func (q *Queue[K]) Empty() bool {
277277
return q.Len() == 0
278278
}
279279

280280
// Len includes items that are in the queue, and are being processed
281-
func (q *Queue) Len() int {
281+
func (q *Queue[K]) Len() int {
282282
q.lock.Lock()
283283
defer q.lock.Unlock()
284284
if q.items.Len() != len(q.itemsInQueue) {
@@ -289,7 +289,7 @@ func (q *Queue) Len() int {
289289
}
290290

291291
// UnprocessedLen returns the count of items yet to be processed in the queue
292-
func (q *Queue) UnprocessedLen() int {
292+
func (q *Queue[K]) UnprocessedLen() int {
293293
q.lock.Lock()
294294
defer q.lock.Unlock()
295295
if q.items.Len() != len(q.itemsInQueue) {
@@ -300,7 +300,7 @@ func (q *Queue) UnprocessedLen() int {
300300
}
301301

302302
// ItemsBeingProcessedLen returns the count of items that are being processed
303-
func (q *Queue) ItemsBeingProcessedLen() int {
303+
func (q *Queue[K]) ItemsBeingProcessedLen() int {
304304
q.lock.Lock()
305305
defer q.lock.Unlock()
306306
return len(q.itemsBeingProcessed)
@@ -309,7 +309,7 @@ func (q *Queue) ItemsBeingProcessedLen() int {
309309
// Run starts the workers
310310
//
311311
// It blocks until context is cancelled, and all of the workers exit.
312-
func (q *Queue) Run(ctx context.Context, workers int) {
312+
func (q *Queue[K]) Run(ctx context.Context, workers int) {
313313
if workers <= 0 {
314314
panic(fmt.Sprintf("Workers must be greater than 0, got: %d", workers))
315315
}
@@ -342,7 +342,7 @@ func (q *Queue) Run(ctx context.Context, workers int) {
342342
<-ctx.Done()
343343
}
344344

345-
func (q *Queue) worker(ctx context.Context, i int) {
345+
func (q *Queue[K]) worker(ctx context.Context, i int) {
346346
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(map[string]interface{}{
347347
"workerId": i,
348348
"queue": q.name,
@@ -351,7 +351,7 @@ func (q *Queue) worker(ctx context.Context, i int) {
351351
}
352352
}
353353

354-
func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
354+
func (q *Queue[K]) getNextItem(ctx context.Context) (*queueItem[K], error) {
355355
if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil {
356356
return nil, err
357357
}
@@ -369,7 +369,7 @@ func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
369369
case <-q.wakeupCh:
370370
}
371371
} else {
372-
qi := element.Value.(*queueItem)
372+
qi := element.Value.(*queueItem[K])
373373
timeUntilProcessing := time.Until(qi.plannedToStartWorkAt)
374374

375375
// Do we need to sleep? If not, let's party.
@@ -402,7 +402,7 @@ func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
402402
// handleQueueItem handles a single item
403403
//
404404
// A return value of "false" indicates that further processing should be stopped.
405-
func (q *Queue) handleQueueItem(ctx context.Context) bool {
405+
func (q *Queue[K]) handleQueueItem(ctx context.Context) bool {
406406
ctx, span := trace.StartSpan(ctx, "handleQueueItem")
407407
defer span.End()
408408

@@ -412,8 +412,7 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool {
412412
return false
413413
}
414414

415-
// We expect strings to come off the work Queue.
416-
// These are of the form namespace/name.
415+
// We expect pod identifiers (namespace/name, UID) to come off the work Queue.
417416
	// We do this as the delayed nature of the work Queue means the items in the informer cache may actually be more up
418417
	// to date than when the item was initially put onto the workqueue.
419418
ctx = span.WithField(ctx, "key", qi.key)
@@ -431,7 +430,7 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool {
431430
return true
432431
}
433432

434-
func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error {
433+
func (q *Queue[K]) handleQueueItemObject(ctx context.Context, qi *queueItem[K]) error {
435434
// This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object
436435
// plus the time spend handling the object. Instead, this function / span is scoped to a single object.
437436
ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
@@ -459,7 +458,7 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
459458
delete(q.itemsBeingProcessed, qi.key)
460459
if qi.forget {
461460
q.ratelimiter.Forget(qi.key)
462-
log.G(ctx).WithError(err).Warnf("forgetting %q as told to forget while in progress", qi.key)
461+
log.G(ctx).WithError(err).Warnf("forgetting %+v as told to forget while in progress", qi.key)
463462
return nil
464463
}
465464

@@ -472,17 +471,17 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
472471
delay, err = q.retryFunc(ctx, qi.key, qi.requeues+1, qi.originallyAdded, err)
473472
if err == nil {
474473
// Put the item back on the work Queue to handle any transient errors.
475-
log.G(ctx).WithError(originalError).Warnf("requeuing %q due to failed sync", qi.key)
474+
log.G(ctx).WithError(originalError).Warnf("requeuing %+v due to failed sync", qi.key)
476475
newQI := q.insert(ctx, qi.key, true, delay)
477476
newQI.requeues = qi.requeues + 1
478477
newQI.originallyAdded = qi.originallyAdded
479478

480479
return nil
481480
}
482481
if !qi.redirtiedAt.IsZero() {
483-
err = fmt.Errorf("temporarily (requeued) forgetting %q due to: %w", qi.key, err)
482+
err = fmt.Errorf("temporarily (requeued) forgetting %+v due to: %w", qi.key, err)
484483
} else {
485-
err = fmt.Errorf("forgetting %q due to: %w", qi.key, err)
484+
err = fmt.Errorf("forgetting %+v due to: %w", qi.key, err)
486485
}
487486
}
488487

@@ -498,20 +497,20 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
498497
return err
499498
}
500499

501-
func (q *Queue) String() string {
500+
func (q *Queue[K]) String() string {
502501
q.lock.Lock()
503502
defer q.lock.Unlock()
504503

505504
items := make([]string, 0, q.items.Len())
506505

507506
for next := q.items.Front(); next != nil; next = next.Next() {
508-
items = append(items, next.Value.(*queueItem).String())
507+
items = append(items, next.Value.(*queueItem[K]).String())
509508
}
510509
return fmt.Sprintf("<items:%s>", items)
511510
}
512511

513512
// DefaultRetryFunc is the default function used for retries by the queue subsystem.
514-
func DefaultRetryFunc(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
513+
func DefaultRetryFunc[K any](ctx context.Context, key K, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
515514
if timesTried < MaxRetries {
516515
return nil, nil
517516
}

internal/queue/queue_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestQueueRedirty(t *testing.T) {
229229
defer cancel()
230230

231231
var times int64
232-
var q *Queue
232+
var q *Queue[string]
233233
q = New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
234234
assert.Assert(t, is.Equal(key, "foo"))
235235
if atomic.AddInt64(&times, 1) == 1 {
@@ -279,13 +279,13 @@ func TestHeapConcurrency(t *testing.T) {
279279
assert.Assert(t, time.Since(start) < 5*time.Second)
280280
}
281281

282-
func checkConsistency(t *testing.T, q *Queue) {
282+
func checkConsistency(t *testing.T, q *Queue[string]) {
283283
q.lock.Lock()
284284
defer q.lock.Unlock()
285285

286286
for next := q.items.Front(); next != nil && next.Next() != nil; next = next.Next() {
287-
qi := next.Value.(*queueItem)
288-
qiNext := next.Next().Value.(*queueItem)
287+
qi := next.Value.(*queueItem[string])
288+
qiNext := next.Next().Value.(*queueItem[string])
289289
assert.Assert(t, qi.plannedToStartWorkAt.Before(qiNext.plannedToStartWorkAt) || qi.plannedToStartWorkAt.Equal(qiNext.plannedToStartWorkAt))
290290
}
291291
}
@@ -420,7 +420,7 @@ func TestQueueForgetInProgress(t *testing.T) {
420420
defer cancel()
421421

422422
var times int64
423-
var q *Queue
423+
var q *Queue[string]
424424
q = New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
425425
assert.Assert(t, is.Equal(key, "foo"))
426426
atomic.AddInt64(&times, 1)

node/podcontroller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ type PodController struct {
111111

112112
resourceManager *manager.ResourceManager
113113

114-
syncPodsFromKubernetes *queue.Queue
114+
syncPodsFromKubernetes *queue.Queue[string]
115115

116116
// deletePodsFromKubernetes is a queue on which pods are reconciled, and we check if pods are in API server after
117117
// the grace period
118-
deletePodsFromKubernetes *queue.Queue
118+
deletePodsFromKubernetes *queue.Queue[string]
119119

120-
syncPodStatusFromProvider *queue.Queue
120+
syncPodStatusFromProvider *queue.Queue[string]
121121

122122
// From the time of creation, to termination the knownPods map will contain the pods key
123123
// (derived from Kubernetes' cache library) -> a *knownPod struct.

node/queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ import (
2424
//
2525
// If the delay is negative, the item will be scheduled "earlier" than now. This will result in the item being executed
2626
// earlier than other items in the FIFO work order.
27-
type ShouldRetryFunc = queue.ShouldRetryFunc
27+
type ShouldRetryFunc = queue.ShouldRetryFunc[string]
2828

2929
// DefaultRetryFunc is the default function used for retries by the queue subsystem. Its only policy is that it gives up
3030
// after MaxRetries, and falls back to the rate limiter for all other retries.
31-
var DefaultRetryFunc = queue.DefaultRetryFunc
31+
var DefaultRetryFunc = queue.DefaultRetryFunc[string]
3232

3333
// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
3434
var MaxRetries = queue.MaxRetries

0 commit comments

Comments
 (0)