
Commit 26ea4e3

Author: Ryan Zhang (committed)
Commit message: test
Signed-off-by: Ryan Zhang <[email protected]>
1 parent: 778c9ef

File tree: 3 files changed (+72, -78 lines)


pkg/scheduler/scheduler.go

Lines changed: 23 additions & 24 deletions
@@ -103,7 +103,7 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
     // Retrieve the next item (name of a placement) from the work queue.
     //
     // Note that this will block if no item is available.
-    placementName, closed := s.queue.NextPlacementKey()
+    placementKey, closed := s.queue.NextPlacementKey()
     if closed {
         // End the run immediately if the work queue has been closed.
         klog.InfoS("Work queue has been closed")
@@ -115,48 +115,47 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
         //
         // Note that this will happen even if an error occurs. Should the key get requeued by Add()
         // during the call, it will be added to the queue after this call returns.
-        s.queue.Done(placementName)
+        s.queue.Done(placementKey)
     }()
 
     // keep track of the number of active scheduling loop
     metrics.SchedulerActiveWorkers.WithLabelValues().Add(1)
     defer metrics.SchedulerActiveWorkers.WithLabelValues().Add(-1)
 
     startTime := time.Now()
-    placementRef := klog.KRef("", string(placementName))
-    klog.V(2).InfoS("Schedule once started", "placement", placementRef, "worker", worker)
+    klog.V(2).InfoS("Schedule once started", "placement", placementKey, "worker", worker)
     defer func() {
         // Note that the time spent on pulling keys from the work queue (and the time spent on waiting
         // for a key to arrive) is not counted here, as we cannot reliably distinguish between
         // system processing latencies and actual duration of placement absence.
         latency := time.Since(startTime).Milliseconds()
-        klog.V(2).InfoS("Schedule once completed", "placement", placementRef, "latency", latency, "worker", worker)
+        klog.V(2).InfoS("Schedule once completed", "placement", placementKey, "latency", latency, "worker", worker)
     }()
 
     // Retrieve the placement object (either ClusterResourcePlacement or ResourcePlacement).
-    placement, err := controller.FetchPlacementFromKey(ctx, s.client, placementName)
+    placement, err := controller.FetchPlacementFromKey(ctx, s.client, placementKey)
     if err != nil {
         if apiErrors.IsNotFound(err) {
             // The placement has been gone before the scheduler gets a chance to
             // process it; normally this would not happen as sources would not enqueue any placement that
             // has been marked for deletion but does not have the scheduler cleanup finalizer to
             // the work queue. Such placements needs no further processing any way though, as the absence
             // of the cleanup finalizer implies that bindings derived from the placement are no longer present.
-            klog.ErrorS(err, "placement is already deleted", "placement", placementRef)
+            klog.ErrorS(err, "placement is already deleted", "placement", placementKey)
             return
         }
         if errors.Is(err, controller.ErrUnexpectedBehavior) {
             // The placement is in an unexpected state; this is a scheduler-side error, and
             // Note that this is a scheduler-side error, so it does not return an error to the caller.
             // Raise an alert for it.
-            klog.ErrorS(err, "Placement is in an unexpected state", "placement", placementRef)
+            klog.ErrorS(err, "Placement is in an unexpected state", "placement", placementKey)
             return
         }
         // Wrap the error for metrics; this method does not return an error.
-        klog.ErrorS(controller.NewAPIServerError(true, err), "Failed to get placement", "placement", placementRef)
+        klog.ErrorS(controller.NewAPIServerError(true, err), "Failed to get placement", "placement", placementKey)
 
         // Requeue for later processing.
-        s.queue.AddRateLimited(placementName)
+        s.queue.AddRateLimited(placementKey)
         return
     }
 
@@ -165,21 +164,21 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
         // Use SchedulerCRPCleanupFinalizer consistently for all placement types
         if controllerutil.ContainsFinalizer(placement, fleetv1beta1.SchedulerCleanupFinalizer) {
             if err := s.cleanUpAllBindingsFor(ctx, placement); err != nil {
-                klog.ErrorS(err, "Failed to clean up all bindings for placement", "placement", placementRef)
+                klog.ErrorS(err, "Failed to clean up all bindings for placement", "placement", placementKey)
                 if errors.Is(err, controller.ErrUnexpectedBehavior) {
                     // The placement is in an unexpected state; this is a scheduler-side error, and
                     return
                 }
                 // Requeue for later processing.
-                s.queue.AddRateLimited(placementName)
+                s.queue.AddRateLimited(placementKey)
                 return
             }
         }
         // The placement has been marked for deletion but no longer has the scheduler cleanup finalizer; no
         // additional handling is needed.
 
         // Untrack the key from the rate limiter.
-        s.queue.Forget(placementName)
+        s.queue.Forget(placementKey)
         return
     }
 
@@ -188,20 +187,20 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
     // Verify that it has an active policy snapshot.
     latestPolicySnapshot, err := s.lookupLatestPolicySnapshot(ctx, placement)
     if err != nil {
-        klog.ErrorS(err, "Failed to lookup latest policy snapshot", "placement", placementRef)
+        klog.ErrorS(err, "Failed to lookup latest policy snapshot", "placement", placementKey)
         // No requeue is needed; the scheduler will be triggered again when an active policy
         // snapshot is created.
 
         // Untrack the key for quicker reprocessing.
-        s.queue.Forget(placementName)
+        s.queue.Forget(placementKey)
         return
     }
 
     // Add the scheduler cleanup finalizer to the placement (if it does not have one yet).
     if err := s.addSchedulerCleanUpFinalizer(ctx, placement); err != nil {
-        klog.ErrorS(err, "Failed to add scheduler cleanup finalizer", "placement", placementRef)
+        klog.ErrorS(err, "Failed to add scheduler cleanup finalizer", "placement", placementKey)
         // Requeue for later processing.
-        s.queue.AddRateLimited(placementName)
+        s.queue.AddRateLimited(placementKey)
         return
     }
 
@@ -216,26 +215,26 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
             // The placement is in an unexpected state; this is a scheduler-side error, and
             // Note that this is a scheduler-side error, so it does not return an error to the caller.
             // Raise an alert for it.
-            klog.ErrorS(err, "Placement is in an unexpected state", "placement", placementRef)
+            klog.ErrorS(err, "Placement is in an unexpected state", "placement", placementKey)
             observeSchedulingCycleMetrics(cycleStartTime, true, false)
             return
         }
         // Requeue for later processing.
-        klog.ErrorS(err, "Failed to run scheduling cycle", "placement", placementRef)
-        s.queue.AddRateLimited(placementName)
+        klog.ErrorS(err, "Failed to run scheduling cycle", "placement", placementKey)
+        s.queue.AddRateLimited(placementKey)
         observeSchedulingCycleMetrics(cycleStartTime, true, true)
         return
     }
 
     // Requeue if the scheduling cycle suggests so.
     if res.Requeue {
         if res.RequeueAfter > 0 {
-            s.queue.AddAfter(placementName, res.RequeueAfter)
+            s.queue.AddAfter(placementKey, res.RequeueAfter)
             observeSchedulingCycleMetrics(cycleStartTime, false, true)
             return
         }
         // Untrack the key from the rate limiter.
-        s.queue.Forget(placementName)
+        s.queue.Forget(placementKey)
         // Requeue for later processing.
         //
         // Note that the key is added directly to the queue without having to wait for any rate limiter's
@@ -244,11 +243,11 @@ func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
         // one cycle (e.g., a plugin sets up a per-cycle batch limit, and consequently the scheduler must
         // finish the scheduling in multiple cycles); in such cases, rate limiter should not add
         // any delay to the requeues.
-        s.queue.Add(placementName)
+        s.queue.Add(placementKey)
         observeSchedulingCycleMetrics(cycleStartTime, false, true)
     } else {
         // no more failure, the following queue don't need to be rate limited
-        s.queue.Forget(placementName)
+        s.queue.Forget(placementKey)
         observeSchedulingCycleMetrics(cycleStartTime, false, false)
     }
 }
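
For context on the rename above: the scheduler's work queue is now keyed by a generic placement key, which may name either a cluster-scoped ClusterResourcePlacement or a namespaced ResourcePlacement, so the log fields reference placementKey directly instead of a klog.KRef built from a bare name. The sketch below is a minimal illustration of the Done/Forget/AddRateLimited discipline the worker loop follows; it uses a plain client-go typed work queue rather than the project's queue package, and the PlacementKey type and schedule helper here are illustrative stand-ins, not part of this commit.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// PlacementKey names a placement: a bare name for cluster-scoped placements,
// or "namespace/name" for namespaced ones (mirroring the convention above).
type PlacementKey string

func main() {
    q := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[PlacementKey]())
    q.Add(PlacementKey("demo-placement"))

    key, shutdown := q.Get() // blocks until an item arrives or the queue is shut down
    if shutdown {
        return
    }
    defer q.Done(key) // always mark the key done, even when scheduling fails

    if err := schedule(key); err != nil {
        q.AddRateLimited(key) // retry later, with backoff decided by the rate limiter
        return
    }
    q.Forget(key) // success: clear the key's failure history in the rate limiter
}

func schedule(key PlacementKey) error {
    fmt.Println("scheduling", key)
    return nil
}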

pkg/scheduler/watchers/clusterresourceplacement/watcher.go

Lines changed: 37 additions & 54 deletions
@@ -21,14 +21,15 @@ package clusterresourceplacement
 import (
     "context"
     "fmt"
-    "time"
 
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/client-go/util/workqueue"
     "k8s.io/klog/v2"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
-    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
     "sigs.k8s.io/controller-runtime/pkg/event"
-    "sigs.k8s.io/controller-runtime/pkg/predicate"
+    "sigs.k8s.io/controller-runtime/pkg/handler"
+    "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
     fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
     "github.com/kubefleet-dev/kubefleet/pkg/scheduler/queue"
@@ -45,65 +46,47 @@ type Reconciler struct {
 
 // Reconcile reconciles the CRP.
 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-    crpRef := klog.KRef("", req.Name)
-    startTime := time.Now()
-    klog.V(2).InfoS("Scheduler source reconciliation starts", "clusterResourcePlacement", crpRef)
-    defer func() {
-        latency := time.Since(startTime).Milliseconds()
-        klog.V(2).InfoS("Scheduler source reconciliation ends", "clusterResourcePlacement", crpRef, "latency", latency)
-    }()
-
-    // Retrieve the CRP.
-    crp := &fleetv1beta1.ClusterResourcePlacement{}
-    if err := r.Client.Get(ctx, req.NamespacedName, crp); err != nil {
-        klog.ErrorS(err, "Failed to get cluster resource placement", "clusterResourcePlacement", crpRef)
-        return ctrl.Result{}, controller.NewAPIServerError(true, client.IgnoreNotFound(err))
-    }
-
-    // Check if the CRP has been deleted and has the scheduler finalizer.
-    if crp.DeletionTimestamp != nil && controllerutil.ContainsFinalizer(crp, fleetv1beta1.SchedulerCleanupFinalizer) {
-        // The CRP has been deleted and still has the scheduler finalizer;
-        // enqueue it for the scheduler to process.
-        r.SchedulerWorkQueue.AddRateLimited(queue.PlacementKey(crp.Name))
-    }
+    r.SchedulerWorkQueue.AddRateLimited(controller.GetPlacementKeyFromRequest(req))
 
     // No action is needed for the scheduler to take in other cases.
     return ctrl.Result{}, nil
 }
 
 // SetupWithManager sets up the controller with the manager.
 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
-    customPredicate := predicate.Funcs{
-        CreateFunc: func(e event.CreateEvent) bool {
-            // Ignore creation events.
-            return false
-        },
-        DeleteFunc: func(e event.DeleteEvent) bool {
-            // Ignore deletion events (events emitted when the object is actually removed
-            // from storage).
-            return false
-        },
-        UpdateFunc: func(e event.UpdateEvent) bool {
-            // Check if the update event is valid.
-            if e.ObjectOld == nil || e.ObjectNew == nil {
-                err := controller.NewUnexpectedBehaviorError(fmt.Errorf("update event is invalid"))
-                klog.ErrorS(err, "Failed to process update event")
-                return false
-            }
-
-            // Check if the deletion timestamp has been set.
-            oldDeletionTimestamp := e.ObjectOld.GetDeletionTimestamp()
-            newDeletionTimestamp := e.ObjectNew.GetDeletionTimestamp()
-            if oldDeletionTimestamp == nil && newDeletionTimestamp != nil {
-                return true
-            }
-
-            return false
-        },
+    // Create a new controller for the CRP watcher.
+    // This controller watches for deletions of ClusterResourcePlacement and ResourcePlacement objects.
+    updateFunc := func(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+        klog.V(2).InfoS("Handling a placement update event", "placement", klog.KObj(e.ObjectNew))
+        // Check if the update event is valid.
+        if e.ObjectOld == nil || e.ObjectNew == nil {
+            err := controller.NewUnexpectedBehaviorError(fmt.Errorf("update event is invalid"))
+            klog.ErrorS(err, "Failed to process update event")
+        }
+
+        // Check if the deletion timestamp has been set.
+        oldDeletionTimestamp := e.ObjectOld.GetDeletionTimestamp()
+        newDeletionTimestamp := e.ObjectNew.GetDeletionTimestamp()
+        if oldDeletionTimestamp == nil && newDeletionTimestamp != nil {
+            q.Add(reconcile.Request{
+                NamespacedName: types.NamespacedName{Name: e.ObjectNew.GetName(), Namespace: e.ObjectNew.GetNamespace()},
+            })
+            klog.V(2).InfoS("Enqueued placement for scheduler processing", "placement", klog.KObj(e.ObjectNew))
+        }
     }
 
-    return ctrl.NewControllerManagedBy(mgr).Named("clusterresourceplacement-scheduler-watcher").
-        For(&fleetv1beta1.ClusterResourcePlacement{}).
-        WithEventFilter(customPredicate).
+    deleteFunc := func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+        klog.V(2).InfoS("Handling a placement delete event", "placement", klog.KObj(e.Object))
+        q.Add(reconcile.Request{
+            NamespacedName: types.NamespacedName{Name: e.Object.GetName(), Namespace: e.Object.GetNamespace()},
+        })
+    }
+    handler := handler.Funcs{
+        DeleteFunc: deleteFunc,
+        UpdateFunc: updateFunc,
+    }
+    return ctrl.NewControllerManagedBy(mgr).Named("placement-scheduler-watcher").
+        Watches(&fleetv1beta1.ClusterResourcePlacement{}, handler).
+        Watches(&fleetv1beta1.ResourcePlacement{}, handler).
         Complete(r)
 }
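
The watcher change above drops the predicate.Funcs filter on a single For() type and instead registers one handler.Funcs through two Watches() calls, so deletion-related events for both ClusterResourcePlacement and ResourcePlacement objects feed the same reconciler, and Reconcile simply forwards the request's key to the scheduler work queue without fetching the object first. Below is a stripped-down sketch of that controller-runtime pattern with the same typed work-queue signatures; the core/v1 types and the "demo-watcher" name are placeholders used only for illustration.

package watcherdemo

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/util/workqueue"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// buildController wires a single handler.Funcs into two Watches calls, so delete
// events for either watched type are enqueued onto the same reconcile queue.
func buildController(mgr ctrl.Manager, r reconcile.Reconciler) error {
    h := handler.Funcs{
        DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
            // Enqueue the deleted object's namespace/name for reconciliation.
            q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
                Namespace: e.Object.GetNamespace(),
                Name:      e.Object.GetName(),
            }})
        },
    }
    return ctrl.NewControllerManagedBy(mgr).Named("demo-watcher").
        Watches(&corev1.ConfigMap{}, h).
        Watches(&corev1.Secret{}, h).
        Complete(r)
}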

pkg/utils/controller/placement_resolver.go

Lines changed: 12 additions & 0 deletions
@@ -22,6 +22,7 @@ import (
     "strings"
 
     "k8s.io/apimachinery/pkg/types"
+    ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
     fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
@@ -76,6 +77,17 @@ func GetPlacementKeyFromObj(obj fleetv1beta1.PlacementObj) queue.PlacementKey {
     }
 }
 
+// GetPlacementKeyFromRequest generates a PlacementKey from a controller request.
+func GetPlacementKeyFromRequest(req ctrl.Request) queue.PlacementKey {
+    if req.Namespace == "" {
+        // Cluster-scoped placement
+        return queue.PlacementKey(req.Name)
+    } else {
+        // Namespaced placement
+        return queue.PlacementKey(req.Namespace + namespaceSeparator + req.Name)
+    }
+}
+
 // ExtractNamespaceNameFromKey resolves a PlacementKey to a (namespace, name) tuple of the placement object.
 func ExtractNamespaceNameFromKey(placementKey queue.PlacementKey) (string, string, error) {
     keyStr := string(placementKey)
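
GetPlacementKeyFromRequest is the request-side counterpart of GetPlacementKeyFromObj: a cluster-scoped request maps to a key that is just the placement name, while a namespaced request maps to the namespace joined to the name with namespaceSeparator, which ExtractNamespaceNameFromKey can later split apart. The self-contained sketch below shows that round trip; it assumes the separator is "/" (the namespaceSeparator constant's value is not shown in this diff), and the helper names are illustrative rather than taken from the package.

package main

import (
    "fmt"
    "strings"
)

// PlacementKey stands in for queue.PlacementKey.
type PlacementKey string

// namespaceSeparator is assumed to be "/" here; the real constant is defined elsewhere in the package.
const namespaceSeparator = "/"

// keyFromRequest mirrors GetPlacementKeyFromRequest for a (namespace, name) pair.
func keyFromRequest(namespace, name string) PlacementKey {
    if namespace == "" {
        // Cluster-scoped placement: the key is the bare name.
        return PlacementKey(name)
    }
    // Namespaced placement: the key is "namespace<sep>name".
    return PlacementKey(namespace + namespaceSeparator + name)
}

// splitKey mirrors ExtractNamespaceNameFromKey: it recovers the (namespace, name) pair.
func splitKey(key PlacementKey) (namespace, name string) {
    parts := strings.SplitN(string(key), namespaceSeparator, 2)
    if len(parts) == 1 {
        return "", parts[0]
    }
    return parts[0], parts[1]
}

func main() {
    fmt.Println(keyFromRequest("", "crp-demo"))           // crp-demo
    fmt.Println(keyFromRequest("team-a", "rp-demo"))      // team-a/rp-demo
    fmt.Println(splitKey(PlacementKey("team-a/rp-demo"))) // team-a rp-demo
}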
