
Commit f717d99

feat: Add pod template hash and Add logic to detect and replace pods … (#70)
* feat: Add pod template hash and Add logic to detect and replace pods when resource specifications change
* fix: worker pod with the infix `tf-worker`
1 parent abc3bc8 commit f717d99

File tree

8 files changed: +211 -21 lines changed

api/v1/tensorfusionworkload_types.go
charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml
config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml
internal/constants/constants.go
internal/controller/tensorfusionworkload_controller.go
internal/controller/tensorfusionworkload_controller_test.go
internal/utils/reconcile.go
internal/worker/worker.go

api/v1/tensorfusionworkload_types.go
Lines changed: 2 additions & 0 deletions

@@ -62,6 +62,8 @@ type TensorFusionWorkloadStatus struct {
 	ReadyReplicas int32 `json:"readyReplicas,omitempty"`

 	WorkerStatuses []WorkerStatus `json:"workerStatuses,omitempty"`
+
+	PodTemplateHash string `json:"podTemplateHash,omitempty"`
 }

 // +kubebuilder:object:root=true

charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml
Lines changed: 2 additions & 0 deletions

@@ -103,6 +103,8 @@ spec:
       description: TensorFusionWorkloadStatus defines the observed state of
         TensorFusionWorkload.
       properties:
+        podTemplateHash:
+          type: string
         readyReplicas:
           description: readyReplicas is the number of pods created for this
             Workload with a Ready Condition.

config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml
Lines changed: 2 additions & 0 deletions

@@ -103,6 +103,8 @@ spec:
       description: TensorFusionWorkloadStatus defines the observed state of
         TensorFusionWorkload.
       properties:
+        podTemplateHash:
+          type: string
         readyReplicas:
           description: readyReplicas is the number of pods created for this
             Workload with a Ready Condition.

internal/constants/constants.go
Lines changed: 7 additions & 6 deletions

@@ -10,9 +10,10 @@ const (
 	FinalizerSuffix = "finalizer"
 	Finalizer       = Domain + "/" + FinalizerSuffix

-	LabelKeyOwner        = Domain + "/managed-by"
-	LabelKeyClusterOwner = Domain + "/cluster"
-	LabelKeyNodeClass    = Domain + "/node-class"
+	LabelKeyOwner           = Domain + "/managed-by"
+	LabelKeyClusterOwner    = Domain + "/cluster"
+	LabelKeyNodeClass       = Domain + "/node-class"
+	LabelKeyPodTemplateHash = Domain + "/pod-template-hash"

 	GPUNodePoolIdentifierLabelPrefix = Domain + "/pool-"
 	GPUNodePoolIdentifierLabelFormat = Domain + "/pool-%s"
@@ -80,7 +81,7 @@ const (
 const (
 	// No disrupt label, similar to Karpenter, avoid TFConnection/Worker/GPUNode to be moved to another node or destroying node.
 	// Refer: https://karpenter.sh/docs/concepts/disruption/
-	SchedulingDoNotDisruptLabel = "tensor-fusion.ai/do-not-disrupt"
+	SchedulingDoNotDisruptLabel = Domain + "/do-not-disrupt"
 )

 const (
@@ -91,7 +92,7 @@ const (

 // To match GPUNode with K8S node, when creating from cloud vendor, must set a label from cloud-init userdata
 const (
-	ProvisionerLabelKey        = "tensor-fusion.ai/node-provisioner"
+	ProvisionerLabelKey        = Domain + "/node-provisioner"
 	ProvisionerNamePlaceholder = "__GPU_NODE_RESOURCE_NAME__"
 )
@@ -100,4 +101,4 @@ const (

 const TFDataPath = "/tmp/tensor-fusion/data"
 const DataVolumeName = "tf-data"
-const TensorFusionPoolManualCompaction = "tensor-fusion.ai/manual-compaction"
+const TensorFusionPoolManualCompaction = Domain + "/manual-compaction"
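
The constants above all derive label keys from the shared Domain constant, which the removed string literals imply is "tensor-fusion.ai". A minimal, self-contained sketch of how the refactored keys resolve (the Domain value here is an assumption inferred from those replaced literals, not taken from the file itself):

package main

import "fmt"

// Domain is assumed to be "tensor-fusion.ai", based on the string literals
// this commit replaces with Domain-based expressions.
const Domain = "tensor-fusion.ai"

const (
	LabelKeyPodTemplateHash     = Domain + "/pod-template-hash"
	SchedulingDoNotDisruptLabel = Domain + "/do-not-disrupt"
	ProvisionerLabelKey         = Domain + "/node-provisioner"
)

func main() {
	fmt.Println(LabelKeyPodTemplateHash)     // tensor-fusion.ai/pod-template-hash
	fmt.Println(SchedulingDoNotDisruptLabel) // tensor-fusion.ai/do-not-disrupt
	fmt.Println(ProvisionerLabelKey)         // tensor-fusion.ai/node-provisioner
}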

internal/controller/tensorfusionworkload_controller.go
Lines changed: 34 additions & 1 deletion

@@ -106,6 +106,38 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 	// Create worker generator
 	workerGenerator := &worker.WorkerGenerator{WorkerConfig: pool.Spec.ComponentConfig.Worker}

+	podTemplateHash, err := workerGenerator.PodTemplateHash(workload.Spec.Resources.Limits)
+	if err != nil {
+		return ctrl.Result{}, fmt.Errorf("get pod template hash: %w", err)
+	}
+
+	if workload.Status.PodTemplateHash != podTemplateHash {
+		workload.Status.PodTemplateHash = podTemplateHash
+		if err := r.Status().Update(ctx, workload); err != nil {
+			return ctrl.Result{}, fmt.Errorf("update status: %w", err)
+		}
+	}
+
+	// Check if there are any Pods using the old podTemplateHash and delete them if any
+	if len(podList.Items) > 0 {
+		var outdatedPods []corev1.Pod
+		for i := range podList.Items {
+			pod := &podList.Items[i]
+			if pod.Labels[constants.LabelKeyPodTemplateHash] != podTemplateHash {
+				outdatedPods = append(outdatedPods, *pod)
+			}
+		}
+
+		if len(outdatedPods) > 0 {
+			log.Info("Found outdated pods with different template hash", "count", len(outdatedPods))
+			if err := r.scaleDownWorkers(ctx, workload, outdatedPods); err != nil {
+				return ctrl.Result{}, err
+			}
+			// After deletion, requeue, and the next reconcile will create a new pod
+			return ctrl.Result{Requeue: true}, nil
+		}
+	}
+
 	// Determine the number of replicas
 	desiredReplicas := int32(1)
 	if workload.Spec.Replicas != nil {
@@ -162,7 +194,7 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 	workload *tfv1.TensorFusionWorkload,
 ) (*corev1.Pod, error) {
 	port := workerGenerator.AllocPort()
-	pod, err := workerGenerator.GenerateWorkerPod(gpu, workload.Name+"-", workload.Namespace, port, workload.Spec.Resources.Limits)
+	pod, hash, err := workerGenerator.GenerateWorkerPod(gpu, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits)
 	if err != nil {
 		return nil, fmt.Errorf("generate worker pod %w", err)
 	}
@@ -173,6 +205,7 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 	}
 	pod.Labels[constants.WorkloadKey] = workload.Name
 	pod.Labels[constants.GpuKey] = gpu.Name
+	pod.Labels[constants.LabelKeyPodTemplateHash] = hash

 	// Add finalizer for GPU resource cleanup
 	pod.Finalizers = append(pod.Finalizers, constants.Finalizer)
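
The heart of the controller change is a comparison between the template hash stored as a label on each worker pod and the hash freshly computed from the workload's current limits: pods carrying a stale hash are scaled down, the reconcile is requeued, and the next pass recreates them from the updated template. A standalone sketch of that filter, with a hypothetical fakePod type and plain label maps standing in for corev1.Pod:

package main

import "fmt"

const labelKeyPodTemplateHash = "tensor-fusion.ai/pod-template-hash"

type fakePod struct {
	Name   string
	Labels map[string]string
}

// outdatedPods returns the pods whose template-hash label no longer matches
// the hash computed from the current workload spec.
func outdatedPods(pods []fakePod, currentHash string) []fakePod {
	var out []fakePod
	for _, p := range pods {
		if p.Labels[labelKeyPodTemplateHash] != currentHash {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pods := []fakePod{
		{Name: "demo-tf-worker-a1b2c", Labels: map[string]string{labelKeyPodTemplateHash: "oldhash"}},
		{Name: "demo-tf-worker-d3e4f", Labels: map[string]string{labelKeyPodTemplateHash: "newhash"}},
	}
	// Only the pod carrying the stale hash is selected for deletion;
	// a later reconcile recreates it from the updated template.
	for _, p := range outdatedPods(pods, "newhash") {
		fmt.Println(p.Name) // demo-tf-worker-a1b2c
	}
}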

internal/controller/tensorfusionworkload_controller_test.go
Lines changed: 134 additions & 0 deletions

@@ -288,6 +288,140 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
 		})
 	})

+	Context("When resource limits change in a workload", func() {
+		It("Should rebuild all worker pods", func() {
+			// Create a workload with 2 replicas
+			workload := &tfv1.TensorFusionWorkload{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      resourceName,
+					Namespace: resourceNamespace,
+				},
+				Spec: tfv1.TensorFusionWorkloadSpec{
+					Replicas: ptr.Int32(2),
+					PoolName: poolName,
+					Resources: tfv1.Resources{
+						Requests: tfv1.Resource{
+							Tflops: tflopsRequests,
+							Vram:   vramRequests,
+						},
+						Limits: tfv1.Resource{
+							Tflops: tflopsLimits,
+							Vram:   vramLimits,
+						},
+					},
+				},
+			}
+
+			Expect(k8sClient.Create(ctx, workload)).To(Succeed())
+
+			// First reconcile to create the initial pods
+			_, err := reconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: typeNamespacedName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Check that pods are created
+			podList := &corev1.PodList{}
+			Eventually(func() int {
+				err := k8sClient.List(ctx, podList,
+					client.InNamespace(resourceNamespace),
+					client.MatchingLabels{constants.WorkloadKey: resourceName})
+				if err != nil {
+					return 0
+				}
+				return len(podList.Items)
+			}, 5*time.Second, 100*time.Millisecond).Should(Equal(2))
+
+			// Store the original pod template hash
+			var originalPodNames []string
+			var originalPodTemplateHash string
+			for _, pod := range podList.Items {
+				originalPodNames = append(originalPodNames, pod.Name)
+				originalPodTemplateHash = pod.Labels[constants.LabelKeyPodTemplateHash]
+			}
+			Expect(originalPodTemplateHash).NotTo(BeEmpty())
+
+			// Update workload with different resource limits
+			workload = &tfv1.TensorFusionWorkload{}
+			Expect(k8sClient.Get(ctx, typeNamespacedName, workload)).To(Succeed())
+			workload.Spec.Resources.Limits.Tflops = resource.MustParse("30")  // Increase TFLOPS limit
+			workload.Spec.Resources.Limits.Vram = resource.MustParse("24Gi") // Increase VRAM limit
+			Expect(k8sClient.Update(ctx, workload)).To(Succeed())
+
+			// Reconcile to handle the resource limits change
+			_, err = reconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: typeNamespacedName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Reconcile again to handle the Finalizer
+			_, err = reconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: typeNamespacedName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Verify old pods are deleted due to template hash change
+			Eventually(func() bool {
+				podList := &corev1.PodList{}
+				err := k8sClient.List(ctx, podList,
+					client.InNamespace(resourceNamespace),
+					client.MatchingLabels{constants.WorkloadKey: resourceName})
+				if err != nil || len(podList.Items) != 0 {
+					return false
+				}
+				return true // All pods should be deleted
+			}, 5*time.Second, 100*time.Millisecond).Should(BeTrue())
+
+			// Reconcile again to create new pods
+			_, err = reconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: typeNamespacedName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Verify new pods are created
+			Eventually(func() int {
+				err := k8sClient.List(ctx, podList,
+					client.InNamespace(resourceNamespace),
+					client.MatchingLabels{constants.WorkloadKey: resourceName})
+				if err != nil {
+					return 0
+				}
+				return len(podList.Items)
+			}, 5*time.Second, 100*time.Millisecond).Should(Equal(2))
+
+			// Verify new pods have different names and pod template hash
+			var newPodNames []string
+			var newPodTemplateHash string
+			for _, pod := range podList.Items {
+				newPodNames = append(newPodNames, pod.Name)
+				newPodTemplateHash = pod.Labels[constants.LabelKeyPodTemplateHash]
+			}
+			Expect(newPodTemplateHash).NotTo(BeEmpty())
+			Expect(newPodTemplateHash).NotTo(Equal(originalPodTemplateHash))
+
+			// Verify that pod names have changed
+			for _, originalName := range originalPodNames {
+				Expect(newPodNames).NotTo(ContainElement(originalName))
+			}
+
+			// Reconcile again to handle status
+			_, err = reconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: typeNamespacedName,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Verify workload status was updated
+			Eventually(func() int32 {
+				workload := &tfv1.TensorFusionWorkload{}
+				err = k8sClient.Get(ctx, typeNamespacedName, workload)
+				if err != nil {
+					return -1
+				}
+				return workload.Status.Replicas
+			}, 5*time.Second, 100*time.Millisecond).Should(Equal(int32(2)))
+		})
+	})
+
 	Context("When scaling down a workload", func() {
 		It("Should delete excess worker pods", func() {
 			// Create a workload with 3 replicas

internal/utils/reconcile.go
Lines changed: 16 additions & 10 deletions

@@ -2,10 +2,10 @@ package utils

 import (
 	"context"
-	"crypto/sha256"
-	"encoding/hex"
 	"encoding/json"
 	"errors"
+	"fmt"
+	"hash/fnv"
 	"math"
 	"math/rand/v2"
 	"os"
@@ -95,15 +95,21 @@ func CurrentNamespace() string {
 	return namespace
 }

-func GetObjectHash(obj any) string {
-	hasher := sha256.New()
-	jsonBytes, err := json.Marshal(obj)
-	if err != nil {
-		panic(err)
+// GetObjectHash generates a shorter FNV-1a hash for one or more objects
+func GetObjectHash(objs ...any) string {
+	hasher := fnv.New64a()
+
+	for _, obj := range objs {
+		jsonBytes, err := json.Marshal(obj)
+		if err != nil {
+			panic(err)
+		}
+		// Add length prefix to prevent collisions when combining multiple objects
+		hasher.Write(fmt.Appendf(nil, "%d:", len(jsonBytes)))
+		hasher.Write(jsonBytes)
 	}
-	str := string(jsonBytes)
-	hasher.Write([]byte(str))
-	return hex.EncodeToString(hasher.Sum(nil))
+
+	return fmt.Sprintf("%x", hasher.Sum(nil))
 }

 const DebounceKeySuffix = ":in_queue"
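
Switching from SHA-256 to 64-bit FNV-1a keeps the hash short enough to serve as a label value, and the "%d:" length prefix keeps object boundaries distinct when several objects are fed into one hasher. A minimal sketch of that combination scheme, using a hypothetical combinedHash helper over strings instead of JSON-marshalled objects:

package main

import (
	"fmt"
	"hash/fnv"
)

// combinedHash mirrors the length-prefixed FNV-1a combination used by
// GetObjectHash: each part is preceded by "<len>:" so that different
// splits of the same bytes cannot collide.
func combinedHash(parts ...string) string {
	h := fnv.New64a()
	for _, p := range parts {
		h.Write(fmt.Appendf(nil, "%d:", len(p))) // length prefix marks the boundary
		h.Write([]byte(p))
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	// Without the prefix, ("ab", "c") and ("a", "bc") would feed the hasher
	// the identical byte stream "abc" and always collide.
	fmt.Println(combinedHash("ab", "c") == combinedHash("a", "bc")) // false
}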

internal/worker/worker.go
Lines changed: 14 additions & 4 deletions

@@ -9,6 +9,7 @@ import (

 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
+	"github.com/NexusGPU/tensor-fusion/internal/utils"
 	"github.com/samber/lo"
 	"golang.org/x/exp/rand"
 	corev1 "k8s.io/api/core/v1"
@@ -42,18 +43,28 @@ func (wg *WorkerGenerator) AllocPort() int {
 	return rand.Intn(max-min+1) + min
 }

+func (wg *WorkerGenerator) PodTemplateHash(limits tfv1.Resource) (string, error) {
+	podTmpl := &corev1.PodTemplate{}
+	err := json.Unmarshal(wg.WorkerConfig.PodTemplate.Raw, podTmpl)
+	if err != nil {
+		return "", fmt.Errorf("failed to unmarshal pod template: %w", err)
+	}
+	return utils.GetObjectHash(podTmpl, limits), nil
+}
+
 func (wg *WorkerGenerator) GenerateWorkerPod(
 	gpu *tfv1.GPU,
 	generateName string,
 	namespace string,
 	port int,
 	limits tfv1.Resource,
-) (*corev1.Pod, error) {
+) (*corev1.Pod, string, error) {
 	podTmpl := &corev1.PodTemplate{}
 	err := json.Unmarshal(wg.WorkerConfig.PodTemplate.Raw, podTmpl)
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshal pod template: %w", err)
+		return nil, "", fmt.Errorf("failed to unmarshal pod template: %w", err)
 	}
+	podTemplateHash := utils.GetObjectHash(podTmpl, limits)
 	spec := podTmpl.Template.Spec
 	if spec.NodeSelector == nil {
 		spec.NodeSelector = make(map[string]string)
@@ -95,14 +106,13 @@ func (wg *WorkerGenerator) GenerateWorkerPod(
 			},
 		},
 	})
-
 	return &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			GenerateName: generateName,
 			Namespace:    namespace,
 		},
 		Spec: spec,
-	}, nil
+	}, podTemplateHash, nil
 }

 func SelectWorker(
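
GenerateWorkerPod now returns the template hash alongside the pod so callers can apply it as a label without recomputing it, and the generateName argument gains a tf-worker infix. Since Kubernetes appends a random suffix to ObjectMeta.GenerateName, a workload named "demo" would yield worker pods such as demo-tf-worker-x7k2p; a tiny sketch of the naming, with a hypothetical helper and example workload name:

package main

import "fmt"

// workerGenerateName builds the GenerateName prefix the controller now passes
// to GenerateWorkerPod; the API server appends a random suffix on creation.
func workerGenerateName(workloadName string) string {
	return fmt.Sprintf("%s-tf-worker-", workloadName)
}

func main() {
	fmt.Println(workerGenerateName("demo")) // demo-tf-worker-
}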
