Commit f25c65d

fix: add node hash for gpu k8s node, owner ref for hypervisor, isolate shm (#352)
1 parent c0a3500 commit f25c65d

File tree

7 files changed: +125 −28 lines

internal/constants/constants.go

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ const (
 	LabelKeyClusterOwner    = Domain + "/cluster"
 	LabelKeyNodeClass       = Domain + "/node-class"
 	LabelKeyPodTemplateHash = Domain + "/pod-template-hash"
+	LabelNodeSelectorHash   = Domain + "/node-selector-hash"
 	LabelComponent          = Domain + "/component"
 	// used by TF connection, for matching the related connections when worker Pod state changed
 	LabelWorkerName = Domain + "/worker-name"
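
The new LabelNodeSelectorHash key holds a hash of a pool's nodeSelector, stamped onto matching Kubernetes nodes so that a selector change surfaces as a label change (see the controller diffs below). A minimal sketch of how such a hash can be derived; the FNV-over-JSON body is an assumption for illustration, not the actual utils.GetObjectHash implementation:

package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
)

// getObjectHash is a stand-in for the repo's utils.GetObjectHash:
// serialize the object deterministically, then hash the bytes.
func getObjectHash(obj any) string {
	data, _ := json.Marshal(obj) // map keys are sorted, so this is stable
	h := fnv.New64a()
	h.Write(data)
	return fmt.Sprintf("%x", h.Sum64())
}

func main() {
	// Toy selector value: any edit to it yields a different hash, which
	// the controllers write to the tensor-fusion.ai/node-selector-hash label.
	selector := map[string][]string{"example.com/gpu-type": {"a100"}}
	fmt.Println(getObjectHash(selector))
}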

internal/controller/gpunode_controller.go

Lines changed: 20 additions & 5 deletions
@@ -140,7 +140,7 @@ func (r *GPUNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, nil
 	}
 
-	hypervisorName, err := r.reconcileHypervisorPod(ctx, node, poolObj)
+	hypervisorName, err := r.reconcileHypervisorPod(ctx, node, poolObj, coreNode)
 	if err != nil {
 		return ctrl.Result{}, err
 	}
@@ -319,7 +319,12 @@ func (r *GPUNodeReconciler) reconcileNodeDiscoveryJob(
 	return nil
 }
 
-func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tfv1.GPUNode, pool *tfv1.GPUPool) (string, error) {
+func (r *GPUNodeReconciler) reconcileHypervisorPod(
+	ctx context.Context,
+	node *tfv1.GPUNode,
+	pool *tfv1.GPUPool,
+	k8sNode *corev1.Node,
+) (string, error) {
 	log := log.FromContext(ctx)
 
 	if pool.Spec.ComponentConfig == nil || pool.Spec.ComponentConfig.Hypervisor == nil {
@@ -361,7 +366,7 @@ func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tf
 	}
 
 	log.Info("hypervisor pod not found, creating new one", "node", node.Name)
-	if err := r.createHypervisorPod(ctx, key, node, pool); err != nil {
+	if err := r.createHypervisorPod(ctx, key, node, pool, k8sNode); err != nil {
 		if errors.IsAlreadyExists(err) {
 			log.Info("hypervisor pod already exists, skip creation", "node", node.Name)
 			return "", nil
@@ -372,7 +377,13 @@ func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tf
 	return key.Name, nil
 }
 
-func (r *GPUNodeReconciler) createHypervisorPod(ctx context.Context, key client.ObjectKey, node *tfv1.GPUNode, pool *tfv1.GPUPool) error {
+func (r *GPUNodeReconciler) createHypervisorPod(
+	ctx context.Context,
+	key client.ObjectKey,
+	node *tfv1.GPUNode,
+	pool *tfv1.GPUPool,
+	k8sNode *corev1.Node,
+) error {
 	log := log.FromContext(ctx)
 
 	podTmpl := &corev1.PodTemplate{}
@@ -447,7 +458,11 @@ func (r *GPUNodeReconciler) createHypervisorPod(ctx context.Context, key client.
 	})
 	err = controllerutil.SetControllerReference(node, newPod, r.Scheme)
 	if err != nil {
-		return fmt.Errorf("failed to set controller reference: %w", err)
+		return fmt.Errorf("failed to set controller reference for hypervisor: %w", err)
+	}
+	// also set node owned by k8s node to allow Karpenter to delete the node while hypervisor exists
+	if err := controllerutil.SetOwnerReference(k8sNode, newPod, r.Scheme); err != nil {
+		return fmt.Errorf("failed to set owner reference for hypervisor: %w", err)
 	}
 
 	// create hypervisor pod
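
The hypervisor pod now carries two owner references. A short sketch of the distinction between the two controllerutil helpers used above (gpuNode, k8sNode, newPod, and scheme assumed in scope, as in the diff):

// SetControllerReference marks the GPUNode CR as the single managing
// owner (Controller: true) and errors if another controller owner exists.
if err := controllerutil.SetControllerReference(gpuNode, newPod, scheme); err != nil {
	return err
}
// SetOwnerReference appends a plain, non-controller owner reference.
// Per the in-diff comment, owning the pod from the k8s Node is what
// lets Karpenter delete the node while the hypervisor pod exists,
// since the pod no longer looks like an un-owned standalone pod.
if err := controllerutil.SetOwnerReference(k8sNode, newPod, scheme); err != nil {
	return err
}
// newPod.OwnerReferences now holds both entries.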

internal/controller/gpupool_controller.go

Lines changed: 63 additions & 0 deletions
@@ -30,13 +30,16 @@ import (
 	"github.com/NexusGPU/tensor-fusion/internal/metrics"
 	utils "github.com/NexusGPU/tensor-fusion/internal/utils"
 	"golang.org/x/time/rate"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/client-go/util/workqueue"
+	schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -83,6 +86,9 @@ type GPUPoolReconciler struct {
 // and requeue until current time after that, start provisioning loop
 var provisioningInitializationMinTime = map[string]time.Time{}
 
+// When GPU nodeSelector changed, trigger all node update
+var poolSelectorChangeMap = map[string]string{}
+
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=gpupools,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=gpupools/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=gpupools/finalizers,verbs=update
@@ -116,6 +122,10 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, nil
 	}
 
+	if err := r.reconcilePoolSelectorChange(ctx, pool); err != nil {
+		return ctrl.Result{}, err
+	}
+
 	if err := r.reconcilePoolCurrentCapacityAndReadiness(ctx, pool); err != nil {
 		return ctrl.Result{}, err
 	}
@@ -404,6 +414,59 @@ func (r *GPUPoolReconciler) reconcilePoolComponents(ctx context.Context, pool *t
 	return ctrlResult, utilerrors.NewAggregate(errs)
 }
 
+func (r *GPUPoolReconciler) reconcilePoolSelectorChange(ctx context.Context, pool *tfv1.GPUPool) error {
+	if pool.Spec.NodeManagerConfig != nil && pool.Spec.NodeManagerConfig.NodeSelector != nil {
+		hash := utils.GetObjectHash(pool.Spec.NodeManagerConfig.NodeSelector)
+		if poolSelectorChangeMap[pool.Name] == hash {
+			return nil
+		}
+
+		// hash has changed, or first reconcile, should check all k8s nodes
+		nodes := &corev1.NodeList{}
+		selectors := utils.GetInitialGPUNodeSelector()
+		if err := r.List(ctx, nodes, client.MatchingLabels{selectors[0]: selectors[1]}); err != nil {
+			return err
+		}
+		for _, node := range nodes.Items {
+			// skip no label or deleting nodes
+			if node.Labels == nil || !node.DeletionTimestamp.IsZero() {
+				continue
+			}
+			matches, err := schedulingcorev1.MatchNodeSelectorTerms(&node, pool.Spec.NodeManagerConfig.NodeSelector)
+			if err != nil {
+				return err
+			}
+			if matches {
+				if err := UpdateK8SNodeSelectorHash(ctx, r.Client, &node, hash); err != nil {
+					return err
+				}
+			}
+		}
+		poolSelectorChangeMap[pool.Name] = hash
+		return nil
+	}
+	return nil
+}
+
+func UpdateK8SNodeSelectorHash(ctx context.Context, k8sClient client.Client, node *corev1.Node, hash string) error {
+	// skip nodes that already injected the hash
+	if node.Labels[constants.LabelNodeSelectorHash] == hash {
+		return nil
+	}
+	// update label to trigger the GPUNode reconcile
+	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		latest := &corev1.Node{}
+		if err := k8sClient.Get(ctx, client.ObjectKey{Name: node.Name}, latest); err != nil {
+			return err
+		}
+		latest.Labels[constants.LabelNodeSelectorHash] = hash
+		return k8sClient.Update(ctx, latest)
+	}); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (r *GPUPoolReconciler) cleanUpPool(ctx context.Context, pool *tfv1.GPUPool) (bool, error) {
 	log := log.FromContext(ctx)
 	log.Info("TensorFusionGPUPool is being deleted", "name", pool.Name)
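
reconcilePoolSelectorChange leans on schedulingcorev1.MatchNodeSelectorTerms, the same node-affinity matcher the scheduler uses, to decide which nodes should receive the new hash label. A runnable sketch with illustrative labels (real values come from the pool's NodeManagerConfig.NodeSelector):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
)

func main() {
	// Illustrative node and selector, not the repo's actual labels.
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{"example.com/gpu": "true"},
	}}
	selector := &corev1.NodeSelector{NodeSelectorTerms: []corev1.NodeSelectorTerm{{
		MatchExpressions: []corev1.NodeSelectorRequirement{{
			Key:      "example.com/gpu",
			Operator: corev1.NodeSelectorOpIn,
			Values:   []string{"true"},
		}},
	}}}
	matches, err := schedulingcorev1.MatchNodeSelectorTerms(node, selector)
	fmt.Println(matches, err) // true <nil>
}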

internal/controller/gpupool_controller_test.go

Lines changed: 8 additions & 0 deletions
@@ -42,6 +42,14 @@ var _ = Describe("GPUPool Controller", func() {
 			pool := tfEnv.GetGPUPool(0)
 			g.Expect(pool.Status.Phase).Should(Equal(tfv1.TensorFusionPoolPhaseRunning))
 		}).Should(Succeed())
+		Eventually(func(g Gomega) {
+			nodeList := tfEnv.GetGPUNodeList(0)
+			for _, gpuNode := range nodeList.Items {
+				node := &corev1.Node{}
+				g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: gpuNode.Name}, node)).Should(Succeed())
+				g.Expect(node.Labels).To(HaveKey(constants.LabelNodeSelectorHash))
+			}
+		}).Should(Succeed())
 		tfEnv.Cleanup()
 	})
 })

internal/controller/node_controller.go

Lines changed: 18 additions & 13 deletions
@@ -19,8 +19,6 @@ package controller
 import (
 	"context"
 	"fmt"
-	"os"
-	"strings"
 
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
@@ -86,12 +84,15 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 		return ctrl.Result{}, err
 	}
 	if !matched {
-		// delete gpunode if no matched pool
-		if err := r.Delete(ctx, &tfv1.GPUNode{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: node.Name,
-			},
-		}); err != nil {
+		existingGPUNode := &tfv1.GPUNode{}
+		if err := r.Get(ctx, client.ObjectKey{Name: node.Name}, existingGPUNode); err != nil {
+			if errors.IsNotFound(err) {
+				return ctrl.Result{}, nil
+			}
+			return ctrl.Result{}, fmt.Errorf("can not get gpuNode(%s) : %w", node.Name, err)
+		}
+		// delete existing gpunode if no matched pool
+		if err := r.Delete(ctx, existingGPUNode); err != nil {
 			// requeue if the gpunode is not generated
 			if errors.IsNotFound(err) {
 				return ctrl.Result{}, nil
@@ -121,6 +122,14 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 		return ctrl.Result{}, nil
 	}
 
+	// update k8s node hash
+	hash := utils.GetObjectHash(pool.Spec.NodeManagerConfig.NodeSelector)
+	if node.Labels[constants.LabelNodeSelectorHash] != hash {
+		if err := UpdateK8SNodeSelectorHash(ctx, r.Client, node, hash); err != nil {
+			return ctrl.Result{}, fmt.Errorf("failed to update k8s node hash: %w", err)
+		}
+	}
+
 	provisioningMode := pool.Spec.NodeManagerConfig.ProvisioningMode
 	isDirectManagedMode := provisioningMode == tfv1.ProvisioningModeProvisioned
 	isManagedNode := isDirectManagedMode || provisioningMode == tfv1.ProvisioningModeKarpenter
@@ -199,11 +208,7 @@ func (r *NodeReconciler) generateGPUNode(node *corev1.Node, pool *tfv1.GPUPool,
 // SetupWithManager sets up the controller with the Manager.
 func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	// must choose an initial label selector to avoid performance impact in large Kubernetes clusters
-	selector := os.Getenv("INITIAL_GPU_NODE_LABEL_SELECTOR")
-	if selector == "" {
-		selector = constants.InitialGPUNodeSelector
-	}
-	selectors := strings.Split(selector, "=")
+	selectors := utils.GetInitialGPUNodeSelector()
 	p, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
 		MatchLabels: map[string]string{
 			selectors[0]: selectors[1],
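
The get-before-delete rewrite lets the reconciler distinguish "nothing to delete" from a real API error before issuing the delete. One possible further hardening, not part of this commit: pass the observed UID as a delete precondition, so a GPUNode recreated under the same name between the Get and the Delete is not removed by mistake:

// Hypothetical variant using controller-runtime delete preconditions.
existing := &tfv1.GPUNode{}
if err := r.Get(ctx, client.ObjectKey{Name: node.Name}, existing); err != nil {
	if errors.IsNotFound(err) {
		return ctrl.Result{}, nil
	}
	return ctrl.Result{}, err
}
// A stale UID makes the API server reject the delete with a conflict
// instead of deleting the newer object.
if err := r.Delete(ctx, existing, client.Preconditions{UID: &existing.UID}); err != nil {
	if errors.IsNotFound(err) {
		return ctrl.Result{}, nil
	}
	return ctrl.Result{}, err
}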

internal/utils/compose.go

Lines changed: 6 additions & 10 deletions
@@ -229,10 +229,9 @@ func AddTFDefaultClientConfBeforePatch(
 	pod.Spec.Containers[injectContainerIndex].VolumeMounts = append(
 		pod.Spec.Containers[injectContainerIndex].VolumeMounts,
 		v1.VolumeMount{
-			Name:      constants.DataVolumeName,
-			MountPath: constants.SharedMemDeviceName,
-			SubPath:   constants.SharedMemMountSubPath,
-			// + constants.TFLibsVolumeMountPath, SubPathExpr: constants.TFDataPathWorkerExpr,
+			Name:             constants.DataVolumeName,
+			MountPath:        constants.TFLibsVolumeMountPath,
+			SubPathExpr:      constants.TFDataPathWorkerExpr,
 			MountPropagation: ptr.To(v1.MountPropagationHostToContainer),
 		})
 
@@ -682,12 +681,9 @@ func AddWorkerConfAfterTemplate(ctx context.Context, spec *v1.PodSpec, workerCon
 	spec.Containers[0].VolumeMounts = append(
 		spec.Containers[0].VolumeMounts,
 		v1.VolumeMount{
-			Name:      constants.DataVolumeName,
-			MountPath: constants.SharedMemDeviceName,
-			// TODO not working.
-			// + constants.TFLibsVolumeMountPath
-			// SubPathExpr: constants.TFDataPathWorkerExpr,
-			SubPath: constants.SharedMemMountSubPath,
+			Name:             constants.DataVolumeName,
+			MountPath:        constants.TFLibsVolumeMountPath,
+			SubPathExpr:      constants.TFDataPathWorkerExpr,
 			MountPropagation: ptr.To(v1.MountPropagationHostToContainer),
 		})
 	spec.Containers[0].Env = append(spec.Containers[0].Env, v1.EnvVar{
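
The functional change in both hunks is SubPath → SubPathExpr: SubPath mounts one fixed subdirectory for every pod, while SubPathExpr expands $(VAR) references from the container's environment at mount time, so each worker mounts its own subdirectory and its shared memory is isolated from other workers. A self-contained sketch of the mechanism with illustrative names (the repo's actual values are the DataVolumeName, TFLibsVolumeMountPath, and TFDataPathWorkerExpr constants):

package main

import corev1 "k8s.io/api/core/v1"

func workerContainer() corev1.Container {
	return corev1.Container{
		Name:  "worker",
		Image: "busybox",
		// Downward-API env var; SubPathExpr below is expanded per pod.
		Env: []corev1.EnvVar{{
			Name: "POD_NAME",
			ValueFrom: &corev1.EnvVarSource{
				FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"},
			},
		}},
		// Each pod mounts <volume>/<its own pod name>, not a shared dir.
		VolumeMounts: []corev1.VolumeMount{{
			Name:        "data-vol",
			MountPath:   "/run/tensor-fusion",
			SubPathExpr: "$(POD_NAME)",
		}},
	}
}

func main() { _ = workerContainer() }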

internal/utils/reconcile.go

Lines changed: 9 additions & 0 deletions
@@ -214,6 +214,15 @@ func IsTensorFusionWorker(pod *corev1.Pod) bool {
 	return pod.Labels[constants.LabelComponent] == constants.ComponentWorker
 }
 
+func GetInitialGPUNodeSelector() []string {
+	selector := os.Getenv("INITIAL_GPU_NODE_LABEL_SELECTOR")
+	if selector == "" {
+		selector = constants.InitialGPUNodeSelector
+	}
+	selectors := strings.Split(selector, "=")
+	return selectors
+}
+
 var GPUResourceNames = []corev1.ResourceName{
 	"nvidia.com/gpu",
 	"amd.com/gpu",
