
Commit f4d2279

fix: enhance GPU allocation logic and QoS level calculation
- Added nil checks for NodeManagerConfig and NodeSelector in getMatchedPoolName to prevent potential nil pointer dereferences.
- Improved GPU allocation methods in GpuAllocator by ensuring thread safety with appropriate locking mechanisms.
- Enhanced QoS level calculation to ensure that limits and requests are not zero before determining high QoS, aligning with Kubernetes behavior.
1 parent 250953d

3 files changed: +80 -20 lines

internal/controller/node_controller.go

Lines changed: 3 additions & 0 deletions
@@ -323,6 +323,9 @@ func (r *NodeReconciler) removeTensorFusionTaint(ctx context.Context, node *core
 
 func getMatchedPoolName(node *corev1.Node, poolList []tfv1.GPUPool) (*tfv1.GPUPool, bool, error) {
     for _, pool := range poolList {
+        if pool.Spec.NodeManagerConfig == nil || pool.Spec.NodeManagerConfig.NodeSelector == nil {
+            continue
+        }
         matches, err := schedulingcorev1.MatchNodeSelectorTerms(node, pool.Spec.NodeManagerConfig.NodeSelector)
         if err != nil {
             return nil, false, err
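The guard added above follows the usual Go pattern for optional nested config: check every pointer on the path before dereferencing, and skip pools that simply do not configure a node selector. A minimal, self-contained sketch of the same hazard; the types here are simplified stand-ins, not the project's real CRD structs:

package main

import "fmt"

// Simplified stand-ins for the real pool spec types.
type NodeSelector struct{ MatchLabels map[string]string }
type NodeManagerConfig struct{ NodeSelector *NodeSelector }
type GPUPoolSpec struct{ NodeManagerConfig *NodeManagerConfig }
type GPUPool struct{ Spec GPUPoolSpec }

func matchedPools(pools []GPUPool) int {
	matched := 0
	for _, pool := range pools {
		// Without this guard, pool.Spec.NodeManagerConfig.NodeSelector panics
		// with a nil pointer dereference when NodeManagerConfig is unset.
		if pool.Spec.NodeManagerConfig == nil || pool.Spec.NodeManagerConfig.NodeSelector == nil {
			continue
		}
		matched++
	}
	return matched
}

func main() {
	pools := []GPUPool{
		{}, // NodeManagerConfig omitted in the pool spec
		{Spec: GPUPoolSpec{NodeManagerConfig: &NodeManagerConfig{NodeSelector: &NodeSelector{}}}},
	}
	fmt.Println(matchedPools(pools)) // 1
}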

internal/gpuallocator/gpuallocator.go

Lines changed: 74 additions & 18 deletions
@@ -4,6 +4,7 @@ package gpuallocator
 import (
     "context"
     "fmt"
+    "maps"
     "math"
     "slices"
     "sort"
@@ -84,7 +85,7 @@ type GpuAllocator struct {
     poolGpuStore    map[string]map[string]*tfv1.GPU
     nodeWorkerStore map[string]map[types.NamespacedName]struct{}
     storeMutex      sync.RWMutex
-    allocateMutex   sync.Mutex
+    allocateMutex   sync.Mutex // serializes legacy Alloc() calls (CheckQuotaAndFilter + Bind)
     syncInterval    time.Duration
     cancel          context.CancelFunc
     ctx             context.Context
@@ -153,11 +154,24 @@ func (s *GpuAllocator) GetAllocationInfo() (
     nodeWorkerStore map[string]map[types.NamespacedName]struct{},
     uniqueAllocation map[string]*tfv1.AllocRequest,
 ) {
+    s.storeMutex.RLock()
+    defer s.storeMutex.RUnlock()
     return s.gpuStore, s.nodeWorkerStore, s.uniqueAllocation
 }
 
+// GetNodeGpuStore returns a snapshot (shallow copy) of the node-to-GPU map.
+// The caller can safely iterate the returned maps without holding any lock.
+// The *tfv1.GPU pointers are shared with the allocator's internal state.
 func (s *GpuAllocator) GetNodeGpuStore() map[string]map[string]*tfv1.GPU {
-    return s.nodeGpuStore
+    s.storeMutex.RLock()
+    defer s.storeMutex.RUnlock()
+    result := make(map[string]map[string]*tfv1.GPU, len(s.nodeGpuStore))
+    for nodeName, gpuMap := range s.nodeGpuStore {
+        innerCopy := make(map[string]*tfv1.GPU, len(gpuMap))
+        maps.Copy(innerCopy, gpuMap)
+        result[nodeName] = innerCopy
+    }
+    return result
 }
 
 // AllocRequest encapsulates all parameters needed for GPU allocation
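The snapshot uses maps.Copy from Go's standard maps package (Go 1.21+) to duplicate each inner map while the read lock is held, so callers can range over the result after the lock is released. A rough, stand-alone illustration of the same idea, using a plain string value type instead of *tfv1.GPU:

package main

import (
	"fmt"
	"maps"
	"sync"
)

type store struct {
	mu   sync.RWMutex
	data map[string]map[string]string // node -> gpu -> phase (simplified)
}

// Snapshot returns a shallow copy that is safe to iterate without the lock.
func (s *store) Snapshot() map[string]map[string]string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]map[string]string, len(s.data))
	for node, gpus := range s.data {
		inner := make(map[string]string, len(gpus))
		maps.Copy(inner, gpus) // copies the inner map's entries
		out[node] = inner
	}
	return out
}

func main() {
	s := &store{data: map[string]map[string]string{
		"node-a": {"gpu-0": "Running"},
	}}
	snap := s.Snapshot()
	s.mu.Lock()
	s.data["node-a"]["gpu-1"] = "Pending" // later mutation does not affect snap
	s.mu.Unlock()
	fmt.Println(len(snap["node-a"])) // 1
}

Because only the maps are copied, the GPU objects themselves stay shared, as the new doc comment notes; callers should treat the pointed-to values as read-only.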
@@ -404,13 +418,13 @@ func (s *GpuAllocator) Bind(
         return nil, fmt.Errorf("no GPUs provided to bind")
     }
 
+    s.storeMutex.Lock()
+    defer s.storeMutex.Unlock()
+
     if _, exists := s.uniqueAllocation[string(req.PodMeta.UID)]; exists {
         return nil, fmt.Errorf("pod %s has already allocated GPUs", req.PodMeta.UID)
     }
 
-    s.storeMutex.Lock()
-    defer s.storeMutex.Unlock()
-
     // Proceed with GPU allocation
     gpuNodeName := ""
     for _, selectedGPU := range gpuNames {
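Moving the Lock above the uniqueAllocation lookup turns the duplicate-binding check and the store mutation into a single critical section; with the old ordering, two concurrent Bind calls for the same pod could both pass the check before either took the lock. A hedged sketch of that check-then-act pattern with a simplified allocator type, not the real Bind signature:

package main

import (
	"fmt"
	"sync"
)

type allocator struct {
	mu    sync.Mutex
	bound map[string]bool // podUID -> already bound
}

// bindOnce succeeds exactly once per pod, even under concurrent calls,
// because the existence check and the write happen under the same lock.
func (a *allocator) bindOnce(podUID string) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.bound[podUID] {
		return fmt.Errorf("pod %s has already allocated GPUs", podUID)
	}
	a.bound[podUID] = true
	return nil
}

func main() {
	a := &allocator{bound: map[string]bool{}}
	var wg sync.WaitGroup
	errs := make(chan error, 2)
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			errs <- a.bindOnce("pod-123")
		}()
	}
	wg.Wait()
	close(errs)
	failures := 0
	for err := range errs {
		if err != nil {
			failures++
		}
	}
	fmt.Println(failures) // exactly 1 of the 2 calls fails
}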
@@ -544,6 +558,7 @@ func (s *GpuAllocator) CheckQuotaAndFilter(ctx context.Context, req *tfv1.AllocR
     }
 
     if s.maxWorkerPerNode > 0 {
+        s.storeMutex.RLock()
         // First pass: check if any filtering is needed
         needsFiltering := false
         for _, gpu := range filteredGPUs {
@@ -565,6 +580,7 @@ func (s *GpuAllocator) CheckQuotaAndFilter(ctx context.Context, req *tfv1.AllocR
             }
             filteredGPUs = finalFilteredGPUs
         }
+        s.storeMutex.RUnlock()
     }
 
     return filteredGPUs, filterDetails, nil
@@ -601,6 +617,9 @@ func (s *GpuAllocator) Dealloc(
     podUID := string(podMeta.UID)
     log := log.FromContext(s.ctx)
 
+    s.storeMutex.Lock()
+    defer s.storeMutex.Unlock()
+
     request, exists := s.uniqueAllocation[podUID]
     if !exists || request == nil {
         // should not block finalizer
@@ -615,9 +634,6 @@ func (s *GpuAllocator) Dealloc(
         return
     }
 
-    s.storeMutex.Lock()
-    defer s.storeMutex.Unlock()
-
     nodeName := ""
     for _, gpu := range gpus {
         // Get the GPU from the store
@@ -677,6 +693,9 @@ func (s *GpuAllocator) Dealloc(
 func (s *GpuAllocator) AdjustAllocation(ctx context.Context, adjustRequest tfv1.AdjustRequest, dryRun bool) (tfv1.Resource, tfv1.Resource, error) {
 
     <-s.initializedCh
+    s.storeMutex.Lock()
+    defer s.storeMutex.Unlock()
+
     request, exists := s.uniqueAllocation[adjustRequest.PodUID]
     if !exists || request == nil {
         return tfv1.Resource{}, tfv1.Resource{}, fmt.Errorf("pod %s has not allocated GPUs", adjustRequest.PodUID)
@@ -728,9 +747,6 @@ func (s *GpuAllocator) AdjustAllocation(ctx context.Context, adjustRequest tfv1.
 
     // pre check passed, change GPU request and QuotaStore and markDirty to sync to Kubernetes
     if !dryRun {
-        s.storeMutex.Lock()
-        defer s.storeMutex.Unlock()
-
         for _, gpuName := range request.GPUNames {
             gpuNameNs := types.NamespacedName{Name: gpuName}
             gpu := s.gpuStore[gpuNameNs]
@@ -769,6 +785,9 @@ func (s *GpuAllocator) AdjustAllocation(ctx context.Context, adjustRequest tfv1.
 
 func (s *GpuAllocator) ListNonUsingNodes() sets.Set[string] {
     <-s.initializedCh
+    s.storeMutex.RLock()
+    defer s.storeMutex.RUnlock()
+
     set := sets.New[string]()
     for nodeName, podNames := range s.nodeWorkerStore {
         // If using by TF, the node can not be used by original scheduler
@@ -782,13 +801,21 @@ func (s *GpuAllocator) ListNonUsingNodes() sets.Set[string] {
 }
 
 func (s *GpuAllocator) DeallocByPodIdentifier(ctx context.Context, podIdentifier types.NamespacedName) {
+    // Read allocation info under RLock to avoid data race on maps
+    s.storeMutex.RLock()
     podUID := s.podNamespaceNsToPodUID[podIdentifier.String()]
-    if request, exists := s.uniqueAllocation[podUID]; exists {
+    request, exists := s.uniqueAllocation[podUID]
+    s.storeMutex.RUnlock()
+
+    if exists && request != nil {
         s.Dealloc(request.WorkloadNameNamespace, request.GPUNames, request.PodMeta)
     }
 }
 
 func (s *GpuAllocator) GetAllocationReqByNodeName(nodeName string) []*tfv1.AllocRequest {
+    s.storeMutex.RLock()
+    defer s.storeMutex.RUnlock()
+
     allocRequests := make([]*tfv1.AllocRequest, 0, 8)
     workers, exists := s.nodeWorkerStore[nodeName]
     if !exists || workers == nil {
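The RUnlock before s.Dealloc matters because Go's sync.RWMutex is not reentrant: Dealloc now takes the write lock itself, so calling it while still holding the read lock would deadlock. A rough sketch of the lookup-then-release-then-act shape, with simplified stand-in types rather than the allocator's real ones:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.RWMutex
	byUID map[string]string // podUID -> allocation (simplified)
}

// remove takes the write lock, like Dealloc in the real allocator.
func (r *registry) remove(uid string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.byUID, uid)
}

// removeByIdentifier looks up under RLock, releases, then mutates.
// Holding the RLock across remove() would deadlock on the same mutex.
func (r *registry) removeByIdentifier(uid string) {
	r.mu.RLock()
	_, exists := r.byUID[uid]
	r.mu.RUnlock()

	if exists {
		r.remove(uid)
	}
}

func main() {
	r := &registry{byUID: map[string]string{"uid-1": "gpu-0"}}
	r.removeByIdentifier("uid-1")
	fmt.Println(len(r.byUID)) // 0
}

Releasing the read lock before acting does allow the entry to change in between, but that is harmless here because deallocation is idempotent and re-checked under the write lock.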
@@ -1409,6 +1436,9 @@ func (s *GpuAllocator) CheckQuotaAndFilterSingleNodePreempt(
 ) error {
     <-s.initializedCh
 
+    // Read allocation maps under RLock to avoid data race
+    s.storeMutex.RLock()
+
     log.FromContext(s.ctx).V(4).Info("[PREEMPT] CheckQuotaAndFilterSingleNodePreempt start",
         "node", nodeName,
         "requiredGPUs", allocReq.Count,
@@ -1497,6 +1527,9 @@ func (s *GpuAllocator) CheckQuotaAndFilterSingleNodePreempt(
         log.FromContext(s.ctx).V(5).Info("Preempting node and check quotas", "nodeName", nodeName, "toPreemptUsage", toPreemptUsage)
     }
 
+    // Release RLock before calling FilterWithPreempt which acquires its own locks
+    s.storeMutex.RUnlock()
+
     if err := s.quotaStore.CheckTotalQuotaRelaxed(allocReq, toPreemptUsage); err != nil {
         return fmt.Errorf("quota check failed during preempt: %w", err)
     }
@@ -1553,7 +1586,11 @@ func (s *GpuAllocator) reconcileAllocationState() {
         deletedAndDeAllocated := !worker.DeletionTimestamp.IsZero() &&
             !controllerutil.ContainsFinalizer(&worker, constants.Finalizer)
 
-        if scheduled {
+        // Only register active pods in uniqueAllocation.
+        // Pods that are deletedAndDeAllocated must NOT be registered: their resources
+        // are not counted in Available, so a late Dealloc would add resources back
+        // that were never subtracted, causing Available to exceed correct state.
+        if scheduled && !deletedAndDeAllocated {
             allocRequest, msg, err := s.ComposeAllocationRequest(&worker)
             if err != nil {
                 logger.Error(err, "Failed to compose allocation request for existing worker Pod, annotation may not be valid", "pod", worker.Name, "msg", msg)
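The new comment encodes an accounting invariant: a pod's resources are only ever subtracted from Available while it is registered, so deallocating a pod that was never counted inflates Available. A toy arithmetic sketch of that failure mode, with plain integers standing in for the resource accounting:

package main

import "fmt"

func main() {
	const capacity = 100
	available := capacity

	// Pod A is registered and counted: allocate 40.
	available -= 40

	// Pod B was already deleted and de-allocated before the reconcile,
	// so its 30 units were never subtracted from available.
	// If reconcile registered it anyway, a late Dealloc would do:
	available += 30 // adds back resources that were never taken

	// Pod A terminates normally; its Dealloc is correct.
	available += 40

	fmt.Println(available > capacity) // true: available now exceeds capacity
}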
@@ -1642,21 +1679,40 @@ func (s *GpuAllocator) startWorkerCleanUpChecker() {
     for {
         select {
         case <-ticker.C:
-            cleaned := 0
+            // Collect candidates under RLock to avoid data race on map iteration
+            type deallocCandidate struct {
+                workloadNameNs tfv1.NameNamespace
+                gpuNames       []string
+                podMeta        metav1.ObjectMeta
+            }
+            var candidates []deallocCandidate
+
+            s.storeMutex.RLock()
             for _, allocRequest := range s.uniqueAllocation {
                 if allocRequest.PodMeta.Name == "" {
                     continue
                 }
+                candidates = append(candidates, deallocCandidate{
+                    workloadNameNs: allocRequest.WorkloadNameNamespace,
+                    gpuNames:       allocRequest.GPUNames,
+                    podMeta:        allocRequest.PodMeta,
+                })
+            }
+            totalWorkers := len(s.uniqueAllocation)
+            s.storeMutex.RUnlock()
+
+            cleaned := 0
+            for _, c := range candidates {
                 pod := &v1.Pod{}
-                err := s.Get(s.ctx, types.NamespacedName{Namespace: allocRequest.PodMeta.Namespace, Name: allocRequest.PodMeta.Name}, pod)
+                err := s.Get(s.ctx, types.NamespacedName{Namespace: c.podMeta.Namespace, Name: c.podMeta.Name}, pod)
                 if errors.IsNotFound(err) {
-                    log.FromContext(s.ctx).Info("Pod has been deleted, deallocate GPU", "pod", allocRequest.PodMeta.Name, "namespace", allocRequest.PodMeta.Namespace)
-                    s.Dealloc(allocRequest.WorkloadNameNamespace, allocRequest.GPUNames, allocRequest.PodMeta)
+                    log.FromContext(s.ctx).Info("Pod has been deleted, deallocate GPU", "pod", c.podMeta.Name, "namespace", c.podMeta.Namespace)
+                    s.Dealloc(c.workloadNameNs, c.gpuNames, c.podMeta)
                     cleaned++
                 }
             }
             log.FromContext(s.ctx).Info("GPU allocation cleaned up check completed", "total workers",
-                len(s.uniqueAllocation), "backup cleaner cleaned", cleaned)
+                totalWorkers, "backup cleaner cleaned", cleaned)
         case <-s.ctx.Done():
             return
         }
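The cleanup loop now copies the fields it needs into a local slice under the read lock and only then performs the apiserver lookups and Dealloc calls, so slow network calls never hold the store lock and Dealloc is free to take the write lock. A small sketch of that collect-then-act shape; the hypothetical exists callback stands in for the client Get plus IsNotFound check:

package main

import (
	"fmt"
	"sync"
)

type candidate struct{ name, namespace string }

type cleaner struct {
	mu    sync.RWMutex
	alloc map[string]candidate // podUID -> metadata (simplified)
}

// sweep snapshots candidates under RLock, then checks and cleans outside the lock.
func (c *cleaner) sweep(exists func(candidate) bool) int {
	c.mu.RLock()
	uids := make([]string, 0, len(c.alloc))
	candidates := make([]candidate, 0, len(c.alloc))
	for uid, cand := range c.alloc {
		uids = append(uids, uid)
		candidates = append(candidates, cand)
	}
	c.mu.RUnlock()

	cleaned := 0
	for i, cand := range candidates {
		if !exists(cand) { // stands in for the apiserver Get + IsNotFound check
			c.mu.Lock() // stands in for Dealloc taking the write lock
			delete(c.alloc, uids[i])
			c.mu.Unlock()
			cleaned++
		}
	}
	return cleaned
}

func main() {
	c := &cleaner{alloc: map[string]candidate{
		"uid-1": {name: "worker-1", namespace: "default"},
		"uid-2": {name: "worker-2", namespace: "default"},
	}}
	gone := map[string]bool{"worker-2": true}
	fmt.Println(c.sweep(func(cand candidate) bool { return !gone[cand.name] })) // 1
}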

internal/webhook/v1/pod_webhook.go

Lines changed: 3 additions & 2 deletions
@@ -740,9 +740,10 @@ func calculateQoSLevel(profile *tfv1.WorkloadProfileSpec, pool *tfv1.GPUPool) tf
     // when not set, assign default QoS
     if profile.Qos == "" {
         sameReqLimits := profile.Resources.Limits.Tflops.Cmp(profile.Resources.Requests.Tflops) == 0 &&
-            profile.Resources.Limits.Vram.Cmp(profile.Resources.Requests.Vram) == 0
+            profile.Resources.Limits.Vram.Cmp(profile.Resources.Requests.Vram) == 0 &&
+            !profile.Resources.Requests.Tflops.IsZero() && !profile.Resources.Requests.Vram.IsZero()
 
-        // set to high if req == limits, same logic as Kubernetes QoS
+        // set to high if req == limits and both are not zero, same logic as Kubernetes QoS
         // critical QoS can preempt other pods, have to be set manually
         if sameReqLimits {
             return constants.QoSLevelHigh
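The extra IsZero checks mirror the Kubernetes Guaranteed QoS rule that the diff comment refers to: requests must equal limits and both must actually be set, so a profile with zero TFLOPS and VRAM no longer defaults to high QoS. A hedged sketch of the same predicate, assuming the Tflops/Vram fields are k8s.io/apimachinery resource.Quantity values (which the Cmp/IsZero calls suggest); the struct below is simplified and not the project's WorkloadProfileSpec:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

type resources struct {
	requestTflops, limitTflops resource.Quantity
	requestVram, limitVram     resource.Quantity
}

// highQoS returns true only when requests equal limits and neither is zero,
// analogous to the Guaranteed class in Kubernetes QoS.
func highQoS(r resources) bool {
	return r.limitTflops.Cmp(r.requestTflops) == 0 &&
		r.limitVram.Cmp(r.requestVram) == 0 &&
		!r.requestTflops.IsZero() && !r.requestVram.IsZero()
}

func main() {
	empty := resources{} // all quantities zero: equal, but must not count as "high"
	set := resources{
		requestTflops: resource.MustParse("10"),
		limitTflops:   resource.MustParse("10"),
		requestVram:   resource.MustParse("16Gi"),
		limitVram:     resource.MustParse("16Gi"),
	}
	fmt.Println(highQoS(empty), highQoS(set)) // false true
}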
