Skip to content

Commit c349d65

Browse files
authored
feat: support rp in clusterSchedulingPolicySnapshot controller (#160)
1 parent c8b3a86 commit c349d65

File tree

5 files changed

+201
-35
lines changed

5 files changed

+201
-35
lines changed

cmd/hubagent/workload/setup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
215215
if err := (&clusterschedulingpolicysnapshot.Reconciler{
216216
Client: mgr.GetClient(),
217217
PlacementController: clusterResourcePlacementControllerV1Beta1,
218-
}).SetupWithManager(mgr); err != nil {
218+
}).SetupWithManagerForClusterSchedulingPolicySnapshot(mgr); err != nil {
219219
klog.ErrorS(err, "Unable to set up the clusterSchedulingPolicySnapshot watcher")
220220
return err
221221
}

pkg/controllers/clusterresourceplacement/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ var _ = BeforeSuite(func() {
136136
err = (&clusterschedulingpolicysnapshot.Reconciler{
137137
Client: mgr.GetClient(),
138138
PlacementController: crpController,
139-
}).SetupWithManager(mgr)
139+
}).SetupWithManagerForClusterSchedulingPolicySnapshot(mgr)
140140
Expect(err).Should(Succeed(), "failed to create clusterSchedulingPolicySnapshot watcher")
141141

142142
err = (&clusterresourceplacementwatcher.Reconciler{

pkg/controllers/clusterschedulingpolicysnapshot/controller.go

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"time"
2424

25-
"k8s.io/apimachinery/pkg/api/errors"
2625
"k8s.io/klog/v2"
2726
ctrl "sigs.k8s.io/controller-runtime"
2827
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,48 +32,58 @@ import (
3332
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
3433
)
3534

36-
// Reconciler reconciles a clusterSchedulingPolicySnapshot object.
35+
// Reconciler reconciles a clusterSchedulingPolicySnapshot or schedulingPolicySnapshot object.
3736
type Reconciler struct {
3837
client.Client
3938

4039
// PlacementController exposes the placement queue for the reconciler to push to.
4140
PlacementController controller.Controller
4241
}
4342

44-
// Reconcile triggers a single CRP reconcile round when scheduling policy has changed.
43+
// Reconcile triggers a single placement reconcile round when scheduling policy has changed.
4544
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
46-
name := req.NamespacedName
47-
snapshot := fleetv1beta1.ClusterSchedulingPolicySnapshot{}
48-
snapshotKRef := klog.KRef(name.Namespace, name.Name)
45+
var snapshot fleetv1beta1.PolicySnapshotObj
46+
var err error
47+
snapshotKRef := klog.KRef(req.Namespace, req.Name)
4948

5049
startTime := time.Now()
51-
klog.V(2).InfoS("Reconciliation starts", "clusterSchedulingPolicySnapshot", snapshotKRef)
50+
klog.V(2).InfoS("Reconciliation starts", "policySnapshot", snapshotKRef)
5251
defer func() {
5352
latency := time.Since(startTime).Milliseconds()
54-
klog.V(2).InfoS("Reconciliation ends", "clusterSchedulingPolicySnapshot", snapshotKRef, "latency", latency)
53+
klog.V(2).InfoS("Reconciliation ends", "policySnapshot", snapshotKRef, "latency", latency)
5554
}()
5655

57-
if err := r.Client.Get(ctx, name, &snapshot); err != nil {
58-
if errors.IsNotFound(err) {
59-
klog.V(4).InfoS("Ignoring NotFound clusterSchedulingPolicySnapshot", "clusterSchedulingPolicySnapshot", snapshotKRef)
60-
return ctrl.Result{}, nil
56+
if req.Namespace == "" {
57+
// ClusterSchedulingPolicySnapshot (cluster-scoped)
58+
var clusterSchedulingPolicySnapshot fleetv1beta1.ClusterSchedulingPolicySnapshot
59+
if err = r.Client.Get(ctx, req.NamespacedName, &clusterSchedulingPolicySnapshot); err != nil {
60+
klog.ErrorS(err, "Failed to get cluster scheduling policy snapshot", "policySnapshot", snapshotKRef)
61+
return ctrl.Result{}, controller.NewAPIServerError(true, client.IgnoreNotFound(err))
6162
}
62-
klog.ErrorS(err, "Failed to get clusterSchedulingPolicySnapshot", "clusterSchedulingPolicySnapshot", snapshotKRef)
63-
return ctrl.Result{}, controller.NewAPIServerError(true, err)
63+
snapshot = &clusterSchedulingPolicySnapshot
64+
} else {
65+
// schedulingPolicySnapshot (namespaced)
66+
var schedulingPolicySnapshot fleetv1beta1.SchedulingPolicySnapshot
67+
if err = r.Client.Get(ctx, req.NamespacedName, &schedulingPolicySnapshot); err != nil {
68+
klog.ErrorS(err, "Failed to get scheduling policy snapshot", "policySnapshot", snapshotKRef)
69+
return ctrl.Result{}, controller.NewAPIServerError(true, client.IgnoreNotFound(err))
70+
}
71+
snapshot = &schedulingPolicySnapshot
6472
}
65-
crp := snapshot.Labels[fleetv1beta1.PlacementTrackingLabel]
66-
if len(crp) == 0 {
67-
err := fmt.Errorf("invalid label value %s", fleetv1beta1.PlacementTrackingLabel)
68-
klog.ErrorS(err, "Invalid clusterSchedulingPolicySnapshot", "clusterSchedulingPolicySnapshot", snapshotKRef)
73+
74+
placementName, exist := snapshot.GetLabels()[fleetv1beta1.PlacementTrackingLabel]
75+
if !exist || len(placementName) == 0 {
76+
err := fmt.Errorf("label %s is missing or has empty value", fleetv1beta1.PlacementTrackingLabel)
77+
klog.ErrorS(err, "Failed to enqueue placement due to invalid schedulingPolicySnapshot", "policySnapshot", snapshotKRef)
6978
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
7079
}
7180

72-
r.PlacementController.Enqueue(crp)
81+
r.PlacementController.Enqueue(controller.GetObjectKeyFromNamespaceName(req.Namespace, placementName))
7382
return ctrl.Result{}, nil
7483
}
7584

76-
// SetupWithManager sets up the controller with the Manager.
77-
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
85+
// SetupWithManagerForClusterSchedulingPolicySnapshot sets up the controller with the Manager for ClusterSchedulingPolicySnapshot.
86+
func (r *Reconciler) SetupWithManagerForClusterSchedulingPolicySnapshot(mgr ctrl.Manager) error {
7887
return ctrl.NewControllerManagedBy(mgr).Named("clusterschedulingpolicysnapshot-watcher").
7988
For(&fleetv1beta1.ClusterSchedulingPolicySnapshot{}).
8089
WithEventFilter(predicate.Funcs{
@@ -87,3 +96,18 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
8796
},
8897
}).Complete(r)
8998
}
99+
100+
// SetupWithManagerForSchedulingPolicySnapshot sets up the controller with the Manager for SchedulingPolicySnapshot.
101+
func (r *Reconciler) SetupWithManagerForSchedulingPolicySnapshot(mgr ctrl.Manager) error {
102+
return ctrl.NewControllerManagedBy(mgr).Named("schedulingpolicysnapshot-watcher").
103+
For(&fleetv1beta1.SchedulingPolicySnapshot{}).
104+
WithEventFilter(predicate.Funcs{
105+
// skipping delete and create events so that RP controller does not need to update the status.
106+
DeleteFunc: func(e event.DeleteEvent) bool {
107+
return false
108+
},
109+
CreateFunc: func(e event.CreateEvent) bool {
110+
return false
111+
},
112+
}).Complete(r)
113+
}

pkg/controllers/clusterschedulingpolicysnapshot/controller_integration_test.go

Lines changed: 137 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,21 @@ import (
2727
"k8s.io/apimachinery/pkg/types"
2828

2929
fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
30+
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
3031
)
3132

3233
const (
3334
testCRPName = "my-crp"
35+
testRPName = "my-rp"
3436
testSnapshotName = "my-snapshot"
37+
testNamespace = "test-namespace"
38+
39+
eventuallyTimeout = time.Second * 10
40+
consistentlyDuration = time.Second * 10
41+
interval = time.Millisecond * 250
3542
)
3643

37-
func policySnapshot() *fleetv1beta1.ClusterSchedulingPolicySnapshot {
44+
func clusterSchedulingPolicySnapshotForTest() *fleetv1beta1.ClusterSchedulingPolicySnapshot {
3845
return &fleetv1beta1.ClusterSchedulingPolicySnapshot{
3946
ObjectMeta: metav1.ObjectMeta{
4047
Name: testSnapshotName,
@@ -50,28 +57,39 @@ func policySnapshot() *fleetv1beta1.ClusterSchedulingPolicySnapshot {
5057
}
5158
}
5259

53-
var _ = Describe("Test clusterSchedulingPolicySnapshot Controller", func() {
54-
const (
55-
eventuallyTimeout = time.Second * 10
56-
consistentlyDuration = time.Second * 10
57-
interval = time.Millisecond * 250
58-
)
60+
func schedulingPolicySnapshotForTest() *fleetv1beta1.SchedulingPolicySnapshot {
61+
return &fleetv1beta1.SchedulingPolicySnapshot{
62+
ObjectMeta: metav1.ObjectMeta{
63+
Name: testSnapshotName,
64+
Namespace: testNamespace,
65+
Labels: map[string]string{
66+
fleetv1beta1.PolicyIndexLabel: "1",
67+
fleetv1beta1.IsLatestSnapshotLabel: "true",
68+
fleetv1beta1.PlacementTrackingLabel: testRPName,
69+
},
70+
},
71+
Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{
72+
PolicyHash: []byte("hash"),
73+
},
74+
}
75+
}
5976

77+
var _ = Describe("Test clusterSchedulingPolicySnapshot Controller", func() {
6078
var (
6179
createdSnapshot = &fleetv1beta1.ClusterSchedulingPolicySnapshot{}
6280
)
6381

6482
BeforeEach(func() {
6583
fakePlacementController.ResetQueue()
6684
By("By creating a new clusterSchedulingPolicySnapshot")
67-
snapshot := policySnapshot()
85+
snapshot := clusterSchedulingPolicySnapshotForTest()
6886
Expect(k8sClient.Create(ctx, snapshot)).Should(Succeed())
6987
})
7088

7189
Context("When creating new clusterSchedulingPolicySnapshot", func() {
7290
AfterEach(func() {
7391
By("By deleting snapshot")
74-
createdSnapshot = policySnapshot()
92+
createdSnapshot = clusterSchedulingPolicySnapshotForTest()
7593
Expect(k8sClient.Delete(ctx, createdSnapshot)).Should(Succeed())
7694

7795
By("By checking snapshot")
@@ -97,7 +115,7 @@ var _ = Describe("Test clusterSchedulingPolicySnapshot Controller", func() {
97115

98116
AfterEach(func() {
99117
By("By deleting snapshot")
100-
createdSnapshot = policySnapshot()
118+
createdSnapshot = clusterSchedulingPolicySnapshotForTest()
101119
Expect(k8sClient.Delete(ctx, createdSnapshot)).Should(Succeed())
102120

103121
By("By checking snapshot")
@@ -147,7 +165,7 @@ var _ = Describe("Test clusterSchedulingPolicySnapshot Controller", func() {
147165

148166
It("Should ignore the event", func() {
149167
By("By deleting snapshot")
150-
createdSnapshot = policySnapshot()
168+
createdSnapshot = clusterSchedulingPolicySnapshotForTest()
151169
Expect(k8sClient.Delete(ctx, createdSnapshot)).Should(Succeed())
152170

153171
By("By checking snapshot")
@@ -162,3 +180,111 @@ var _ = Describe("Test clusterSchedulingPolicySnapshot Controller", func() {
162180
})
163181
})
164182
})
183+
184+
var _ = Describe("Test schedulingPolicySnapshot Controller", func() {
185+
var (
186+
createdSnapshot = &fleetv1beta1.SchedulingPolicySnapshot{}
187+
key = controller.GetObjectKeyFromNamespaceName(testNamespace, testRPName)
188+
)
189+
190+
BeforeEach(func() {
191+
fakePlacementController.ResetQueue()
192+
By("By creating a new schedulingPolicySnapshot")
193+
snapshot := schedulingPolicySnapshotForTest()
194+
Expect(k8sClient.Create(ctx, snapshot)).Should(Succeed())
195+
})
196+
197+
Context("When creating new schedulingPolicySnapshot", func() {
198+
AfterEach(func() {
199+
By("By deleting snapshot")
200+
createdSnapshot = schedulingPolicySnapshotForTest()
201+
Expect(k8sClient.Delete(ctx, createdSnapshot)).Should(Succeed())
202+
203+
By("By checking snapshot")
204+
Eventually(func() bool {
205+
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: testSnapshotName, Namespace: testNamespace}, createdSnapshot))
206+
}, eventuallyTimeout, interval).Should(BeTrue(), "snapshot should be deleted")
207+
})
208+
209+
It("Should ignore the event", func() {
210+
By("By checking placement controller queue")
211+
Consistently(func() bool {
212+
return fakePlacementController.Key() == ""
213+
}, consistentlyDuration, interval).Should(BeTrue(), "controller should ignore the create event and not enqueue the request into the placementController queue")
214+
215+
})
216+
})
217+
218+
Context("When updating schedulingPolicySnapshot", func() {
219+
BeforeEach(func() {
220+
By("By resetting the placement queue")
221+
fakePlacementController.ResetQueue()
222+
})
223+
224+
AfterEach(func() {
225+
By("By deleting snapshot")
226+
createdSnapshot = schedulingPolicySnapshotForTest()
227+
Expect(k8sClient.Delete(ctx, createdSnapshot)).Should(Succeed())
228+
229+
By("By checking snapshot")
230+
Eventually(func() bool {
231+
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: testSnapshotName, Namespace: testNamespace}, createdSnapshot))
232+
}, eventuallyTimeout, interval).Should(BeTrue(), "snapshot should be deleted")
233+
})
234+
235+
It("Updating the spec and should enqueue the event", func() {
236+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testSnapshotName, Namespace: testNamespace}, createdSnapshot)).Should(Succeed())
237+
238+
By("By updating the schedulingPolicySnapshot spec")
239+
createdSnapshot.Spec.PolicyHash = []byte("modified-hash")
240+
Expect(k8sClient.Update(ctx, createdSnapshot)).Should(Succeed())
241+
242+
By("By checking placement controller queue")
243+
Eventually(func() bool {
244+
return fakePlacementController.Key() == key
245+
}, eventuallyTimeout, interval).Should(BeTrue(), "placementController should receive the RP key")
246+
})
247+
248+
It("Updating the status and should enqueue the event", func() {
249+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testSnapshotName, Namespace: testNamespace}, createdSnapshot)).Should(Succeed())
250+
251+
By("By updating the schedulingPolicySnapshot status")
252+
newCondition := metav1.Condition{
253+
Type: string(fleetv1beta1.PolicySnapshotScheduled),
254+
Status: metav1.ConditionTrue,
255+
Reason: "scheduled",
256+
ObservedGeneration: createdSnapshot.GetGeneration(),
257+
}
258+
meta.SetStatusCondition(&createdSnapshot.Status.Conditions, newCondition)
259+
Expect(k8sClient.Status().Update(ctx, createdSnapshot)).Should(Succeed())
260+
261+
By("By checking placement controller queue")
262+
Eventually(func() bool {
263+
return fakePlacementController.Key() == key
264+
}, eventuallyTimeout, interval).Should(BeTrue(), "placementController should receive the RP key")
265+
})
266+
})
267+
268+
Context("When deleting schedulingPolicySnapshot", func() {
269+
BeforeEach(func() {
270+
By("By resetting the placement queue")
271+
fakePlacementController.ResetQueue()
272+
})
273+
274+
It("Should ignore the event", func() {
275+
By("By deleting snapshot")
276+
createdSnapshot = schedulingPolicySnapshotForTest()
277+
Expect(k8sClient.Delete(ctx, createdSnapshot)).Should(Succeed())
278+
279+
By("By checking snapshot")
280+
Eventually(func() bool {
281+
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: testSnapshotName, Namespace: testNamespace}, createdSnapshot))
282+
}, eventuallyTimeout, interval).Should(BeTrue(), "snapshot should be deleted")
283+
284+
By("By checking placement controller queue")
285+
Consistently(func() bool {
286+
return fakePlacementController.Key() == ""
287+
}, consistentlyDuration, interval).Should(BeTrue(), "controller should ignore the delete event and not enqueue the request into the placementController queue")
288+
})
289+
})
290+
})

pkg/controllers/clusterschedulingpolicysnapshot/suite_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424

2525
. "github.com/onsi/ginkgo/v2"
2626
. "github.com/onsi/gomega"
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2729
"k8s.io/client-go/kubernetes/scheme"
2830
"k8s.io/client-go/rest"
2931
"k8s.io/klog/v2"
@@ -80,6 +82,14 @@ var _ = BeforeSuite(func() {
8082
Expect(err).Should(Succeed())
8183
Expect(k8sClient).NotTo(BeNil())
8284

85+
By("creating a test namespace")
86+
var ns = corev1.Namespace{
87+
ObjectMeta: metav1.ObjectMeta{
88+
Name: testNamespace,
89+
},
90+
}
91+
Expect(k8sClient.Create(ctx, &ns)).Should(Succeed(), "failed to create namespace")
92+
8393
By("starting the controller manager")
8494
klog.InitFlags(flag.CommandLine)
8595
flag.Parse()
@@ -98,7 +108,13 @@ var _ = BeforeSuite(func() {
98108
err = (&Reconciler{
99109
Client: mgr.GetClient(),
100110
PlacementController: fakePlacementController,
101-
}).SetupWithManager(mgr)
111+
}).SetupWithManagerForClusterSchedulingPolicySnapshot(mgr)
112+
Expect(err).Should(Succeed())
113+
114+
err = (&Reconciler{
115+
Client: mgr.GetClient(),
116+
PlacementController: fakePlacementController,
117+
}).SetupWithManagerForSchedulingPolicySnapshot(mgr)
102118
Expect(err).Should(Succeed())
103119

104120
go func() {

0 commit comments

Comments
 (0)