Commit cd6d274

Merge branch 'main' into fix/allow-longer-UT-IT-timeout

2 parents: 6df3da7 + aa55878

File tree

12 files changed: +570 −133 lines changed


cmd/hubagent/workload/setup.go

Lines changed: 1 addition & 1 deletion
@@ -358,7 +358,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 	defaultProfile := profile.NewDefaultProfile()
 	defaultFramework := framework.NewFramework(defaultProfile, mgr)
 	defaultSchedulingQueue := queue.NewSimplePlacementSchedulingQueue(
-		queue.WithName(schedulerQueueName),
+		schedulerQueueName, nil,
 	)
 	// we use one scheduler for every 10 concurrent placement
 	defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr,
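
The call site drops the functional-option style (queue.WithName) in favor of positional arguments. A minimal sketch of a caller that supplies its own rate limiter instead of nil, assuming (by analogy with NewBatchedProcessingPlacementSchedulingQueue added below) that the second parameter is an optional workqueue rate limiter that falls back to a default when nil:

	// Hypothetical call site; the queue name and the assumption that the second
	// argument is an optional rate limiter are illustrative, not part of this commit.
	limiter := workqueue.DefaultTypedControllerRateLimiter[any]()
	schedulingQueue := queue.NewSimplePlacementSchedulingQueue("demoSchedulerQueue", limiter)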

pkg/scheduler/queue/batched.go

Lines changed: 233 additions & 0 deletions
@@ -0,0 +1,233 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

const (
	maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo = 20000
)

// batchedProcessingPlacementSchedulingQueue implements the PlacementSchedulingQueue
// interface.
//
// It consists of two work queues, which allow both immediate and batched processing
// of scheduling-related events (changes) of different responsiveness levels.
type batchedProcessingPlacementSchedulingQueue struct {
	active  workqueue.TypedRateLimitingInterface[any]
	batched workqueue.TypedRateLimitingInterface[any]

	moveNow           chan struct{}
	movePeriodSeconds int32
}

// Verify that batchedProcessingPlacementSchedulingQueue implements
// PlacementSchedulingQueue at compile time.
var _ PlacementSchedulingQueue = &batchedProcessingPlacementSchedulingQueue{}

// batchedProcessingPlacementSchedulingQueueOptions are the options for the
// batchedProcessingPlacementSchedulingQueue.
type batchedProcessingPlacementSchedulingQueueOptions struct {
	activeQueueRateLimiter  workqueue.TypedRateLimiter[any]
	batchedQueueRateLimiter workqueue.TypedRateLimiter[any]
	name                    string
	movePeriodSeconds       int32
}

var defaultBatchedProcessingPlacementSchedulingQueueOptions = batchedProcessingPlacementSchedulingQueueOptions{
	activeQueueRateLimiter:  workqueue.DefaultTypedControllerRateLimiter[any](),
	batchedQueueRateLimiter: workqueue.DefaultTypedControllerRateLimiter[any](),
	name:                    "batchedProcessingPlacementSchedulingQueue",
	movePeriodSeconds:       int32(300), // 5 minutes
}

// Close shuts down the scheduling queue immediately.
//
// Note that items remaining in the active queue might not get processed any more, and items
// left in the batched queue might not be moved to the active queue any more either.
func (bq *batchedProcessingPlacementSchedulingQueue) Close() {
	// Signal the mover goroutine to exit.
	//
	// Note that this will trigger the mover goroutine to attempt another key move, but the
	// active queue might not be able to accept the key any more (which is OK and does not
	// result in an error).
	close(bq.moveNow)

	bq.batched.ShutDown()
	bq.active.ShutDown()
}

// CloseWithDrain shuts down the scheduling queue and does not return until:
// a) all the items in the batched queue have been moved to the active queue; and
// b) all the items in the active queue have been processed.
func (bq *batchedProcessingPlacementSchedulingQueue) CloseWithDrain() {
	// Signal that all items in the batched queue should be moved to the active queue right away.
	close(bq.moveNow)

	// Wait until all the items in the batched queue have completed their moves to the
	// active queue.
	bq.batched.ShutDownWithDrain()
	// Wait until all the items that are currently being processed by the scheduler have finished.
	bq.active.ShutDownWithDrain()
}

// NextPlacementKey returns the next PlacementKey (either clusterResourcePlacementKey or resourcePlacementKey)
// in the work queue for the scheduler to process.
func (bq *batchedProcessingPlacementSchedulingQueue) NextPlacementKey() (key PlacementKey, closed bool) {
	// This will block on a condition variable if the queue is empty.
	placementKey, shutdown := bq.active.Get()
	if shutdown {
		return "", true
	}
	return placementKey.(PlacementKey), false
}

// Done marks a PlacementKey as done.
func (bq *batchedProcessingPlacementSchedulingQueue) Done(placementKey PlacementKey) {
	bq.active.Done(placementKey)
	// The keys in the batched queue are marked as done as soon as they are moved to the active queue.
}

// Add adds a PlacementKey to the work queue for immediate processing.
//
// Note that this bypasses the rate limiter (if any).
func (bq *batchedProcessingPlacementSchedulingQueue) Add(placementKey PlacementKey) {
	bq.active.Add(placementKey)
}

// AddAfter adds a PlacementKey to the work queue after a set duration for immediate processing.
//
// Note that this bypasses the rate limiter (if any).
func (bq *batchedProcessingPlacementSchedulingQueue) AddAfter(placementKey PlacementKey, duration time.Duration) {
	bq.active.AddAfter(placementKey, duration)
}

// AddRateLimited adds a PlacementKey to the work queue after the rate limiter (if any)
// says that it is OK, for immediate processing.
func (bq *batchedProcessingPlacementSchedulingQueue) AddRateLimited(placementKey PlacementKey) {
	bq.active.AddRateLimited(placementKey)
}

// Forget untracks a PlacementKey from the rate limiter(s) (if any) set up with the queue.
func (bq *batchedProcessingPlacementSchedulingQueue) Forget(placementKey PlacementKey) {
	bq.active.Forget(placementKey)
	// The keys in the batched queue are forgotten as soon as they are moved to the active queue.
}

// AddBatched tracks a PlacementKey and adds such keys in batch later to the work queue when appropriate.
func (bq *batchedProcessingPlacementSchedulingQueue) AddBatched(placementKey PlacementKey) {
	bq.batched.Add(placementKey)
}

// Run starts the scheduling queue.
func (bq *batchedProcessingPlacementSchedulingQueue) Run() {
	// Spin up a goroutine to move items periodically from the batched queue to the active queue.
	go func() {
		timer := time.NewTimer(time.Duration(bq.movePeriodSeconds) * time.Second)
		for {
			select {
			case _, ok := <-bq.moveNow:
				// A receive on a closed channel returns immediately with ok set to false.
				if !ok && bq.batched.ShuttingDown() {
					// The batched queue has been shut down, and the moveNow channel has been closed;
					// now it is safe to assume that after moving all the items from the batched queue
					// to the active queue this time, the batched queue will be drained.
					bq.moveAllBatchedItemsToActiveQueue()
					return
				}

				// The batched queue might still be running; move all items and re-enter the loop.
				bq.moveAllBatchedItemsToActiveQueue()
			case <-timer.C:
				// The timer has fired; move all items.
				bq.moveAllBatchedItemsToActiveQueue()
			}

			// Reset the timer for the next round.
			timer.Reset(time.Duration(bq.movePeriodSeconds) * time.Second)
		}
	}()
}

func (bq *batchedProcessingPlacementSchedulingQueue) moveAllBatchedItemsToActiveQueue() {
	keysToMove := []PlacementKey{}

	for bq.batched.Len() > 0 {
		// Note that the batched queue is an internal object and is only read here by the scheduling queue
		// itself (i.e., the batched queue has only one reader, though there might be multiple writers);
		// consequently, if the Len() > 0 check passes, the subsequent Get() call is guaranteed to return
		// an item (i.e., the call will not block). For simplicity we do not do additional
		// sanity checks here.
		placementKey, shutdown := bq.batched.Get()
		if shutdown {
			break
		}
		keysToMove = append(keysToMove, placementKey.(PlacementKey))

		if len(keysToMove) > maxNumberOfKeysToMoveFromBatchedToActiveQueuePerGo {
			// The keys popped from the batched queue are not yet added to the active queue; in other words,
			// they are not yet marked as done. The batched queue will still track them, and adding them
			// to the batched queue again at this moment will not trigger the batched queue to yield the same
			// keys again. This implies that at maximum we will be moving a number of keys equal to
			// the number of placement objects in the system at a time, which should be a finite number.
			// Still, to be on the safer side, KubeFleet sets a cap on the number of keys to move per go.
			break
		}
	}

	for _, key := range keysToMove {
		// Mark the keys as done in the batched queue and add the keys to the active queue in batch. Here the
		// implementation keeps the keys in memory first and does not move keys right after they are popped, as
		// that pattern risks synchronized processing (i.e., a key is popped from the batched queue, immediately
		// added to the active queue and marked as done by the scheduler, then added back to the batched queue
		// by one of the watchers before the key moving attempt is finished, which results in perpetual key moving).
		bq.active.Add(key)
		bq.batched.Done(key)
		bq.batched.Forget(key)
	}
}

// NewBatchedProcessingPlacementSchedulingQueue returns a batchedProcessingPlacementSchedulingQueue.
func NewBatchedProcessingPlacementSchedulingQueue(name string, activeQRateLimiter, batchedQRateLimiter workqueue.TypedRateLimiter[any], movePeriodSeconds int32) PlacementSchedulingQueue {
	if len(name) == 0 {
		name = defaultBatchedProcessingPlacementSchedulingQueueOptions.name
	}
	if activeQRateLimiter == nil {
		activeQRateLimiter = defaultBatchedProcessingPlacementSchedulingQueueOptions.activeQueueRateLimiter
	}
	if batchedQRateLimiter == nil {
		batchedQRateLimiter = defaultBatchedProcessingPlacementSchedulingQueueOptions.batchedQueueRateLimiter
	}
	if movePeriodSeconds <= 0 {
		movePeriodSeconds = defaultBatchedProcessingPlacementSchedulingQueueOptions.movePeriodSeconds
	}

	return &batchedProcessingPlacementSchedulingQueue{
		active: workqueue.NewTypedRateLimitingQueueWithConfig(activeQRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
			Name: fmt.Sprintf("%s_Active", name),
		}),
		batched: workqueue.NewTypedRateLimitingQueueWithConfig(batchedQRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
			Name: fmt.Sprintf("%s_Batched", name),
		}),
		moveNow:           make(chan struct{}),
		movePeriodSeconds: movePeriodSeconds,
	}
}
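
For reference, a minimal usage sketch of the new queue under the assumptions visible in this file (PlacementKey is a string-based type from the surrounding queue package; the key values, queue name, and worker loop below are illustrative, not part of this commit):

	// Build a queue with the default rate limiters and a one-minute move period.
	bq := queue.NewBatchedProcessingPlacementSchedulingQueue("demoSchedulerQueue", nil, nil, 60)
	bq.Run()

	// Watchers enqueue low-urgency changes in batch...
	bq.AddBatched(queue.PlacementKey("default/crp-1"))
	// ...and urgent changes for immediate processing.
	bq.Add(queue.PlacementKey("default/crp-2"))

	// A scheduler worker loop.
	go func() {
		for {
			key, closed := bq.NextPlacementKey()
			if closed {
				return
			}
			// ... run a scheduling cycle for key ...
			bq.Done(key)
			bq.Forget(key)
		}
	}()

	// On shutdown, drain both queues before returning.
	bq.CloseWithDrain()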
