
Commit 0d77024

fix: extract GPU map update logic into separate method and fix webhook domain name, virtual cap calculation (#357)
* fix: virtual tflops/vram not calculated bug
* fix: extract GPU map update logic into separate method and fix webhook domain name
* fix: nvidia device plugin compatible mode state consistent issue
* fix: nvidia device plugin compatible mode issue
1 parent e55e53d commit 0d77024

10 files changed · +98 additions, -46 deletions

.vscode/launch.json

Lines changed: 2 additions & 1 deletion
@@ -61,7 +61,8 @@
         "KUBECONFIG": "~/.kube/config-local-studio",
         "ENABLE_WEBHOOKS": "false",
         "ENABLE_SCHEDULER": "true",
-        "ENABLE_CR_CONTROLLER": "true"
+        "ENABLE_CR_CONTROLLER": "true",
+        "NVIDIA_OPERATOR_PROGRESSIVE_MIGRATION": "true"
       },
       "args": [
         "--metrics-path", "${workspaceFolder}/logs/metrics.log",

charts/tensor-fusion/Chart.yaml

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 1.5.7
+version: 1.5.8
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to

charts/tensor-fusion/templates/admission-webhooks/mutating-webhook.yaml

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ webhooks:
         namespace: {{ include "tensor-fusion.namespace" . }}
         path: /mutate-v1-pod
     failurePolicy: {{ .Values.controller.admissionWebhooks.failurePolicy }}
-    name: mpod-v1.kb.io
+    name: mpod.tensor-fusion.ai
     rules:
       - apiGroups:
           - ""

cmd/main.go

Lines changed: 4 additions & 3 deletions
@@ -364,9 +364,10 @@ func startCustomResourceController(
 	}
 
 	if err = (&controller.GPUNodeReconciler{
-		Client:   mgr.GetClient(),
-		Scheme:   mgr.GetScheme(),
-		Recorder: mgr.GetEventRecorderFor("GPUNode"),
+		Client:    mgr.GetClient(),
+		Scheme:    mgr.GetScheme(),
+		Recorder:  mgr.GetEventRecorderFor("GPUNode"),
+		Allocator: allocator,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "GPUNode")
 		os.Exit(1)

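For context, the hunk above hands the allocator already in scope to the GPUNode reconciler, so the scheduler and the controller work from one shared in-memory allocation state. A minimal standalone sketch of that injection pattern, using simplified stand-in types rather than the real tensor-fusion or controller-runtime APIs:

package main

import "fmt"

// Simplified stand-ins for the real GpuAllocator and GPUNodeReconciler types;
// they only illustrate the wiring, not the actual project APIs.
type GpuAllocator struct {
	nodeWorkers map[string]int // node name -> number of tracked worker pods
}

type GPUNodeReconciler struct {
	Allocator *GpuAllocator
}

func (r *GPUNodeReconciler) Reconcile(nodeName string) {
	// The reconciler reads the same in-memory state the scheduler writes,
	// so node capacity can be refreshed without re-listing workloads from the API server.
	fmt.Printf("node %s has %d tracked workers\n", nodeName, r.Allocator.nodeWorkers[nodeName])
}

func main() {
	// One allocator instance is created at startup and handed to every component that needs it.
	allocator := &GpuAllocator{nodeWorkers: map[string]int{"node-a": 2}}
	reconciler := &GPUNodeReconciler{Allocator: allocator}
	reconciler.Reconcile("node-a")
}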
internal/controller/gpunode_controller.go

Lines changed: 7 additions & 4 deletions
@@ -47,8 +47,9 @@ import (
 // GPUNodeReconciler reconciles a GPUNode object
 type GPUNodeReconciler struct {
 	client.Client
-	Scheme   *runtime.Scheme
-	Recorder record.EventRecorder
+	Scheme    *runtime.Scheme
+	Recorder  record.EventRecorder
+	Allocator *gpuallocator.GpuAllocator
 }
 
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=gpunodes,verbs=get;list;watch;create;update;patch;delete
@@ -158,7 +159,9 @@ func (r *GPUNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, err
 	}
 
-func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity(ctx context.Context, hypervisorName string, node *tfv1.GPUNode, poolObj *tfv1.GPUPool) error {
+func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity(
+	ctx context.Context, hypervisorName string, node *tfv1.GPUNode, poolObj *tfv1.GPUPool,
+) error {
 	pod := &corev1.Pod{}
 	fetchErr := r.Get(ctx, client.ObjectKey{Name: hypervisorName, Namespace: utils.CurrentNamespace()}, pod)
 	if fetchErr != nil {
@@ -183,7 +186,7 @@ func (r *GPUNodeReconciler) checkStatusAndUpdateVirtualCapacity(ctx context.Cont
 
 		return nil
 	} else {
-		gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj)
+		gpuModels, err := gpuallocator.RefreshGPUNodeCapacity(ctx, r.Client, node, poolObj, r.Allocator)
 		if err != nil {
 			return err
 		}

internal/controller/suite_test.go

Lines changed: 4 additions & 3 deletions
@@ -180,9 +180,10 @@ var _ = BeforeSuite(func() {
 	Expect(err).ToNot(HaveOccurred())
 
 	err = (&GPUNodeReconciler{
-		Client:   mgr.GetClient(),
-		Scheme:   mgr.GetScheme(),
-		Recorder: mgr.GetEventRecorderFor("GPUNode"),
+		Client:    mgr.GetClient(),
+		Scheme:    mgr.GetScheme(),
+		Recorder:  mgr.GetEventRecorderFor("GPUNode"),
+		Allocator: allocator,
 	}).SetupWithManager(mgr)
 	Expect(err).ToNot(HaveOccurred())

internal/gpuallocator/gpuallocator.go

Lines changed: 48 additions & 29 deletions
@@ -545,12 +545,13 @@ func (s *GpuAllocator) AdjustAllocation(ctx context.Context, adjustRequest tfv1.
 }
 
 func (s *GpuAllocator) ListNonUsingNodes() sets.Set[string] {
+	<-s.initializedCh
 	set := sets.New[string]()
-	for nodeName, gpuNames := range s.nodeWorkerStore {
+	for nodeName, podNames := range s.nodeWorkerStore {
 		// If using by TF, the node can not be used by original scheduler
 		// If using by other scheduler, won't record as TF worker, thus the map is empty
 		// Return non using nodes can ensure original scheduler not conflict with TF
-		if len(gpuNames) == 0 {
+		if len(podNames) == 0 {
 			set.Insert(nodeName)
 		}
 	}
@@ -564,6 +565,20 @@ func (s *GpuAllocator) DeallocByPodIdentifier(ctx context.Context, podIdentifier
 	}
 }
 
+func (s *GpuAllocator) GetAllocationReqByNodeName(nodeName string) []*tfv1.AllocRequest {
+	allocRequests := make([]*tfv1.AllocRequest, 0, 8)
+	for workerName := range s.nodeWorkerStore[nodeName] {
+		podUID := s.podNamespaceNsToPodUID[workerName.String()]
+		if podUID == "" {
+			continue
+		}
+		if request, exists := s.uniqueAllocation[podUID]; exists {
+			allocRequests = append(allocRequests, request)
+		}
+	}
+	return allocRequests
+}
+
 func (s *GpuAllocator) checkGPUCapacityAndQuota(gpu *tfv1.GPU, oldRes, newRes tfv1.Resource) (tfv1.Resource, error) {
 	if gpu.Status.Available == nil {
 		return tfv1.Resource{}, fmt.Errorf("GPU available is nil, skip check")
@@ -870,29 +885,7 @@ func (s *GpuAllocator) handleGPUCreate(ctx context.Context, gpu *tfv1.GPU) {
 	}
 	s.gpuStore[key] = gpuInMem
 
-	if gpuInMem.Status.NodeSelector != nil {
-		gpuNodeName := gpuInMem.Status.NodeSelector[constants.KubernetesHostNameLabel]
-		if gpuNodeName != "" {
-			if _, exists := s.nodeGpuStore[gpuNodeName]; !exists {
-				s.nodeGpuStore[gpuNodeName] = make(map[string]*tfv1.GPU, 4)
-			}
-			s.nodeGpuStore[gpuNodeName][gpuInMem.Name] = gpuInMem
-		}
-	}
-
-	if gpuInMem.Labels != nil {
-		pool := gpuInMem.Labels[constants.GpuPoolKey]
-		if pool != "" {
-			if _, exists := s.poolGpuStore[pool]; !exists {
-				s.poolGpuStore[pool] = make(map[string]*tfv1.GPU, 128)
-			}
-			s.poolGpuStore[pool][gpuInMem.Name] = gpuInMem
-		}
-	}
-
-	if gpu.Status.GPUModel != "" {
-		GPUCapacityMap[gpu.Status.GPUModel] = *gpu.Status.Capacity
-	}
+	s.addOrUpdateGPUMaps(gpuInMem)
 	log.Info("Added GPU to store", "name", key.Name, "phase", gpu.Status.Phase)
 }
 
@@ -942,10 +935,36 @@ func (s *GpuAllocator) handleGPUUpdate(ctx context.Context, gpu *tfv1.GPU) {
 		log.V(6).Info("Updated GPU in store (new entry)", "name", key.Name, "phase", gpu.Status.Phase)
 	}
 
-	if gpu.Status.GPUModel != "" {
-		if _, exists := GPUCapacityMap[gpu.Status.GPUModel]; !exists {
-			GPUCapacityMap[gpu.Status.GPUModel] = *gpu.Status.Capacity
+	s.addOrUpdateGPUMaps(gpu)
+}
+
+func (s *GpuAllocator) addOrUpdateGPUMaps(gpuInMem *tfv1.GPU) {
+	if gpuInMem.Status.NodeSelector != nil {
+		gpuNodeName := gpuInMem.Status.NodeSelector[constants.KubernetesHostNameLabel]
+		if gpuNodeName != "" {
+			if _, exists := s.nodeGpuStore[gpuNodeName]; !exists {
+				s.nodeGpuStore[gpuNodeName] = make(map[string]*tfv1.GPU, 4)
+			}
+			s.nodeGpuStore[gpuNodeName][gpuInMem.Name] = gpuInMem
+			if _, exists := s.nodeWorkerStore[gpuNodeName]; !exists {
+				s.nodeWorkerStore[gpuNodeName] = make(map[types.NamespacedName]struct{}, 4)
+			}
 		}
+
+	}
+
+	if gpuInMem.Labels != nil {
+		pool := gpuInMem.Labels[constants.GpuPoolKey]
+		if pool != "" {
+			if _, exists := s.poolGpuStore[pool]; !exists {
+				s.poolGpuStore[pool] = make(map[string]*tfv1.GPU, 128)
+			}
+			s.poolGpuStore[pool][gpuInMem.Name] = gpuInMem
+		}
+	}
+
+	if gpuInMem.Status.GPUModel != "" {
+		GPUCapacityMap[gpuInMem.Status.GPUModel] = *gpuInMem.Status.Capacity
 	}
 }
 
@@ -1166,7 +1185,7 @@ func (s *GpuAllocator) reconcileAllocationState() {
 			// No workers, but node contains GPU, need include into nodeWorkerStore with empty map
 			gpuNodeName := gpu.Status.NodeSelector[constants.KubernetesHostNameLabel]
 			if _, exists := s.nodeWorkerStore[gpuNodeName]; !exists {
-				s.nodeWorkerStore[gpuNodeName] = map[types.NamespacedName]struct{}{}
+				s.nodeWorkerStore[gpuNodeName] = make(map[types.NamespacedName]struct{}, 4)
 			}
 		}

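The new GetAllocationReqByNodeName walks three in-memory indexes: node name → worker pod keys, pod key → pod UID, pod UID → allocation request. A self-contained sketch of that lookup chain with simplified stand-in types (field and type names here are illustrative, not the allocator's real ones):

package main

import "fmt"

// AllocRequest is a simplified stand-in for the real allocation request.
type AllocRequest struct {
	PodUID string
	Tflops int64
	VramGB int64
}

type allocator struct {
	nodeWorkerStore  map[string]map[string]struct{} // node name -> worker pod keys
	podKeyToPodUID   map[string]string              // worker pod key -> pod UID
	uniqueAllocation map[string]*AllocRequest       // pod UID -> allocation request
}

// getAllocationReqByNodeName mirrors the lookup added in this commit: resolve each
// worker tracked on the node to its pod UID, then to its allocation request.
func (a *allocator) getAllocationReqByNodeName(node string) []*AllocRequest {
	requests := make([]*AllocRequest, 0, 8)
	for podKey := range a.nodeWorkerStore[node] {
		uid := a.podKeyToPodUID[podKey]
		if uid == "" {
			continue // worker seen but its pod UID is not registered yet
		}
		if req, ok := a.uniqueAllocation[uid]; ok {
			requests = append(requests, req)
		}
	}
	return requests
}

func main() {
	a := &allocator{
		nodeWorkerStore:  map[string]map[string]struct{}{"node-a": {"ns/worker-1": {}}},
		podKeyToPodUID:   map[string]string{"ns/worker-1": "uid-1"},
		uniqueAllocation: map[string]*AllocRequest{"uid-1": {PodUID: "uid-1", Tflops: 50, VramGB: 8}},
	}
	for _, req := range a.getAllocationReqByNodeName("node-a") {
		fmt.Printf("pod %s: %d TFLOPS, %d GiB VRAM\n", req.PodUID, req.Tflops, req.VramGB)
	}
}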
internal/gpuallocator/gpuallocator_test.go

Lines changed: 7 additions & 1 deletion
@@ -97,7 +97,7 @@ var _ = Describe("GPU Allocator", func() {
 			if err := k8sClient.Get(ctx, types.NamespacedName{Name: "test-pool"}, pool); err != nil {
 				Expect(err).NotTo(HaveOccurred())
 			}
-			_, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool)
+			_, _ = RefreshGPUNodeCapacity(ctx, k8sClient, gpuNode, pool, allocator)
 
 			// Verify resources were reduced on the allocated GPU
 			gpu := getGPU(gpus[0].Name)
@@ -107,8 +107,14 @@
 			node := getGPUNode(gpu)
 			diffTflops := node.Status.TotalTFlops.Value() - node.Status.AvailableTFlops.Value()
 			diffVRAM := node.Status.TotalVRAM.Value() - node.Status.AvailableVRAM.Value()
+
+			diffVirtualTflops := node.Status.VirtualTFlops.Value() - node.Status.VirtualAvailableTFlops.Value()
+			diffVirtualVRAM := node.Status.VirtualVRAM.Value() - node.Status.VirtualAvailableVRAM.Value()
 			Expect(diffTflops).To(BeEquivalentTo(50))
 			Expect(diffVRAM).To(BeEquivalentTo(8 * 1024 * 1024 * 1024))
+
+			Expect(diffVirtualTflops).To(BeEquivalentTo(50))
+			Expect(diffVirtualVRAM).To(BeEquivalentTo(8 * 1024 * 1024 * 1024))
 		})
 
 		It("should allocate multiple GPUs from the same node", func() {

internal/gpuallocator/node_capacity.go

Lines changed: 16 additions & 1 deletion
@@ -11,7 +11,11 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-func RefreshGPUNodeCapacity(ctx context.Context, k8sClient client.Client, node *tfv1.GPUNode, pool *tfv1.GPUPool) ([]string, error) {
+func RefreshGPUNodeCapacity(
+	ctx context.Context, k8sClient client.Client,
+	node *tfv1.GPUNode, pool *tfv1.GPUPool,
+	allocator *GpuAllocator,
+) ([]string, error) {
 	gpuList := &tfv1.GPUList{}
 	if err := k8sClient.List(ctx, gpuList, client.MatchingLabels{constants.LabelKeyOwner: node.Name}); err != nil {
 		return nil, fmt.Errorf("failed to list GPUs: %w", err)
@@ -54,6 +58,17 @@ func RefreshGPUNodeCapacity(ctx context.Context, k8sClient client.Client, node *
 	node.Status.VirtualTFlops = virtualTFlops
 	node.Status.VirtualVRAM = virtualVRAM
 
+	vramAvailable := virtualVRAM.DeepCopy()
+	tflopsAvailable := virtualTFlops.DeepCopy()
+
+	allocRequests := allocator.GetAllocationReqByNodeName(node.Name)
+	for _, allocRequest := range allocRequests {
+		vramAvailable.Sub(allocRequest.Limit.Vram)
+		tflopsAvailable.Sub(allocRequest.Limit.Tflops)
+	}
+	node.Status.VirtualAvailableVRAM = &vramAvailable
+	node.Status.VirtualAvailableTFlops = &tflopsAvailable
+
 	node.Status.Phase = tfv1.TensorFusionGPUNodePhaseRunning
 
 	if !equality.Semantic.DeepEqual(node.Status, statusCopy) {

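The added block is the virtual-capacity fix from the commit message: the node's VirtualAvailable* figures start from the virtual totals and subtract the limits of every allocation the allocator still tracks on that node. A runnable sketch of the same arithmetic using k8s.io/apimachinery resource.Quantity, with hypothetical numbers:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// limit is an illustrative allocation limit; the real code reads AllocRequest.Limit.
type limit struct {
	Tflops resource.Quantity
	Vram   resource.Quantity
}

func main() {
	// Virtual totals already include the pool's oversubscription factor.
	virtualTFlops := resource.MustParse("200")
	virtualVRAM := resource.MustParse("64Gi")

	// Active allocations on this node (hypothetical values).
	allocs := []limit{
		{Tflops: resource.MustParse("50"), Vram: resource.MustParse("8Gi")},
		{Tflops: resource.MustParse("30"), Vram: resource.MustParse("16Gi")},
	}

	// Available = virtual total minus the sum of every active allocation's limits,
	// which is what RefreshGPUNodeCapacity now stores in VirtualAvailableTFlops/VRAM.
	tflopsAvailable := virtualTFlops.DeepCopy()
	vramAvailable := virtualVRAM.DeepCopy()
	for _, a := range allocs {
		tflopsAvailable.Sub(a.Tflops)
		vramAvailable.Sub(a.Vram)
	}

	fmt.Println("virtual available TFLOPS:", tflopsAvailable.String()) // 120
	fmt.Println("virtual available VRAM:", vramAvailable.String())     // 40Gi
}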
internal/scheduler/gpuresources/gpuresources.go

Lines changed: 8 additions & 2 deletions
@@ -158,11 +158,17 @@ func (s *GPUFit) PreFilter(ctx context.Context, state fwk.CycleState, pod *v1.Po
 			continue
 		}
 
+		preAllocSize := total - matched
+		if preAllocSize <= 0 {
+			s.logger.Error(nil, "Filtering GPU error, unexpected less than 0", "pod",
+				pod.Name, "node", k, "totalGPU count", total, "matchedGPU count", matched)
+			preAllocSize = 2
+		}
 		// range if it's not in validNodesValidGPUs, add to validNodeNonMatchingGPUs
-		validNodeNonMatchingGPUs[k] = make([]*tfv1.GPU, 0, total-matched)
+		validNodeNonMatchingGPUs[k] = make([]*tfv1.GPU, 0, preAllocSize)
 		for gpuName, gpu := range allGPUs {
 			seen := false
-			// just loop because the number always <= 8
+			// just loop because the number always <= 8/16
 			for _, matchedGPU := range matchedGPUs {
 				if gpuName == matchedGPU.Name {
 					seen = true

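The PreFilter change guards the slice pre-allocation size: if the matched-GPU count ever exceeded the total, the old `make` call would receive a negative capacity and panic at runtime, so the new code logs the inconsistency and falls back to a small default. A standalone sketch of that guard; the fallback value of 2 follows the diff, everything else is illustrative:

package main

import "fmt"

// safeCapacity clamps a computed slice capacity the way the PreFilter change does:
// a non-positive value (which would make `make` panic) is reported and replaced
// with a small default.
func safeCapacity(total, matched int) int {
	capacity := total - matched
	if capacity <= 0 {
		fmt.Printf("unexpected capacity %d (total=%d, matched=%d), falling back to 2\n",
			capacity, total, matched)
		capacity = 2
	}
	return capacity
}

func main() {
	gpus := make([]string, 0, safeCapacity(8, 3)) // normal case: capacity 5
	fmt.Println(cap(gpus))

	gpus = make([]string, 0, safeCapacity(3, 5)) // inconsistent counts: falls back to 2
	fmt.Println(cap(gpus))
}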