
Commit 6d484d8

fix: ensuring idempotency for the handlePodGPUCleanup function (#178)
* fix: ensuring idempotency for the handlePodGPUCleanup function
* fix lint
1 parent 75253bd


5 files changed: +29, -19 lines


internal/constants/constants.go

Lines changed: 2 additions & 1 deletion
@@ -17,7 +17,7 @@ const (
 	LabelKeyClusterOwner    = Domain + "/cluster"
 	LabelKeyNodeClass       = Domain + "/node-class"
 	LabelKeyPodTemplateHash = Domain + "/pod-template-hash"
-	LabelValueTrue          = "true"
+	TrueStringValue         = "true"
 
 	GPUNodePoolIdentifierLabelPrefix = Domain + "/pool-"
 	GPUNodePoolIdentifierLabelFormat = Domain + "/pool-%s"
@@ -41,6 +41,7 @@ const (
 	InjectContainerAnnotation = Domain + "/inject-container"
 	ReplicasAnnotation        = Domain + "/replicas"
 	GenWorkloadAnnotation     = Domain + "/generate-workload"
+	GpuReleasedAnnotation     = Domain + "/gpu-released"
 
 	TensorFusionPodCounterKeyAnnotation = Domain + "/pod-counter-key"
 	TensorFusionPodCountAnnotation      = Domain + "/tf-pod-count"
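
As an aside, a minimal sketch of how the renamed constant and the new annotation might be read together; the Domain value and the gpuAlreadyReleased helper are illustrative assumptions, not code from this commit:

package sketch

// Assumed domain value; the real one lives elsewhere in internal/constants
// and is not shown in this diff.
const (
	Domain                = "tensor-fusion.ai"
	TrueStringValue       = "true"
	GpuReleasedAnnotation = Domain + "/gpu-released"
)

// gpuAlreadyReleased is a hypothetical helper. Reading from a nil map is
// safe in Go (it yields ""), so only writers need the nil check that the
// workload controller adds below.
func gpuAlreadyReleased(annotations map[string]string) bool {
	return annotations[GpuReleasedAnnotation] == TrueStringValue
}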

internal/controller/gpupool_compaction_controller.go

Lines changed: 3 additions & 3 deletions
@@ -54,13 +54,13 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p
 	// Strategy #1, terminate empty node
 	allNodes := &tfv1.GPUNodeList{}
 	if err := r.List(ctx, allNodes, client.MatchingLabels(map[string]string{
-		fmt.Sprintf(constants.GPUNodePoolIdentifierLabelFormat, pool.Name): constants.LabelValueTrue,
+		fmt.Sprintf(constants.GPUNodePoolIdentifierLabelFormat, pool.Name): constants.TrueStringValue,
 	})); err != nil {
 		return fmt.Errorf("failed to list nodes : %w", err)
 	}
 	for _, gpuNode := range allNodes.Items {
 		// Skip a node that is labeled as NoDisrupt
-		if gpuNode.Labels[constants.SchedulingDoNotDisruptLabel] == constants.LabelValueTrue {
+		if gpuNode.Labels[constants.SchedulingDoNotDisruptLabel] == constants.TrueStringValue {
 			continue
 		}
 
@@ -116,7 +116,7 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p
 		ObjectMeta: metav1.ObjectMeta{
 			Name: gpuNode.Status.KubernetesNodeName,
 			Labels: map[string]string{
-				constants.NodeDeletionMark: constants.LabelValueTrue,
+				constants.NodeDeletionMark: constants.TrueStringValue,
 			},
 		},
 	}, client.Merge)
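
For readers unfamiliar with controller-runtime, a minimal sketch of the label-selector listing pattern used here, with a plain corev1.NodeList standing in for the project's tfv1.GPUNodeList and the tensor-fusion.ai domain assumed:

package sketch

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listPoolNodes sketches the List call from checkNodeCompaction. The label
// key mirrors GPUNodePoolIdentifierLabelFormat; the domain prefix is an
// assumption.
func listPoolNodes(ctx context.Context, c client.Client, poolName string) (*corev1.NodeList, error) {
	nodes := &corev1.NodeList{}
	key := fmt.Sprintf("tensor-fusion.ai/pool-%s", poolName)
	// client.MatchingLabels turns the map into a label selector, so only
	// nodes labeled <key>=true are returned.
	if err := c.List(ctx, nodes, client.MatchingLabels(map[string]string{key: "true"})); err != nil {
		return nil, fmt.Errorf("failed to list nodes: %w", err)
	}
	return nodes, nil
}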

internal/controller/node_controller.go

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 	}
 
 	// Remove deletion mark if updated
-	if node.GetLabels()[constants.NodeDeletionMark] == "true" {
+	if node.GetLabels()[constants.NodeDeletionMark] == constants.TrueStringValue {
 		log.Info("Node should be removed due to GPUNode compaction, but it's not managed by TensorFusion, skip.", "name", node.Name)
 	}

internal/controller/pod_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
6262
// generate tensor fusion connections and apply to cluster
6363
tfConnection := generateTensorFusionConnection(pod)
6464
if tfConnection == nil {
65-
// not a tf pod skipped
65+
// not a tf client pod skipped
6666
return ctrl.Result{}, nil
6767
}
6868

internal/controller/tensorfusionworkload_controller.go

Lines changed: 22 additions & 13 deletions
@@ -30,8 +30,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
-	"slices"
-
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/config"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
@@ -100,7 +98,6 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 	if shouldReturn {
 		return ctrl.Result{}, nil
 	}
-
 	// Handle pods with finalizers that need GPU resource cleanup
 	hasDeletion := false
 	// Process pods with our finalizer
@@ -279,19 +276,24 @@ func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, w
 func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context, pod *corev1.Pod, workload *tfv1.TensorFusionWorkload) (bool, error) {
 	log := log.FromContext(ctx)
 
-	// Check if this is our finalizer
-	if !containsFinalizer(pod, constants.Finalizer) {
-		// Not our finalizer, skip processing
-		return true, nil
-	}
 	log.Info("Processing pod with GPU resource cleanup finalizer", "pod", pod.Name)
+
 	// Get GPU name from pod label
 	gpuName, ok := pod.Labels[constants.GpuKey]
 	if !ok {
 		log.Info("Pod has finalizer but no GPU label", "pod", pod.Name)
 		return true, nil
 	}
 
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+
+	if pod.Annotations[constants.GpuReleasedAnnotation] == constants.TrueStringValue {
+		log.Info("GPU has been released for this pod", "pod", pod.Name)
+		return true, nil
+	}
+
 	// Get the GPU
 	gpu := &tfv1.GPU{}
 	if err := r.Get(ctx, client.ObjectKey{Name: gpuName}, gpu); err != nil {
@@ -305,6 +307,18 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context
 		return false, err
 	}
 
+	pod.Annotations[constants.GpuReleasedAnnotation] = constants.TrueStringValue
+
+	// Update the pod's annotations to mark that GPU cleanup has been processed.
+	// This is the key to making handlePodGPUCleanup idempotent: if the function
+	// is called again for the same pod instance (e.g., because the client cache
+	// does not yet reflect the finalizer's removal), this r.Update fails on the
+	// stale resourceVersion, so the GPU is never released twice.
+	if err := r.Update(ctx, pod); err != nil {
+		log.Error(err, "Failed to mark GPU cleanup of pod", "gpu", gpuName, "pod", pod.Name)
+		return false, err
+	}
+
 	// Release GPU resources
 	if err := r.Scheduler.Release(ctx, workload.Spec.Resources.Requests, gpu); err != nil {
 		log.Error(err, "Failed to release GPU resources, will retry", "gpu", gpuName, "pod", pod.Name)
@@ -315,11 +329,6 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context
 	return true, nil
 }
 
-// Helper function to check if a pod has a specific finalizer
-func containsFinalizer(pod *corev1.Pod, finalizer string) bool {
-	return slices.Contains(pod.Finalizers, finalizer)
-}
-
 // deletePod deletes a pod
 func (r *TensorFusionWorkloadReconciler) deletePod(ctx context.Context, pod *corev1.Pod) error {
 	log := log.FromContext(ctx)
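
The ordering above is the heart of the fix: the pod is updated before the GPU is released. Kubernetes writes are guarded by the object's resourceVersion, so when two reconcile passes race on the same stale pod object, only one Update succeeds; the loser retries and then hits the gpu-released annotation guard. A minimal sketch of that pattern, assuming a controller-runtime client, with releaseGPU as a hypothetical stand-in for r.Scheduler.Release:

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
	// Assumed values mirroring internal/constants.
	trueStringValue       = "true"
	gpuReleasedAnnotation = "tensor-fusion.ai/gpu-released"
)

// releaseGPU is a hypothetical stand-in for r.Scheduler.Release.
func releaseGPU(ctx context.Context, pod *corev1.Pod) error { return nil }

// markThenRelease sketches the mark-before-release ordering from the diff.
func markThenRelease(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	if pod.Annotations == nil {
		pod.Annotations = make(map[string]string)
	}
	// Fast path: an earlier pass already completed the cleanup.
	if pod.Annotations[gpuReleasedAnnotation] == trueStringValue {
		return nil
	}
	pod.Annotations[gpuReleasedAnnotation] = trueStringValue
	// Update fails with a conflict if pod.ResourceVersion is stale, so at
	// most one caller per pod revision gets past this line.
	if err := c.Update(ctx, pod); err != nil {
		return err // the reconcile loop requeues and retries
	}
	// Only the Update winner reaches the actual release.
	return releaseGPU(ctx, pod)
}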
