Skip to content

Commit 341b4f9

Browse files
authored
fix: finalizer handling and improve return value semantics across con… (#148)
* fix: finalizer handling and improve return value semantics across controllers * fix lint * chore: add Eventually block to handle async cleanup in workload test
1 parent 38f5a7a commit 341b4f9

13 files changed

+123
-82
lines changed

.vscode/settings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
"cycjimmy",
2222
"dylib",
2323
"essd",
24+
"envtest",
2425
"Eventf",
2526
"finalizer",
2627
"Finalizers",
2728
"goconst",
2829
"golint",
30+
"Gomega",
2931
"gopsutil",
3032
"gosec",
3133
"gpunode",

internal/controller/gpunode_controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (r *GPUNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7171
return ctrl.Result{}, err
7272
}
7373

74-
deleted, err := utils.HandleFinalizer(ctx, node, r.Client, func(ctx context.Context, node *tfv1.GPUNode) (bool, error) {
74+
shouldReturn, err := utils.HandleFinalizer(ctx, node, r.Client, func(ctx context.Context, node *tfv1.GPUNode) (bool, error) {
7575
if node.Status.Phase != tfv1.TensorFusionGPUNodePhaseDestroying {
7676
node.Status.Phase = tfv1.TensorFusionGPUNodePhaseDestroying
7777
if err := r.Status().Update(ctx, node); err != nil {
@@ -121,8 +121,8 @@ func (r *GPUNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
121121
if err != nil {
122122
return ctrl.Result{}, err
123123
}
124-
if deleted {
125-
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
124+
if shouldReturn {
125+
return ctrl.Result{}, nil
126126
}
127127

128128
var poolName string

internal/controller/gpupool_controller.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7676
}
7777

7878
// TODO: if phase is destroying, stop all existing workers and hypervisors, stop time series flow aggregations
79-
deleted, err := utils.HandleFinalizer(ctx, pool, r.Client, func(ctx context.Context, pool *tfv1.GPUPool) (bool, error) {
79+
shouldReturn, err := utils.HandleFinalizer(ctx, pool, r.Client, func(ctx context.Context, pool *tfv1.GPUPool) (bool, error) {
8080
log.Info("TensorFusionGPUPool is being deleted", "name", pool.Name)
8181
if pool.Status.Phase != tfv1.TensorFusionPoolPhaseDestroying {
8282
pool.Status.Phase = tfv1.TensorFusionPoolPhaseDestroying
@@ -94,8 +94,10 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
9494
if err != nil {
9595
return ctrl.Result{}, err
9696
}
97-
if deleted {
98-
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
97+
if shouldReturn {
98+
// requeue for next loop
99+
// we need manually requeue cause GenerationChangedPredicate
100+
return ctrl.Result{Requeue: true}, nil
99101
}
100102

101103
if err := r.reconcilePoolCurrentCapacityAndReadiness(ctx, pool); err != nil {

internal/controller/pod_controller.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,16 @@ type PodReconciler struct {
5252
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
5353
log := log.FromContext(ctx)
5454
pod := &corev1.Pod{}
55+
if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
56+
if errors.IsNotFound(err) {
57+
return ctrl.Result{}, nil
58+
}
59+
log.Error(err, "Failed to get Pod")
60+
return ctrl.Result{}, err
61+
}
5562

5663
if _, ok := pod.Annotations[constants.TensorFusionEnabledReplicasAnnotation]; ok {
57-
deleted, err := utils.HandleFinalizer(ctx, pod, r.Client, func(context context.Context, pod *corev1.Pod) (bool, error) {
64+
shouldReturn, err := utils.HandleFinalizer(ctx, pod, r.Client, func(context context.Context, pod *corev1.Pod) (bool, error) {
5865
counter := &v1.TensorFusionPodCounter{Client: r.Client}
5966
if err := counter.Decrease(ctx, pod); err != nil {
6067
return false, err
@@ -64,19 +71,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
6471
if err != nil {
6572
return ctrl.Result{}, err
6673
}
67-
if deleted {
74+
if shouldReturn {
6875
return ctrl.Result{}, nil
6976
}
7077
}
7178

72-
if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
73-
if errors.IsNotFound(err) {
74-
return ctrl.Result{}, nil
75-
}
76-
log.Error(err, "Failed to get Pod")
77-
return ctrl.Result{}, err
78-
}
79-
8079
// generate tensor fusion connections and apply to cluster
8180
tfConnection := generateTensorFusionConnection(pod)
8281
existConn := &tfv1.TensorFusionConnection{}

internal/controller/schedulingconfigtemplate_controller_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ var _ = Describe("SchedulingConfigTemplate Controller", func() {
3737

3838
typeNamespacedName := types.NamespacedName{
3939
Name: resourceName,
40-
Namespace: "default", // TODO(user):Modify as needed
40+
Namespace: "default",
4141
}
4242
schedulingconfigtemplate := &tfv1.SchedulingConfigTemplate{}
4343

@@ -75,7 +75,6 @@ var _ = Describe("SchedulingConfigTemplate Controller", func() {
7575
Client: k8sClient,
7676
Scheme: k8sClient.Scheme(),
7777
}
78-
7978
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
8079
NamespacedName: typeNamespacedName,
8180
})

internal/controller/suite_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ var _ = BeforeSuite(func() {
103103
err = corev1.AddToScheme(scheme.Scheme)
104104
Expect(err).NotTo(HaveOccurred())
105105

106-
err = tfv1.AddToScheme(scheme.Scheme)
107-
Expect(err).NotTo(HaveOccurred())
108-
109106
// +kubebuilder:scaffold:scheme
110107

111108
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
@@ -122,7 +119,6 @@ var _ = BeforeSuite(func() {
122119
Scheme: scheme.Scheme,
123120
})
124121
Expect(err).ToNot(HaveOccurred())
125-
126122
err = (&TensorFusionClusterReconciler{
127123
Client: mgr.GetClient(),
128124
Scheme: mgr.GetScheme(),
@@ -200,7 +196,7 @@ var _ = BeforeSuite(func() {
200196
Client: mgr.GetClient(),
201197
Scheme: mgr.GetScheme(),
202198
Scheduler: scheduler,
203-
Recorder: mgr.GetEventRecorderFor("tensorfusionworkload"),
199+
Recorder: mgr.GetEventRecorderFor("TensorFusionWorkload"),
204200
GpuInfos: config.MockGpuInfo(),
205201
}).SetupWithManager(mgr)
206202
Expect(err).ToNot(HaveOccurred())
@@ -455,6 +451,15 @@ func (b *TensorFusionEnvBuilder) Build() *TensorFusionEnv {
455451
tfc.Spec.GPUPools = gpuPools
456452
Expect(k8sClient.Create(ctx, tfc)).To(Succeed())
457453

454+
// wait for pools are created
455+
Eventually(func(g Gomega) {
456+
gpuPoolList := &tfv1.GPUPoolList{}
457+
g.Expect(k8sClient.List(ctx, gpuPoolList, client.MatchingLabels(map[string]string{
458+
constants.LabelKeyOwner: tfc.Name,
459+
}))).Should(Succeed())
460+
g.Expect(gpuPoolList.Items).Should(HaveLen(b.poolCount))
461+
}, timeout*1000, interval).Should(Succeed())
462+
458463
// generate nodes
459464
selectors := strings.Split(constants.InitialGPUNodeSelector, "=")
460465
for poolIndex, nodeGpuMap := range b.poolNodeMap {

internal/controller/tensorfusioncluster_controller.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (r *TensorFusionClusterReconciler) Reconcile(ctx context.Context, req ctrl.
9393
}
9494
originalStatus := tfc.Status.DeepCopy()
9595

96-
deleted, err := utils.HandleFinalizer(ctx, tfc, r.Client, func(context context.Context, tfc *tfv1.TensorFusionCluster) (bool, error) {
96+
shouldReturn, err := utils.HandleFinalizer(ctx, tfc, r.Client, func(context context.Context, tfc *tfv1.TensorFusionCluster) (bool, error) {
9797
log.Info("TensorFusionCluster is being deleted", "name", tfc.Name)
9898
if tfc.Status.Phase != tfv1.TensorFusionClusterDestroying {
9999
tfc.Status.Phase = tfv1.TensorFusionClusterDestroying
@@ -110,8 +110,10 @@ func (r *TensorFusionClusterReconciler) Reconcile(ctx context.Context, req ctrl.
110110
if err != nil {
111111
return ctrl.Result{}, err
112112
}
113-
if deleted {
114-
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
113+
if shouldReturn {
114+
// requeue for next loop
115+
// we need manually requeue cause GenerationChangedPredicate
116+
return ctrl.Result{Requeue: true}, nil
115117
}
116118

117119
if tfc.Status.Phase == "" || tfc.Status.Phase == constants.PhaseUnknown {

internal/controller/tensorfusionconnection_controller_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22-
"time"
2322

2423
. "github.com/onsi/ginkgo/v2"
2524
. "github.com/onsi/gomega"
@@ -36,7 +35,7 @@ import (
3635
var _ = Describe("TensorFusionConnection Controller", func() {
3736
Context("When reconciling a resource", func() {
3837
const resourceName = "test-resource"
39-
const workloadName = "test-workload"
38+
const workloadName = "test-workload-1"
4039

4140
ctx := context.Background()
4241

@@ -122,8 +121,8 @@ var _ = Describe("TensorFusionConnection Controller", func() {
122121
Expect(k8sClient.Create(ctx, connectionNoLabel)).To(Succeed())
123122
Consistently(func(g Gomega) {
124123
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(connectionNoLabel), connectionNoLabel)).Should(Succeed())
125-
g.Expect(connectionNoLabel.Status.WorkerName).Should(BeEmpty())
126-
}, 5*time.Second, interval).Should(Succeed())
124+
g.Expect(connectionNoLabel.Status.WorkerName).Should(Equal(""))
125+
}, timeout, interval).Should(Succeed())
127126

128127
// Clean up the test connection
129128
Expect(k8sClient.Delete(ctx, connectionNoLabel)).To(Succeed())

internal/controller/tensorfusionworkload_controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
7979
return ctrl.Result{}, fmt.Errorf("list pods: %w", err)
8080
}
8181

82-
deleted, err := utils.HandleFinalizer(ctx, workload, r.Client, func(ctx context.Context, _ *tfv1.TensorFusionWorkload) (bool, error) {
82+
shouldReturn, err := utils.HandleFinalizer(ctx, workload, r.Client, func(ctx context.Context, _ *tfv1.TensorFusionWorkload) (bool, error) {
8383
// check if all pods are deleted
8484
return len(podList.Items) == 0, nil
8585
})
8686
if err != nil {
8787
return ctrl.Result{}, fmt.Errorf("handle finalizer: %w", err)
8888
}
89-
if deleted {
89+
if shouldReturn {
9090
return ctrl.Result{}, nil
9191
}
9292

@@ -95,8 +95,9 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
9595
// Process pods with our finalizer
9696
for i := range podList.Items {
9797
pod := &podList.Items[i]
98+
deleted := pod.DeletionTimestamp != nil
9899
// Handle our GPU resource cleanup finalizer
99-
deleted, err := utils.HandleFinalizer(ctx, pod, r.Client, func(ctx context.Context, obj *corev1.Pod) (bool, error) {
100+
_, err := utils.HandleFinalizer(ctx, pod, r.Client, func(ctx context.Context, obj *corev1.Pod) (bool, error) {
100101
return r.handlePodGPUCleanup(ctx, pod, workload)
101102
})
102103

internal/controller/tensorfusionworkload_controller_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
192192
g.Expect(k8sClient.List(ctx, podList,
193193
client.InNamespace(workload.Namespace),
194194
client.MatchingLabels{constants.WorkloadKey: workload.Name})).Should(Succeed())
195-
g.Expect(podList.Items).Should(BeNil())
195+
g.Expect(podList.Items).Should(BeEmpty())
196196
}, 5*time.Second, 100*time.Millisecond).Should(Succeed())
197197
})
198198
})
@@ -261,10 +261,14 @@ func createTensorFusionWorkload(poolName string, key client.ObjectKey, replicas
261261
func cleanupWorkload(key client.ObjectKey) {
262262
GinkgoHelper()
263263
workload := &tfv1.TensorFusionWorkload{}
264-
Expect(k8sClient.Get(ctx, key, workload)).Should(Succeed())
265-
workloadCopy := workload.DeepCopy()
266-
workloadCopy.Spec.Replicas = ptr.Int32(0)
267-
Expect(k8sClient.Update(ctx, workloadCopy)).To(Succeed())
264+
265+
// Set replicas to 0
266+
Eventually(func(g Gomega) {
267+
g.Expect(k8sClient.Get(ctx, key, workload)).Should(Succeed())
268+
workload.Spec.Replicas = ptr.Int32(0)
269+
g.Expect(k8sClient.Update(ctx, workload)).To(Succeed())
270+
}, timeout, interval).Should(Succeed())
271+
268272
Eventually(func(g Gomega) {
269273
podList := &corev1.PodList{}
270274
g.Expect(k8sClient.List(ctx, podList,

0 commit comments

Comments
 (0)