Skip to content

Commit f7fef03

Browse files
authored
Merge pull request kubernetes#127944 from antoninbas/use-generics-for-delaying-queue-waitFor
Use generics for waitFor (delaying workqueue) in client-go
2 parents c5aa403 + 53ddffb commit f7fef03

File tree

5 files changed

+25
-25
lines changed

5 files changed

+25
-25
lines changed

staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T],
141141
clock: clock,
142142
heartbeat: clock.NewTicker(maxWait),
143143
stopCh: make(chan struct{}),
144-
waitingForAddCh: make(chan *waitFor, 1000),
144+
waitingForAddCh: make(chan *waitFor[T], 1000),
145145
metrics: newRetryMetrics(name, provider),
146146
}
147147

@@ -165,15 +165,15 @@ type delayingType[T comparable] struct {
165165
heartbeat clock.Ticker
166166

167167
// waitingForAddCh is a buffered channel that feeds waitingForAdd
168-
waitingForAddCh chan *waitFor
168+
waitingForAddCh chan *waitFor[T]
169169

170170
// metrics counts the number of retries
171171
metrics retryMetrics
172172
}
173173

174174
// waitFor holds the data to add and the time it should be added
175-
type waitFor struct {
176-
data t
175+
type waitFor[T any] struct {
176+
data T
177177
readyAt time.Time
178178
// index in the priority queue (heap)
179179
index int
@@ -187,32 +187,32 @@ type waitFor struct {
187187
// it has been removed from the queue and placed at index Len()-1 by
188188
// container/heap. Push adds an item at index Len(), and container/heap
189189
// percolates it into the correct location.
190-
type waitForPriorityQueue []*waitFor
190+
type waitForPriorityQueue[T any] []*waitFor[T]
191191

192-
func (pq waitForPriorityQueue) Len() int {
192+
func (pq waitForPriorityQueue[T]) Len() int {
193193
return len(pq)
194194
}
195-
func (pq waitForPriorityQueue) Less(i, j int) bool {
195+
func (pq waitForPriorityQueue[T]) Less(i, j int) bool {
196196
return pq[i].readyAt.Before(pq[j].readyAt)
197197
}
198-
func (pq waitForPriorityQueue) Swap(i, j int) {
198+
func (pq waitForPriorityQueue[T]) Swap(i, j int) {
199199
pq[i], pq[j] = pq[j], pq[i]
200200
pq[i].index = i
201201
pq[j].index = j
202202
}
203203

204204
// Push adds an item to the queue. Push should not be called directly; instead,
205205
// use `heap.Push`.
206-
func (pq *waitForPriorityQueue) Push(x interface{}) {
206+
func (pq *waitForPriorityQueue[T]) Push(x interface{}) {
207207
n := len(*pq)
208-
item := x.(*waitFor)
208+
item := x.(*waitFor[T])
209209
item.index = n
210210
*pq = append(*pq, item)
211211
}
212212

213213
// Pop removes an item from the queue. Pop should not be called directly;
214214
// instead, use `heap.Pop`.
215-
func (pq *waitForPriorityQueue) Pop() interface{} {
215+
func (pq *waitForPriorityQueue[T]) Pop() interface{} {
216216
n := len(*pq)
217217
item := (*pq)[n-1]
218218
item.index = -1
@@ -222,7 +222,7 @@ func (pq *waitForPriorityQueue) Pop() interface{} {
222222

223223
// Peek returns the item at the beginning of the queue, without removing the
224224
// item or otherwise mutating the queue. It is safe to call directly.
225-
func (pq waitForPriorityQueue) Peek() interface{} {
225+
func (pq waitForPriorityQueue[T]) Peek() interface{} {
226226
return pq[0]
227227
}
228228

@@ -254,7 +254,7 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
254254
select {
255255
case <-q.stopCh:
256256
// unblock if ShutDown() is called
257-
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
257+
case q.waitingForAddCh <- &waitFor[T]{data: item, readyAt: q.clock.Now().Add(duration)}:
258258
}
259259
}
260260

@@ -273,10 +273,10 @@ func (q *delayingType[T]) waitingLoop() {
273273
// Make a timer that expires when the item at the head of the waiting queue is ready
274274
var nextReadyAtTimer clock.Timer
275275

276-
waitingForQueue := &waitForPriorityQueue{}
276+
waitingForQueue := &waitForPriorityQueue[T]{}
277277
heap.Init(waitingForQueue)
278278

279-
waitingEntryByData := map[t]*waitFor{}
279+
waitingEntryByData := map[T]*waitFor[T]{}
280280

281281
for {
282282
if q.TypedInterface.ShuttingDown() {
@@ -287,13 +287,13 @@ func (q *delayingType[T]) waitingLoop() {
287287

288288
// Add ready entries
289289
for waitingForQueue.Len() > 0 {
290-
entry := waitingForQueue.Peek().(*waitFor)
290+
entry := waitingForQueue.Peek().(*waitFor[T])
291291
if entry.readyAt.After(now) {
292292
break
293293
}
294294

295-
entry = heap.Pop(waitingForQueue).(*waitFor)
296-
q.Add(entry.data.(T))
295+
entry = heap.Pop(waitingForQueue).(*waitFor[T])
296+
q.Add(entry.data)
297297
delete(waitingEntryByData, entry.data)
298298
}
299299

@@ -303,7 +303,7 @@ func (q *delayingType[T]) waitingLoop() {
303303
if nextReadyAtTimer != nil {
304304
nextReadyAtTimer.Stop()
305305
}
306-
entry := waitingForQueue.Peek().(*waitFor)
306+
entry := waitingForQueue.Peek().(*waitFor[T])
307307
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
308308
nextReadyAt = nextReadyAtTimer.C()
309309
}
@@ -322,7 +322,7 @@ func (q *delayingType[T]) waitingLoop() {
322322
if waitEntry.readyAt.After(q.clock.Now()) {
323323
insert(waitingForQueue, waitingEntryByData, waitEntry)
324324
} else {
325-
q.Add(waitEntry.data.(T))
325+
q.Add(waitEntry.data)
326326
}
327327

328328
drained := false
@@ -332,7 +332,7 @@ func (q *delayingType[T]) waitingLoop() {
332332
if waitEntry.readyAt.After(q.clock.Now()) {
333333
insert(waitingForQueue, waitingEntryByData, waitEntry)
334334
} else {
335-
q.Add(waitEntry.data.(T))
335+
q.Add(waitEntry.data)
336336
}
337337
default:
338338
drained = true
@@ -343,7 +343,7 @@ func (q *delayingType[T]) waitingLoop() {
343343
}
344344

345345
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
346-
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
346+
func insert[T comparable](q *waitForPriorityQueue[T], knownEntries map[T]*waitFor[T], entry *waitFor[T]) {
347347
// if the entry already exists, update the time only if it would cause the item to be queued sooner
348348
existing, exists := knownEntries[entry.data]
349349
if exists {

staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) {
214214

215215
func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
216216
fakeClock := testingclock.NewFakeClock(time.Now())
217-
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
217+
q := NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[string]{Clock: fakeClock})
218218

219219
// Add items
220220
for n := 0; n < b.N; n++ {

staging/src/k8s.io/client-go/util/workqueue/queue.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,6 @@ type Typed[t comparable] struct {
212212
}
213213

214214
type empty struct{}
215-
type t interface{}
216215
type set[t comparable] map[t]empty
217216

218217
func (s set[t]) has(item t) bool {

staging/src/k8s.io/client-go/util/workqueue/queue_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ func BenchmarkQueue(b *testing.B) {
467467
for idx := range keys {
468468
keys[idx] = fmt.Sprintf("key-%d", idx)
469469
}
470+
b.ResetTimer()
470471
for i := 0; i < b.N; i++ {
471472
b.StopTimer()
472473
q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{})

staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) {
3232
clock: fakeClock,
3333
heartbeat: fakeClock.NewTicker(maxWait),
3434
stopCh: make(chan struct{}),
35-
waitingForAddCh: make(chan *waitFor, 1000),
35+
waitingForAddCh: make(chan *waitFor[any], 1000),
3636
metrics: newRetryMetrics("", nil),
3737
}
3838
queue.TypedDelayingInterface = delayingQueue

0 commit comments

Comments
 (0)