Skip to content

Commit b303f31

Browse files
authored
fix: misuse CreateOrUpdate (#85)
1 parent ac04af7 commit b303f31

File tree

2 files changed

+88
-19
lines changed

2 files changed

+88
-19
lines changed

internal/controller/gpunode_controller.go

Lines changed: 60 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -368,19 +368,59 @@ func (r *GPUNodeReconciler) reconcileNodeDiscoveryJob(
368368
}
369369

370370
// reconcileHypervisorPod ensures a hypervisor pod named "hypervisor-<node.Name>"
// exists in utils.CurrentNamespace() and matches the pool's current hypervisor
// config. It returns the pod name on success.
//
// Flow, as visible in the added lines of this diff:
//   - fail with "missing hypervisor config" when the pool has no ComponentConfig
//     or no Hypervisor section;
//   - look up the existing pod; any Get error other than NotFound is returned;
//   - if the pod is already being deleted, or its template-hash label equals the
//     hash of the current hypervisor config, return its name without changes;
//   - if the hash differs, delete the pod and fall through to creation;
//   - otherwise (no pod, or stale pod deleted) call createHypervisorPod.
//
// NOTE(review): lines following an "N-" marker belong to the previous revision
// and lines following an "N+" marker to the new one — presumably the rendered
// diff's column markers; confirm before reading this listing as a single body.
func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tfv1.GPUNode, pool *tfv1.GPUPool) (string, error) {
371-
if pool == nil {
372-
return "", fmt.Errorf("failed to get tensor-fusion pool, can not create hypervisor pod")
371+
log := log.FromContext(ctx)
372+
373+
log.Info("reconciling hypervisor pod")
374+
// NOTE(review): the removed lines above guarded against pool == nil; the added
// check below dereferences pool.Spec without such a guard — confirm callers
// never pass a nil pool.
375+
if pool.Spec.ComponentConfig == nil || pool.Spec.ComponentConfig.Hypervisor == nil {
376+
return "", fmt.Errorf("missing hypervisor config")
377+
}
378+
// The pod identity is derived from the node name only, so each node has at
// most one pod under this key.
379+
key := client.ObjectKey{
380+
Namespace: utils.CurrentNamespace(),
381+
Name: fmt.Sprintf("hypervisor-%s", node.Name),
382+
}
383+
384+
currentPod := &corev1.Pod{}
385+
if err := r.Get(ctx, key, currentPod); err != nil {
// NotFound is not an error here: it falls through to the create step below.
386+
if !errors.IsNotFound(err) {
387+
return "", fmt.Errorf("failed to get current hypervisor pod: %w", err)
388+
}
389+
} else {
// A pod with a deletion timestamp is left alone; the name is returned and no
// new pod is created in this pass.
390+
if !currentPod.DeletionTimestamp.IsZero() {
391+
log.Info("hypervisor pod is being deleted", "name", key.Name)
392+
return key.Name, nil
393+
}
394+
// Compare the hash stored on the pod's label with the hash of the current
// hypervisor config; a mismatch means the pod is deleted and then recreated.
395+
if currentPod.Labels[constants.LabelKeyPodTemplateHash] != utils.GetObjectHash(pool.Spec.ComponentConfig.Hypervisor) {
396+
if err := r.Delete(ctx, currentPod); err != nil {
397+
return "", fmt.Errorf("failed to delete old hypervisor pod: %w", err)
398+
}
399+
log.Info("old hypervisor pod deleted", "name", currentPod.Name)
400+
} else {
401+
return key.Name, nil
402+
}
373403
}
374404

375-
namespace := utils.CurrentNamespace()
405+
// no existing pod or config changed, so create new one
406+
if err := r.createHypervisorPod(ctx, key, node, pool); err != nil {
407+
if errors.IsAlreadyExists(err) {
// NOTE(review): this path returns an empty name with a nil error, whereas the
// other non-error returns in this function yield key.Name — confirm the caller
// tolerates "" here.
408+
return "", nil
409+
} else {
410+
return "", fmt.Errorf("failed to create hypervisor pod: %w", err)
411+
}
412+
}
413+
414+
return key.Name, nil
415+
}
416+
417+
func (r *GPUNodeReconciler) createHypervisorPod(ctx context.Context, key client.ObjectKey, node *tfv1.GPUNode, pool *tfv1.GPUPool) error {
376418
log := log.FromContext(ctx)
377-
hypervisorPodName := fmt.Sprintf("hypervisor-%s", node.Name)
378419

379-
hypervisorConfig := pool.Spec.ComponentConfig.Hypervisor
380420
podTmpl := &corev1.PodTemplate{}
381-
err := json.Unmarshal(hypervisorConfig.PodTemplate.Raw, podTmpl)
421+
err := json.Unmarshal(pool.Spec.ComponentConfig.Hypervisor.PodTemplate.Raw, podTmpl)
382422
if err != nil {
383-
return "", fmt.Errorf("failed to unmarshal pod template: %w", err)
423+
return fmt.Errorf("failed to unmarshal pod template: %w", err)
384424
}
385425
spec := podTmpl.Template.Spec.DeepCopy()
386426
if spec.NodeSelector == nil {
@@ -402,10 +442,11 @@ func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tf
402442
})
403443
newPod := &corev1.Pod{
404444
ObjectMeta: metav1.ObjectMeta{
405-
Name: hypervisorPodName,
406-
Namespace: namespace,
445+
Name: key.Name,
446+
Namespace: key.Namespace,
407447
Labels: map[string]string{
408448
fmt.Sprintf(constants.GPUNodePoolIdentifierLabelFormat, pool.Name): "true",
449+
constants.LabelKeyPodTemplateHash: utils.GetObjectHash(pool.Spec.ComponentConfig.Hypervisor),
409450
},
410451
},
411452
Spec: *spec,
@@ -419,18 +460,18 @@ func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tf
419460
Operator: corev1.TolerationOpExists,
420461
})
421462

422-
e := controllerutil.SetControllerReference(node, newPod, r.Scheme)
423-
if e != nil {
424-
return "", fmt.Errorf("failed to set controller reference: %w", e)
425-
}
426-
_, err = controllerutil.CreateOrUpdate(ctx, r.Client, newPod, func() error {
427-
log.Info("Creating or Updating hypervisor pod", "name", hypervisorPodName)
428-
return nil
429-
})
463+
err = controllerutil.SetControllerReference(node, newPod, r.Scheme)
430464
if err != nil {
431-
return "", fmt.Errorf("failed to create or update hypervisor pod: %w", err)
465+
return fmt.Errorf("failed to set controller reference: %w", err)
466+
}
467+
468+
if err = r.Create(ctx, newPod); err != nil {
469+
return fmt.Errorf("failed to create hypervisor pod: %w", err)
432470
}
433-
return hypervisorPodName, nil
471+
472+
log.Info("hypervisor pod created", "name", key.Name)
473+
474+
return nil
434475
}
435476

436477
func (r *GPUNodeReconciler) reconcileCloudVendorNode(ctx context.Context, node *tfv1.GPUNode, pool *tfv1.GPUPool) error {

internal/controller/gpunode_controller_test.go

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,13 +18,16 @@ package controller
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
23+
"time"
2224

2325
tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
2426
"github.com/NexusGPU/tensor-fusion/internal/constants"
2527
"github.com/NexusGPU/tensor-fusion/internal/utils"
2628
. "github.com/onsi/ginkgo/v2"
2729
. "github.com/onsi/gomega"
30+
"github.com/samber/lo"
2831
batchv1 "k8s.io/api/batch/v1"
2932
corev1 "k8s.io/api/core/v1"
3033
"k8s.io/apimachinery/pkg/api/errors"
@@ -117,6 +120,31 @@ var _ = Describe("GPUNode Controller", func() {
117120
Name: fmt.Sprintf("hypervisor-%s", gpunode.Name),
118121
Namespace: utils.CurrentNamespace(),
119122
}, pod)).To(Succeed())
123+
124+
By("Verify the hypervior pod recreated after hypervisor config change")
125+
pool := &tfv1.GPUPool{}
126+
Expect(k8sClient.Get(ctx, types.NamespacedName{
127+
Name: "mock",
128+
Namespace: "default",
129+
}, pool)).To(Succeed())
130+
podTmpl := &corev1.PodTemplate{}
131+
err = json.Unmarshal(pool.Spec.ComponentConfig.Hypervisor.PodTemplate.Raw, podTmpl)
132+
Expect(err).NotTo(HaveOccurred())
133+
podTmpl.Template.Spec.Containers[0].Name = "foo"
134+
pool.Spec.ComponentConfig.Hypervisor.PodTemplate.Raw = lo.Must(json.Marshal(podTmpl))
135+
Expect(k8sClient.Update(ctx, pool)).To(Succeed())
136+
Eventually(func() string {
137+
_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
138+
NamespacedName: typeNamespacedName,
139+
})
140+
if err = k8sClient.Get(ctx, types.NamespacedName{
141+
Name: fmt.Sprintf("hypervisor-%s", gpunode.Name),
142+
Namespace: utils.CurrentNamespace(),
143+
}, pod); err != nil {
144+
return ""
145+
}
146+
return pod.Spec.Containers[0].Name
147+
}, 5*time.Second, time.Second).Should(Equal("foo"))
120148
})
121149
})
122150
})

0 commit comments

Comments
 (0)