
Commit 82fc9c8

fix: optimize scheduler event (#299)
* fix: improve GPU resource cleanup and node compaction logic
* fix: add fallback loop to clean GPU resources
* fix: tmp test
* fix: add pod event for gpu scheduler
* fix: optimize scheduler event
* fix: ut typo
1 parent 978546f commit 82fc9c8

4 files changed (+35 -7 lines)


internal/controller/gpupool_compaction_controller.go

Lines changed: 15 additions & 3 deletions
@@ -29,6 +29,8 @@ type GPUPoolCompactionReconciler struct {
     Recorder record.EventRecorder

     Allocator *gpuallocator.GpuAllocator
+
+    markDeletionNodes map[string]struct{}
 }

 var defaultCompactionDuration = 1 * time.Minute
@@ -71,9 +73,16 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p

     for _, gpu := range gpuStore {
         if !gpu.DeletionTimestamp.IsZero() || gpu.Labels[constants.GpuPoolKey] != pool.Name ||
-            gpu.Status.UsedBy != tfv1.UsedByTensorFusion {
+            gpu.Status.UsedBy != tfv1.UsedByTensorFusion || len(gpu.Status.NodeSelector) == 0 {
             continue
         }
+
+        k8sNodeName := gpu.Status.NodeSelector[constants.KubernetesHostNameLabel]
+        if _, ok := r.markDeletionNodes[k8sNodeName]; ok {
+            log.V(4).Info("skip node already marked for deletion when calculation capacity", "node", k8sNodeName)
+            continue
+        }
+
         availableTFlops, _ := gpu.Status.Available.Tflops.AsInt64()
         poolAvailableTFlops += availableTFlops
         availableVRAM, _ := gpu.Status.Available.Vram.AsInt64()
@@ -153,6 +162,7 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p
             poolAvailableVRAM -= nodeCapVRAM
             poolTotalTFlops -= nodeCapTFlops
             poolTotalVRAM -= nodeCapVRAM
+            r.markDeletionNodes[k8sNodeName] = struct{}{}

             log.Info("Empty node can be compacted - provision mode", "node", gpuNode.Name,
                 "availableTFlopsAfterCompact", poolAvailableTFlops,
@@ -185,6 +195,7 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p
             poolAvailableVRAM -= nodeCapVRAM
             poolTotalTFlops -= nodeCapTFlops
             poolTotalVRAM -= nodeCapVRAM
+            r.markDeletionNodes[k8sNodeName] = struct{}{}

             log.Info("Empty node can be compacted - auto-select mode", "node", gpuNode.Name,
                 "availableTFlopsAfterCompact", poolAvailableTFlops,
@@ -227,7 +238,7 @@ func (r *GPUPoolCompactionReconciler) checkNodeCompaction(ctx context.Context, p
 func (r *GPUPoolCompactionReconciler) getCompactionDuration(ctx context.Context, config *tfv1.NodeManagerConfig) time.Duration {
     log := log.FromContext(ctx)
     if config == nil || config.NodeCompaction == nil || config.NodeCompaction.Period == "" {
-        log.Info("empty node compaction config, use default value", "duration", defaultCompactionDuration)
+        log.V(4).Info("empty node compaction config, use default value", "duration", defaultCompactionDuration)
         return defaultCompactionDuration
     }
     duration, err := time.ParseDuration(config.NodeCompaction.Period)
@@ -307,6 +318,7 @@ func (r *GPUPoolCompactionReconciler) Reconcile(ctx context.Context, req ctrl.Re

 // SetupWithManager sets up the controller with the Manager.
 func (r *GPUPoolCompactionReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    r.markDeletionNodes = make(map[string]struct{})
     return ctrl.NewControllerManagedBy(mgr).
         Named("gpupool-compaction").
         WatchesMetadata(&tfv1.GPUPool{}, &handler.EnqueueRequestForObject{}).
@@ -315,5 +327,5 @@ func (r *GPUPoolCompactionReconciler) SetupWithManager(mgr ctrl.Manager) error {

 func SetTestModeCompactionPeriod() {
     defaultCompactionDuration = 700 * time.Millisecond
-    newNodeProtectionDuration = 1200 * time.Millisecond
+    newNodeProtectionDuration = 1000 * time.Millisecond
 }
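
The controller change boils down to a small in-memory set: once a node has been picked for compaction, later reconciles skip its GPUs when summing the pool's spare capacity, so the same headroom is not counted twice while the node's deletion is still in flight. A minimal, self-contained Go sketch of that idea (the gpuInfo type and poolAvailableTflops helper are illustrative stand-ins, not the project's actual types):

package main

import "fmt"

// gpuInfo is an illustrative stand-in for the GPU status objects the
// compaction controller iterates over; only the fields needed here are modelled.
type gpuInfo struct {
    nodeName        string
    availableTflops int64
}

// poolAvailableTflops sums spare TFLOPS across GPUs, ignoring GPUs that sit on
// nodes already marked for deletion -- the same idea as markDeletionNodes above.
func poolAvailableTflops(gpus []gpuInfo, markedForDeletion map[string]struct{}) int64 {
    var total int64
    for _, g := range gpus {
        if _, ok := markedForDeletion[g.nodeName]; ok {
            // The node is being compacted; counting it again would overstate the
            // pool's headroom and could trigger a second, premature compaction.
            continue
        }
        total += g.availableTflops
    }
    return total
}

func main() {
    gpus := []gpuInfo{
        {nodeName: "node-a", availableTflops: 100},
        {nodeName: "node-b", availableTflops: 100},
    }
    marked := map[string]struct{}{}
    fmt.Println(poolAvailableTflops(gpus, marked)) // 200

    // After node-a is selected for compaction it is recorded in the set,
    // so the next reconcile no longer counts its capacity.
    marked["node-a"] = struct{}{}
    fmt.Println(poolAvailableTflops(gpus, marked)) // 100
}

In the diff itself the set is created in SetupWithManager, so the map is non-nil before the first reconcile runs.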

internal/controller/suite_test.go

Lines changed: 1 addition & 1 deletion
@@ -755,7 +755,7 @@ func (b *TensorFusionEnvBuilder) Build() *TensorFusionEnv {
             gpuNode.Status.TotalVRAM = resource.MustParse(fmt.Sprintf("%dGi", 2000*gpuCount))
             gpuNode.Status.AvailableTFlops = gpuNode.Status.TotalTFlops
             gpuNode.Status.AvailableVRAM = gpuNode.Status.TotalVRAM
-            Expect(k8sClient.Status().Update(ctx, gpuNode)).To(Succeed())
+            g.Expect(k8sClient.Status().Update(ctx, gpuNode)).To(Succeed())
         }).Should(Succeed())
     }
 }
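
The one-character fix above matters for Gomega's retry semantics: inside Eventually(func(g Gomega) { ... }), assertions must go through the passed-in g instance so that a transient failure makes Eventually poll again, rather than failing the spec outright via the package-level Expect. A rough sketch of the pattern outside the project's suite (the newFlaky helper is invented purely for illustration):

package example_test

import (
    "errors"
    "testing"

    . "github.com/onsi/gomega"
)

// newFlaky returns a function that fails the given number of times before
// succeeding, standing in for an API update that needs retries (illustrative).
func newFlaky(failures int) func() error {
    return func() error {
        if failures > 0 {
            failures--
            return errors.New("conflict, try again")
        }
        return nil
    }
}

func TestEventuallyWithGomegaInstance(t *testing.T) {
    g := NewWithT(t)
    update := newFlaky(2)

    // Assertions made through the Gomega instance passed into the polled
    // function are evaluated per attempt, so Eventually keeps retrying until
    // they pass; a bare package-level Expect inside the closure fails the
    // spec immediately instead of being retried.
    g.Eventually(func(g Gomega) {
        g.Expect(update()).To(Succeed())
    }).Should(Succeed())
}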

internal/scheduler/gpuresources/gpuresources.go

Lines changed: 15 additions & 1 deletion
@@ -4,6 +4,7 @@ import (
     "context"
     "encoding/json"
     "sort"
+    "strconv"
     "strings"

     tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
@@ -50,7 +51,7 @@ type GPUSchedulingStateData struct {
     ValidNodeGPUScore map[string]map[string]int

     // In Reserve stage, bind GPUs to pod, update allocator cache
-    // In PreBind stage, fetch final GPUs call Pod patch API to update annotation
+    // In PostBind stage, fetch final GPUs call Pod patch API to update annotation
     FinalGPUs []string
 }

@@ -93,6 +94,9 @@ func (s *GPUFit) PreFilter(ctx context.Context, state *framework.CycleState, pod
     // Handle progressive migration case
     if utils.IsProgressiveMigration() && utils.HasGPUResourceRequest(pod) {
         nodeNames := s.allocator.ListNonUsingNodes()
+        s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeNormal, "ScheduleWithNativeGPU",
+            "Scheduling non-TF workload for progressive migration",
+            "use native GPU resources, available native GPU nodes: "+strconv.Itoa(len(nodeNames)))
         return &framework.PreFilterResult{
             NodeNames: nodeNames,
         }, framework.NewStatus(framework.Success, "progressive migration for native resources claim")
@@ -123,6 +127,8 @@
     }

     if err != nil {
+        s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeWarning, "GPUQuotaOrCapacityNotEnough",
+            "check quota and filter", "TensorFusion schedule failed, no enough resource or quotas: "+err.Error())
         s.logger.Error(err, "failed to check quota and filter", "pod", pod.Name)
         return nil, framework.NewStatus(framework.Unschedulable, err.Error())
     }
@@ -139,6 +145,9 @@
     // assign score based on different strategies
     score := s.allocator.Score(ctx, s.cfg, allocRequest, validNodes)

+    s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeNormal, "PreScheduleDone", "pre filter for TensorFusion workload",
+        "TensorFusion pre schedule done, valid GPU node count: "+strconv.Itoa(nodeNames.Len()))
+
     if s.logger.V(6).Enabled() {
         jsonStr, _ := json.Marshal(validNodes)
         scoreJsonStr, _ := json.Marshal(score)
@@ -298,5 +307,10 @@ func (s *GPUFit) PostBind(ctx context.Context, state *framework.CycleState, pod
     err = s.client.Patch(s.ctx, pod, client.RawPatch(types.JSONPatchType, patch))
     if err != nil {
         s.logger.Error(err, "failed to patch gpu device ids", "pod", pod.Name)
+        s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeWarning, "GPUDeviceAllocatedFailed",
+            "Attach GPU device ID info failed", "Can not add GPU device IDs: "+gpuIDs)
+    } else {
+        s.fh.EventRecorder().Eventf(pod, pod, v1.EventTypeNormal, "GPUDeviceAllocated",
+            "Attach GPU device ID info", "Attach TensorFusion GPU device IDs to Pod: "+gpuIDs)
     }
 }
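
All of the scheduler-side additions share one call shape: the plugin reports scheduling milestones and failures as Kubernetes Events on the Pod through the recorder obtained from the framework handle (s.fh.EventRecorder()), passing the regarding/related objects, an event type, a reason, an action, and a note. A self-contained sketch of that shape, with a toy recorder standing in for the real one (podEventRecorder and printRecorder are illustrative and not part of client-go or this repository):

package main

import (
    "fmt"
    "strconv"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
)

// podEventRecorder mirrors the six-argument Eventf shape used in the diff; the
// real recorder comes from the scheduler framework handle and writes Events to
// the API server rather than to stdout.
type podEventRecorder interface {
    Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}

// printRecorder is a toy implementation that just prints what it is given
// (real recorders also format the note with the trailing args).
type printRecorder struct{}

func (printRecorder) Eventf(regarding, related runtime.Object, eventtype, reason, action, note string, _ ...interface{}) {
    fmt.Printf("%s %s %s: %s\n", eventtype, reason, action, note)
}

// recordPreScheduleDone mimics the call made in PreFilter after scoring.
func recordPreScheduleDone(rec podEventRecorder, pod *v1.Pod, validNodeCount int) {
    rec.Eventf(pod, pod, v1.EventTypeNormal, "PreScheduleDone",
        "pre filter for TensorFusion workload",
        "TensorFusion pre schedule done, valid GPU node count: "+strconv.Itoa(validNodeCount))
}

func main() {
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
    recordPreScheduleDone(printRecorder{}, pod, 3)
}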

internal/scheduler/gpuresources/gpuresources_test.go

Lines changed: 4 additions & 2 deletions
@@ -15,6 +15,7 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/kubernetes/scheme"
+    "k8s.io/client-go/tools/events"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
@@ -236,6 +237,7 @@ func (s *GPUResourcesSuite) SetupTest() {
         s.ctx, registeredPlugins, "",
         frameworkruntime.WithPodNominator(testutil.NewPodNominator(nil)),
         frameworkruntime.WithSnapshotSharedLister(testutil.NewFakeSharedLister(pods, nodes)),
+        frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
     )
     s.NoError(err)
     s.fwk = fwk
@@ -494,8 +496,8 @@ func (s *GPUResourcesSuite) TestReserveAndUnreserve() {
     s.Len(gpu.Status.RunningApps, 1)
 }

-func (s *GPUResourcesSuite) TestPreBind() {
-    log.FromContext(s.ctx).Info("Running TestPreBind")
+func (s *GPUResourcesSuite) TestPostBind() {
+    log.FromContext(s.ctx).Info("Running TestPostBind")
     state := framework.NewCycleState()
     pod := s.makePod("p1",
         map[string]string{
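
The test setup gains an event recorder because the plugin now calls EventRecorder() during PreFilter and PostBind. Assuming client-go's events.FakeRecorder keeps its documented shape (a plain struct whose optional exported Events channel receives formatted strings, with events silently dropped when the channel is nil), a zero-value recorder is enough to satisfy those calls, and a buffered channel would additionally let a test assert on the emitted reasons. A small sketch under that assumption:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/events"
)

func main() {
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}

    // A zero-value FakeRecorder, as wired into the test framework above,
    // quietly discards events; that is enough to keep the plugin's Eventf
    // calls from hitting a nil recorder during tests.
    silent := &events.FakeRecorder{}
    silent.Eventf(pod, pod, v1.EventTypeNormal, "PreScheduleDone",
        "pre filter for TensorFusion workload", "valid GPU node count: 3")

    // With a buffered channel the formatted events are captured, so a test
    // could also assert that a particular reason was recorded.
    capturing := &events.FakeRecorder{Events: make(chan string, 10)}
    capturing.Eventf(pod, pod, v1.EventTypeWarning, "GPUQuotaOrCapacityNotEnough",
        "check quota and filter", "no enough resource or quotas")
    fmt.Println(<-capturing.Events)
}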
