Skip to content

Commit 0c8919c

Browse files
committed
Added implementation for queues with batched processing support
Signed-off-by: michaelawyu <[email protected]>
1 parent 83bc664 commit 0c8919c

File tree

5 files changed

+556
-126
lines changed

5 files changed

+556
-126
lines changed

pkg/scheduler/queue/batched.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queue
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"k8s.io/client-go/util/workqueue"
24+
)
25+
26+
const (
	// maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo caps the number of keys that a
	// single moveAllBatchedItemsToActiveQueue invocation may transfer from the batched
	// queue to the active queue; see the comments in that method for the rationale.
	maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo = 20000
)
29+
30+
// batchedProcessingPlacementSchedulingQueue implements the PlacementSchedulingQueue
// interface.
//
// It consists of two work queues to allow processing for both immediate and batched
// processing for scheduling related events (changes) of different responsiveness levels.
type batchedProcessingPlacementSchedulingQueue struct {
	// active holds keys that are ready for the scheduler to pick up right away
	// (NextPlacementKey reads from this queue).
	active workqueue.TypedRateLimitingInterface[any]
	// batched holds keys added via AddBatched; they are periodically moved to the
	// active queue by the goroutine started in Run.
	batched workqueue.TypedRateLimitingInterface[any]

	// moveNow is never sent on; it is closed (in Close/CloseWithDrain) to tell the
	// mover goroutine to perform a final move and exit.
	moveNow chan struct{}
	// movePeriodSeconds is the interval, in seconds, between periodic moves from the
	// batched queue to the active queue.
	movePeriodSeconds int32
}
}
42+
43+
// Verify that batchedProcessingPlacementSchedulingQueue implements
// PlacementSchedulingQueue at compile time.
var _ PlacementSchedulingQueue = &batchedProcessingPlacementSchedulingQueue{}
46+
47+
// batchedProcessingPlacementSchedulingQueueOptions are the options for the
// batchedProcessingPlacementSchedulingQueue.
type batchedProcessingPlacementSchedulingQueueOptions struct {
	// activeQueueRateLimiter is the rate limiter used for the active queue.
	activeQueueRateLimiter workqueue.TypedRateLimiter[any]
	// batchedQueueRateLimiter is the rate limiter used for the batched queue.
	batchedQueueRateLimiter workqueue.TypedRateLimiter[any]
	// name is the base name of the queue pair; the per-queue names are derived
	// from it with "_Active"/"_Batched" suffixes.
	name string
	// movePeriodSeconds is the interval, in seconds, between periodic moves of
	// keys from the batched queue to the active queue.
	movePeriodSeconds int32
}
55+
56+
// defaultBatchedProcessingPlacementSchedulingQueueOptions are the fallback option values
// used by NewBatchedProcessingPlacementSchedulingQueue when a caller passes a zero value.
var defaultBatchedProcessingPlacementSchedulingQueueOptions = batchedProcessingPlacementSchedulingQueueOptions{
	activeQueueRateLimiter:  workqueue.DefaultTypedControllerRateLimiter[any](),
	batchedQueueRateLimiter: workqueue.DefaultTypedControllerRateLimiter[any](),
	name:                    "batchedProcessingPlacementSchedulingQueue",
	movePeriodSeconds:       int32(300), // 5 minutes
}
62+
63+
// Close shuts down the scheduling queue immediately.
64+
//
65+
// Note that items remaining in the active queue might not get processed any more, and items
66+
// left in the batched queue might not be moved to the active queue any more either.
67+
func (bq *batchedProcessingPlacementSchedulingQueue) Close() {
68+
// Signal the mover goroutine to exit.
69+
//
70+
// Note that this will trigger the mover goroutine to attempt another key move, but the
71+
// active queue will not be able to accept the key any more.
72+
close(bq.moveNow)
73+
74+
bq.batched.ShutDown()
75+
bq.active.ShutDown()
76+
}
77+
78+
// CloseWithDrain shuts down the scheduling queue and returns until:
79+
// a) all the items in the batched queue have been moved to the active queue; and
80+
// b) all the items in the active queue have been processed.
81+
func (bq *batchedProcessingPlacementSchedulingQueue) CloseWithDrain() {
82+
// Signal that all items in the batched queue should be moved to the active queue right away.
83+
close(bq.moveNow)
84+
85+
// Wait until all the items in the batched queue have been processed..
86+
bq.batched.ShutDownWithDrain()
87+
// Wait until all the items in the active queue have been processed.
88+
bq.active.ShutDownWithDrain()
89+
}
90+
91+
// NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey)
92+
// in the work queue for the scheduler to process.
93+
func (bq *batchedProcessingPlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
94+
// This will block on a condition variable if the queue is empty.
95+
placementKey, shutdown := bq.active.Get()
96+
if shutdown {
97+
return "", true
98+
}
99+
return placementKey.(PlacementKey), false
100+
}
101+
102+
// Done marks a PlacementKey as done.
103+
func (bq *batchedProcessingPlacementSchedulingQueue) Done(placementKey PlacementKey) {
104+
bq.active.Done(placementKey)
105+
// It is OK for Done to be called on the batched queue even if the item has never been
106+
// added to the batched queue before. In this case the call is simply a no-op.
107+
bq.batched.Done(placementKey)
108+
}
109+
110+
// Add adds a PlacementKey to the work queue for immediate processing.
111+
//
112+
// Note that this bypasses the rate limiter (if any).
113+
func (bq *batchedProcessingPlacementSchedulingQueue) Add(placementKey PlacementKey) {
114+
bq.active.Add(placementKey)
115+
}
116+
117+
// AddAfter adds a PlacementKey to the work queue after a set duration for immediate processing.
118+
//
119+
// Note that this bypasses the rate limiter (if any).
120+
func (bq *batchedProcessingPlacementSchedulingQueue) AddAfter(placementKey PlacementKey, duration time.Duration) {
121+
bq.active.AddAfter(placementKey, duration)
122+
}
123+
124+
// AddRateLimited adds a PlacementKey to the work queue after the rate limiter (if any)
125+
// says that it is OK, for immediate processing.
126+
func (bq *batchedProcessingPlacementSchedulingQueue) AddRateLimited(placementKey PlacementKey) {
127+
bq.active.AddRateLimited(placementKey)
128+
}
129+
130+
// Forget untracks a PlacementKey from rate limiter(s) (if any) set up with the queue.
131+
func (bq *batchedProcessingPlacementSchedulingQueue) Forget(placementKey PlacementKey) {
132+
bq.active.Forget(placementKey)
133+
// It is OK for Forget to be called on the batched queue even if the item has never been
134+
// added to the batched queue before. In this case the call is simply a no-op.
135+
bq.batched.Forget(placementKey)
136+
}
137+
138+
// AddBatched tracks a PlacementKey and adds such keys in batch later to the work queue when appropriate.
139+
func (bq *batchedProcessingPlacementSchedulingQueue) AddBatched(placementKey PlacementKey) {
140+
bq.batched.Add(placementKey)
141+
}
142+
143+
// Run starts the scheduling queue.
144+
func (bq *batchedProcessingPlacementSchedulingQueue) Run() {
145+
// Spin up a goroutine to move items periodically from the batched queue to the active queue.
146+
go func() {
147+
timer := time.NewTimer(time.Duration(bq.movePeriodSeconds) * time.Second)
148+
for {
149+
select {
150+
case _, closed := <-bq.moveNow:
151+
if closed && bq.batched.ShuttingDown() {
152+
// The batched queue has been shut down, and the moveNow channel has been closed;
153+
// now it is safe to assume that after moving all the items from the batched queue to the active queue
154+
// this time, the batched queue will be drained.
155+
bq.moveAllBatchedItemsToActiveQueue()
156+
return
157+
}
158+
159+
// The batched queue might still be running; move all items and re-enter the loop.
160+
bq.moveAllBatchedItemsToActiveQueue()
161+
case <-timer.C:
162+
// The timer has fired; move all items.
163+
bq.moveAllBatchedItemsToActiveQueue()
164+
}
165+
166+
// Reset the timer for the next round.
167+
timer.Reset(time.Duration(bq.movePeriodSeconds) * time.Second)
168+
}
169+
}()
170+
}
171+
172+
func (bq *batchedProcessingPlacementSchedulingQueue) moveAllBatchedItemsToActiveQueue() {
173+
keysToMove := []PlacementKey{}
174+
175+
for bq.batched.Len() > 0 {
176+
// Note that the batched queue is an internal object and is only read here by the scheduling queue
177+
// itself (i.e., the batched queue has only one reader, though there might be multiple writers);
178+
// consequently, if the Len() > 0 check passes, the subsequent Get() call is guaranteed to return
179+
// an item (i.e., the call will not block). For simplicity reasons we do not do additional
180+
// sanity checks here.
181+
placementKey, shutdown := bq.batched.Get()
182+
if shutdown {
183+
break
184+
}
185+
keysToMove = append(keysToMove, placementKey.(PlacementKey))
186+
187+
if len(keysToMove) > maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo {
188+
// The keys popped from the batched queue are not yet added to the active queue, in other words,
189+
// they are not yet marked as done; the batched queue will still track them and adding them
190+
// to the batched queue again at this moment will not trigger the batched queue to yield the same
191+
// keys again. This implies that the at maximum we will be movine a number of keys equal to
192+
// the number of placement objects in the system at a time, which should be a finite number.
193+
// Still, to be on the safer side here KubeFleet sets a cap the number of keys to move per go.
194+
break
195+
}
196+
}
197+
198+
for _, key := range keysToMove {
199+
// Add the keys to the active queue in batch. Here the implementation does not move keys one by one
200+
// right after they are popped as this pattern risks synchronized processing (i.e., a key is popped
201+
// from the batched queue, immeidiately added to the active queue and gets processed, then added
202+
// back to the batched queue again by one of the watchers before the key moving attempt is finished,
203+
// which results in perpetual key moving).
204+
bq.active.Add(key)
205+
}
206+
}
207+
208+
// NewBatchedProcessingPlacementSchedulingQueue returns a batchedProcessingPlacementSchedulingQueue.
209+
func NewBatchedProcessingPlacementSchedulingQueue(name string, activeQRateLimiter, batchedQRateLimiter workqueue.TypedRateLimiter[any], movePeriodSeconds int32) PlacementSchedulingQueue {
210+
if len(name) == 0 {
211+
name = defaultBatchedProcessingPlacementSchedulingQueueOptions.name
212+
}
213+
if activeQRateLimiter == nil {
214+
activeQRateLimiter = defaultBatchedProcessingPlacementSchedulingQueueOptions.activeQueueRateLimiter
215+
}
216+
if batchedQRateLimiter == nil {
217+
batchedQRateLimiter = defaultBatchedProcessingPlacementSchedulingQueueOptions.batchedQueueRateLimiter
218+
}
219+
if movePeriodSeconds <= 0 {
220+
movePeriodSeconds = defaultBatchedProcessingPlacementSchedulingQueueOptions.movePeriodSeconds
221+
}
222+
223+
return &batchedProcessingPlacementSchedulingQueue{
224+
active: workqueue.NewTypedRateLimitingQueueWithConfig(activeQRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
225+
Name: fmt.Sprintf("%s_Active", name),
226+
}),
227+
batched: workqueue.NewTypedRateLimitingQueueWithConfig(batchedQRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
228+
Name: fmt.Sprintf("%s_Batched", name),
229+
}),
230+
moveNow: make(chan struct{}),
231+
movePeriodSeconds: movePeriodSeconds,
232+
}
233+
}

0 commit comments

Comments
 (0)