Skip to content

Commit a5b8f3d

Browse files
fix: critical logic bugs across plugins - hard rejection filters, metrics, topology checks (#23)
* fix: critical logic bugs across plugins - hard rejection filters, metrics, topology checks

## Critical Logic Fixes

### ResourceFragmentationScore
- Add Filter plugin to hard-reject nodes where GPU or tenant tier requirements aren't met
- Fix pristine island label override (label now properly overrides allocation calculation)
- Add structured logging (klog.InfoS) to Filter for visibility

### Coscheduling
- Fix GangCompletionLatency metric (was always ~0, now uses gang submission timestamp)
- Fix pod group label mismatch between calculateTotalPods and calculateRunningPodsExcluding

### NUMATopology
- Add thread-safety to gangState map with RWMutex
- Fix NUMA fit score formula (inverted - penalized empty nodes, now prefers remaining capacity)

### ResourceReservation
- Fix isGangComplete to skip terminating pods and check PodRunning phase

### VRAMScheduler
- Add 5-second timeout to DRA ResourceSlice queries (prevents scheduler stalls)
- Fix GPU default logic (only defaults to 1 GPU if VRAM explicitly requested)

### GitHub Actions
- Fix security workflow permissions for CodeQL SARIF uploads

* chore: add gitignore rules to prevent root binaries being committed

- Exclude /scheduler and /webhook from root (should use /bin folder)
- Ensures binaries are only built to bin/ directory going forward

* fix: add mutex locking to gangState access in NUMATopology

- Wrap gangState reads with RLock in calculateGangAffinityScore
- Wrap gangState reads/writes with Lock/Unlock in recordGangPlacement
- Fixes unused field linting error by actually using the mutex
1 parent 72a63f2 commit a5b8f3d

File tree

6 files changed

+118
-27
lines changed

6 files changed

+118
-27
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
*.out
77
coverage.txt
88

9-
# Binaries in root directory
9+
# Binaries in root directory (should be in /bin only)
1010
kubenexus-scheduler
1111
kubenexus-webhook
12+
/scheduler
13+
/webhook
1214

1315
# used for the code generators only
1416
/vendor/

pkg/plugins/coscheduling/coscheduling.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,15 @@ func (cs *Coscheduling) Permit(ctx context.Context, state framework.CycleState,
297297
klog.V(3).InfoS("Permit: pod group ready to schedule",
298298
"namespace", namespace, "podGroup", podGroupName, "current", current, "minAvailable", minAvailable)
299299
schedulermetrics.GangSchedulingDecisions.WithLabelValues("success", namespace).Inc()
300-
schedulermetrics.GangCompletionLatency.WithLabelValues(namespace, podGroupName, fmt.Sprintf("%d", minAvailable)).Observe(time.Since(time.Now()).Seconds())
300+
301+
// Record gang completion latency from initial submission time
302+
key := utils.GetPodGroupKey(namespace, podGroupName)
303+
if pgInfoVal, ok := cs.podGroupInfos.Load(key); ok {
304+
if pgInfo, ok := pgInfoVal.(*PodGroupInfo); ok && pgInfo.timestamp.Unix() > 0 {
305+
age := time.Since(pgInfo.timestamp)
306+
schedulermetrics.GangCompletionLatency.WithLabelValues(namespace, podGroupName, fmt.Sprintf("%d", minAvailable)).Observe(age.Seconds())
307+
}
308+
}
301309

302310
// Safely call IterateOverWaitingPods with recovery for test frameworks
303311
if cs.frameworkHandle != nil {
@@ -367,11 +375,17 @@ func (cs *Coscheduling) calculateTotalPods(podGroupName, namespace string) int {
367375
}
368376

369377
func (cs *Coscheduling) calculateRunningPodsExcluding(podGroupName, namespace string, excludeName string) int {
370-
selector := labels.Set{PodGroupName: podGroupName}.AsSelector()
378+
// Try new label first (match calculateTotalPods logic)
379+
selector := labels.Set{"pod-group.scheduling.kubenexus.io/name": podGroupName}.AsSelector()
371380
pods, err := cs.podLister.Pods(namespace).List(selector)
372-
if err != nil {
373-
klog.ErrorS(err, "calculateRunningPods: error listing pods")
374-
return 0
381+
if err != nil || len(pods) == 0 {
382+
// Fallback to old label for backward compatibility
383+
selector = labels.Set{PodGroupName: podGroupName}.AsSelector()
384+
pods, err = cs.podLister.Pods(namespace).List(selector)
385+
if err != nil {
386+
klog.ErrorS(err, "calculateRunningPods: error listing pods")
387+
return 0
388+
}
375389
}
376390

377391
running := 0

pkg/plugins/numatopology/numatopology.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math"
2424
"strconv"
2525
"strings"
26+
"sync"
2627

2728
v1 "k8s.io/api/core/v1"
2829
"k8s.io/apimachinery/pkg/runtime"
@@ -156,6 +157,7 @@ type GangNUMAState struct {
156157
// NUMATopology implements NUMA-aware scheduling with advanced features
157158
type NUMATopology struct {
158159
handle framework.Handle
160+
mu sync.RWMutex // Protect gangState from concurrent access
159161
gangState map[string]*GangNUMAState // Gang group -> state
160162
}
161163

@@ -300,18 +302,23 @@ func (n *NUMATopology) Score(ctx context.Context, state framework.CycleState, po
300302
}
301303

302304
// 1. NUMA FIT QUALITY (40%)
303-
cpuUtilization := float64(podCPU) / float64(len(numa.CPUs)) * 100.0
304-
memUtilization := float64(podMemory) / float64(numa.TotalMemory) * 100.0
305-
306-
// Weighted average: 60% CPU, 40% memory
307-
utilization := (cpuUtilization * 0.6) + (memUtilization * 0.4)
308-
309-
// Optimal utilization: 50-70% (leaves room for growth, not too fragmented)
310-
fitScore = 100.0 - math.Abs(utilization-60.0)
311-
if fitScore < 0 {
312-
fitScore = 0
305+
// Calculate remaining capacity after placing pod
306+
cpuRemaining := float64(numa.AvailableCPUs - int(podCPU))
307+
memRemaining := float64(numa.AvailableMemory - podMemory)
308+
309+
// Normalize to 0-1 range (higher is better - more room for growth)
310+
cpuFitScore := cpuRemaining / float64(len(numa.CPUs))
311+
if cpuFitScore < 0 {
312+
cpuFitScore = 0
313+
}
314+
memFitScore := memRemaining / float64(numa.TotalMemory)
315+
if memFitScore < 0 {
316+
memFitScore = 0
313317
}
314318

319+
// Weighted average: 60% CPU, 40% memory (prefer CPU locality)
320+
fitScore = (cpuFitScore*0.6 + memFitScore*0.4) * 100.0
321+
315322
// Boost if in preferred NUMA list
316323
if n.isNUMAInList(numa.ID, preferredNUMAs) {
317324
fitScore = math.Min(100.0, fitScore*1.2) // 20% boost
@@ -688,8 +695,11 @@ func (n *NUMATopology) calculateGangAffinityScore(pod *v1.Pod, numa NUMANode, no
688695
return 50.0 // Neutral score, not a gang member
689696
}
690697

691-
// Get gang state
698+
// Get gang state (with lock)
699+
n.mu.RLock()
692700
gangState, exists := n.gangState[gangGroup]
701+
n.mu.RUnlock()
702+
693703
if !exists || len(gangState.AssignedMembers) == 0 {
694704
return 50.0 // First gang member, neutral score
695705
}
@@ -753,6 +763,10 @@ func (n *NUMATopology) recordGangPlacement(pod *v1.Pod, numaID int, node *v1.Nod
753763
return
754764
}
755765

766+
// Lock for gang state access
767+
n.mu.Lock()
768+
defer n.mu.Unlock()
769+
756770
// Initialize gang state if needed
757771
if n.gangState == nil {
758772
n.gangState = make(map[string]*GangNUMAState)

pkg/plugins/resourcefragmentation/fragmentationscore.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package resourcefragmentation
2121

2222
import (
2323
"context"
24+
"fmt"
2425
"strconv"
2526

2627
v1 "k8s.io/api/core/v1"
@@ -66,6 +67,7 @@ type ResourceFragmentationScore struct {
6667
}
6768

6869
var _ framework.ScorePlugin = &ResourceFragmentationScore{}
70+
var _ framework.FilterPlugin = &ResourceFragmentationScore{}
6971

7072
type GPUIsland struct {
7173
NodeName string
@@ -83,6 +85,49 @@ func (rf *ResourceFragmentationScore) Name() string {
8385
return Name
8486
}
8587

88+
// Filter filters out nodes that don't have sufficient GPUs or violate tenant restrictions
89+
func (rf *ResourceFragmentationScore) Filter(ctx context.Context, state framework.CycleState, pod *v1.Pod, nodeInfo framework.NodeInfo) *framework.Status {
90+
requestedGPUs := getGPURequest(pod)
91+
if requestedGPUs == 0 {
92+
// No GPU request, allow node
93+
return framework.NewStatus(framework.Success)
94+
}
95+
96+
island := rf.detectGPUIsland(nodeInfo)
97+
if island == nil {
98+
// No GPU island detected, filter out
99+
return framework.NewStatus(framework.Unschedulable, "node has no GPU resources")
100+
}
101+
102+
// Check if node has sufficient GPUs
103+
if island.AvailableGPUs < requestedGPUs {
104+
klog.V(3).InfoS("Filter: insufficient GPUs on node",
105+
"pod", klog.KObj(pod),
106+
"node", nodeInfo.Node().Name,
107+
"requested", requestedGPUs,
108+
"available", island.AvailableGPUs)
109+
return framework.NewStatus(framework.Unschedulable,
110+
fmt.Sprintf("insufficient GPUs: need %d, available %d", requestedGPUs, island.AvailableGPUs))
111+
}
112+
113+
// TENANT-AWARE ISLAND PROTECTION: Hard reject if tenant mismatch
114+
podTenantTier := rf.getPodTenantTier(state, pod)
115+
if island.TenantTier != "" && island.TenantTier != "none" {
116+
if !rf.isTenantAllowed(podTenantTier, island.TenantTier) {
117+
klog.V(3).InfoS("Filter: rejecting node due to tenant tier mismatch",
118+
"pod", klog.KObj(pod),
119+
"node", nodeInfo.Node().Name,
120+
"podTenantTier", podTenantTier,
121+
"nodeTenantTier", island.TenantTier)
122+
return framework.NewStatus(framework.Unschedulable,
123+
fmt.Sprintf("pod tenant %s cannot use node reserved for tenant %s",
124+
podTenantTier, island.TenantTier))
125+
}
126+
}
127+
128+
return framework.NewStatus(framework.Success)
129+
}
130+
86131
func (rf *ResourceFragmentationScore) Score(ctx context.Context, state framework.CycleState, pod *v1.Pod, nodeInfo framework.NodeInfo) (int64, *framework.Status) {
87132
island := rf.detectGPUIsland(nodeInfo)
88133
if island == nil {
@@ -237,8 +282,9 @@ func (rf *ResourceFragmentationScore) detectGPUIsland(nodeInfo framework.NodeInf
237282
}
238283

239284
isPristine := allocatedGPUCount == 0
240-
if val, ok := node.Labels[LabelGPUIsPristine]; ok && val == "true" {
241-
isPristine = true
285+
// Label explicitly overrides allocation count
286+
if val, ok := node.Labels[LabelGPUIsPristine]; ok {
287+
isPristine = val == "true"
242288
}
243289

244290
// Check if node is reserved for a specific tenant tier

pkg/plugins/resourcereservation/resourcereservation.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,13 @@ func (rr *ResourceReservation) isGangComplete(pod *v1.Pod, podGroupName string,
431431
continue
432432
}
433433

434-
// Count pods that are scheduled (have NodeName assigned)
435-
if p.Spec.NodeName != "" {
434+
// Skip terminating pods (don't count them as scheduled)
435+
if p.DeletionTimestamp != nil {
436+
continue
437+
}
438+
439+
// Count pods that are scheduled (have NodeName assigned) and running
440+
if p.Spec.NodeName != "" && p.Status.Phase == v1.PodRunning {
436441
scheduledCount++
437442
}
438443
}

pkg/plugins/vramscheduler/vramscheduler.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,9 @@ func (v *VRAMScheduler) Score(ctx context.Context, state framework.CycleState, p
243243

244244
// Calculate how many GPUs the pod needs based on GPU request
245245
gpusRequested := getGPURequest(pod)
246-
if gpusRequested == 0 {
247-
gpusRequested = 1 // Default to 1 GPU if not specified
246+
if gpusRequested == 0 && vramRequest > 0 {
247+
// Only default to 1 GPU if VRAM was explicitly requested
248+
gpusRequested = 1
248249
}
249250

250251
// Calculate total VRAM needed and available
@@ -702,7 +703,11 @@ func (v *VRAMScheduler) getNodeGPUTopology(ctx context.Context, node *v1.Node) (
702703
// PRIORITY 1: DRA ResourceSlices (Kubernetes 1.34+)
703704
// Provides: Full topology (VRAM, NUMA, PCIe, NVLink), dynamic updates
704705
if v.resourceSliceLister != nil {
705-
vramPerGPU, devices, err := v.getGPUTopologyFromDRA(ctx, node)
706+
// Add 5-second timeout to DRA queries to prevent scheduler stalls
707+
draCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
708+
vramPerGPU, devices, err := v.getGPUTopologyFromDRA(draCtx, node)
709+
cancel()
710+
706711
if err == nil && len(devices) > 0 {
707712
klog.V(4).InfoS("✅ Using GPU topology from DRA ResourceSlices",
708713
"node", node.Name,
@@ -712,9 +717,14 @@ func (v *VRAMScheduler) getNodeGPUTopology(ctx context.Context, node *v1.Node) (
712717
schedulermetrics.DataSourceUsage.WithLabelValues("DRA").Inc()
713718
return vramPerGPU, devices
714719
}
715-
klog.V(5).InfoS("DRA ResourceSlices not available, trying NFD labels",
716-
"node", node.Name,
717-
"reason", err)
720+
if err == context.DeadlineExceeded {
721+
klog.V(3).InfoS("DRA query timeout, falling back to NFD",
722+
"node", node.Name)
723+
} else {
724+
klog.V(5).InfoS("DRA ResourceSlices not available, trying NFD labels",
725+
"node", node.Name,
726+
"reason", err)
727+
}
718728
} else {
719729
klog.V(5).InfoS("DRA not available (lister nil), trying NFD labels",
720730
"node", node.Name)

0 commit comments

Comments (0)