Skip to content

Commit 688eb9a

Browse files
authored
fix: potential panic when add schema twice (#292)
* fix: potential panic when add schema twice * fix: gpu node claim and resource calc bugs * fix: lint * fix: add compact log * fix: improve GPU resource cleanup and node compaction logic * fix: use post bind to allocate GPU to avoid side effect of preBind * fix: add fallback loop to clean GPU resources * fix: compaction unit test issues * fix: ut and lint issues * fix: ut issue * fix: resolve potential conflict
1 parent d7480ab commit 688eb9a

31 files changed

+374
-205
lines changed

.vscode/launch.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"--gpu-info-config", "${workspaceFolder}/config/samples/gpu-info-config.yaml",
6868
"--dynamic-config", "${workspaceFolder}/config/samples/dynamic-config.yaml",
6969
"--scheduler-config", "${workspaceFolder}/config/samples/scheduler-config.yaml",
70+
"-v", "4"
7071
],
7172
"program": "${workspaceFolder}/cmd/main.go",
7273
},

api/v1/gpu_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ const (
8282
// +kubebuilder:object:root=true
8383
// +kubebuilder:subresource:status
8484
// +kubebuilder:resource:scope=Cluster
85-
// +kubebuilder:printcolumn:name="GPU Model",type="string",JSONPath=".spec.gpuModel"
85+
// +kubebuilder:printcolumn:name="GPU Model",type="string",JSONPath=".status.gpuModel"
8686
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
8787
// +kubebuilder:printcolumn:name="Total TFlops",type="string",JSONPath=".status.capacity.tflops"
8888
// +kubebuilder:printcolumn:name="Total VRAM",type="string",JSONPath=".status.capacity.vram"

charts/tensor-fusion/Chart.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ type: application
1515
# This is the chart version. This version number should be incremented each time you make changes
1616
# to the chart and its templates, including the app version.
1717
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18-
version: 1.4.8
18+
version: 1.5.1
1919

2020
# This is the version number of the application being deployed. This version number should be
2121
# incremented each time you make changes to the application. Versions are not expected to
2222
# follow Semantic Versioning. They should reflect the version the application is using.
2323
# It is recommended to use it with quotes.
24-
appVersion: "1.42.1"
24+
appVersion: "1.43.5"

charts/tensor-fusion/crds/tensor-fusion.ai_gpus.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ spec:
1515
scope: Cluster
1616
versions:
1717
- additionalPrinterColumns:
18-
- jsonPath: .spec.gpuModel
18+
- jsonPath: .status.gpuModel
1919
name: GPU Model
2020
type: string
2121
- jsonPath: .status.phase

charts/tensor-fusion/templates/controller-deployment.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ spec:
4343
{{- toYaml .Values.controller.readinessProbe | nindent 12 }}
4444
resources:
4545
{{- toYaml .Values.controller.resources | nindent 12 }}
46+
ports:
47+
- name: http
48+
containerPort: 8080
49+
- name: metrics
50+
containerPort: 8081
4651
env:
4752
- name: OPERATOR_NAMESPACE
4853
valueFrom:

charts/tensor-fusion/templates/rbac.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,25 @@ rules:
163163
- get
164164
- patch
165165
- update
166+
- apiGroups:
167+
- karpenter.sh
168+
resources:
169+
- nodeclaims
170+
verbs:
171+
- delete
172+
- get
173+
- list
174+
- patch
175+
- update
176+
- watch
177+
- apiGroups:
178+
- karpenter.*
179+
resources:
180+
- "*"
181+
verbs:
182+
- get
183+
- list
184+
- watch
166185

167186
---
168187
apiVersion: rbac.authorization.k8s.io/v1

charts/tensor-fusion/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ schedulerConfig:
185185
reserve:
186186
enabled:
187187
- name: GPUResourcesFit
188-
preBind:
188+
postBind:
189189
enabled:
190190
- name: GPUResourcesFit
191191
pluginConfig:

cmd/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ var schedulerConfigPath string
9797
func init() {
9898
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
9999
utilruntime.Must(tfv1.AddToScheme(scheme))
100-
utilruntime.Must(tfv1.AddToScheme(scheme))
101100
// +kubebuilder:scaffold:scheme
102101
}
103102

cmd/nodediscovery/main.go

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import (
3535
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3636
)
3737

38+
const TMP_PATH = "/tmp"
39+
3840
var Scheme = runtime.NewScheme()
3941

4042
func init() {
@@ -167,27 +169,22 @@ func main() {
167169
availableVRAM.Add(gpu.Status.Available.Vram)
168170
}
169171

170-
ns := nodeStatus()
171-
ns.TotalTFlops = totalTFlops
172-
ns.TotalVRAM = totalVRAM
173-
ns.AvailableTFlops = availableTFlops
174-
ns.AvailableVRAM = availableVRAM
175-
ns.TotalGPUs = int32(count)
176-
ns.ManagedGPUs = int32(count)
177-
ns.ManagedGPUDeviceIDs = allDeviceIDs
178-
ns.NodeInfo.RAMSize = *resource.NewQuantity(getTotalHostRAM(), resource.DecimalSI)
179-
ns.NodeInfo.DataDiskSize = *resource.NewQuantity(getDiskInfo(constants.TFDataPath), resource.DecimalSI)
180-
gpunode.Status = *ns
181-
172+
// Use proper patch-based update with retry on conflict
182173
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
174+
// Get the latest version of the resource
183175
currentGPUNode := &tfv1.GPUNode{}
184176
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(gpunode), currentGPUNode); err != nil {
185177
return err
186178
}
187179

188-
currentGPUNode.Status = *ns
180+
// Create a patch from the original to the desired state
181+
patch := client.MergeFrom(currentGPUNode.DeepCopy())
182+
183+
// Update status fields conditionally
184+
updateGPUNodeStatus(&currentGPUNode.Status, totalTFlops, totalVRAM, int32(count), allDeviceIDs)
189185

190-
return k8sClient.Status().Update(ctx, currentGPUNode)
186+
// Apply the patch using the status subresource
187+
return k8sClient.Status().Patch(ctx, currentGPUNode, patch)
191188
})
192189
if err != nil {
193190
ctrl.Log.Error(err, "failed to update status of GPUNode after retries")
@@ -268,6 +265,9 @@ func createOrUpdateTensorFusionGPU(
268265
if gpu.Status.UsedBy == "" {
269266
gpu.Status.UsedBy = tfv1.UsedByTensorFusion
270267
}
268+
if gpu.Status.Phase == "" {
269+
gpu.Status.Phase = tfv1.TensorFusionGPUPhasePending
270+
}
271271
return k8sClient.Status().Patch(ctx, gpu, client.Merge)
272272
})
273273
if err != nil {
@@ -278,12 +278,6 @@ func createOrUpdateTensorFusionGPU(
278278
return gpu
279279
}
280280

281-
func nodeStatus() *tfv1.GPUNodeStatus {
282-
return &tfv1.GPUNodeStatus{
283-
Phase: tfv1.TensorFusionGPUNodePhaseRunning,
284-
}
285-
}
286-
287281
func kubeClient() (client.Client, error) {
288282
kubeConfigEnvVar := os.Getenv("KUBECONFIG")
289283
var config *rest.Config
@@ -316,7 +310,7 @@ func kubeClient() (client.Client, error) {
316310
func getTotalHostRAM() int64 {
317311
v, err := mem.VirtualMemory()
318312
if err != nil {
319-
fmt.Printf("error getting memory info: %v\n", err)
313+
fmt.Printf("[warning] getting memory info failed: %v\n", err)
320314
return 0
321315
}
322316
return int64(v.Total)
@@ -325,7 +319,7 @@ func getTotalHostRAM() int64 {
325319
func getDiskInfo(path string) (total int64) {
326320
absPath, err := filepath.Abs(path)
327321
if err != nil {
328-
fmt.Printf("error getting disk path: %v\n", err)
322+
fmt.Printf("[warning] getting disk path failed: %v\n", err)
329323
return 0
330324
}
331325

@@ -335,20 +329,42 @@ func getDiskInfo(path string) (total int64) {
335329
if errors.Is(err, syscall.ENOENT) {
336330
err = os.MkdirAll(absPath, 0o755)
337331
if err != nil {
338-
fmt.Printf("error creating folder: %s, err: %v\n", absPath, err)
332+
fmt.Printf("[warning] creating folder to discover disk space failed: %s, err: %v\n", absPath, err)
339333
return 0
340334
}
341335
err = syscall.Statfs(absPath, &stat)
342336
if err != nil {
343-
fmt.Printf("error getting disk stats after creation: %v\n", err)
337+
fmt.Printf("[warning] getting disk stats after creation failed: %v\n", err)
344338
return 0
345339
}
346340
} else {
347-
fmt.Printf("error getting disk stats: %v\n", err)
341+
fmt.Printf("[warning] getting disk stats failed: %v\n", err)
348342
return 0
349343
}
350344
}
351345

352346
total = int64(stat.Blocks * uint64(stat.Bsize))
353347
return total
354348
}
349+
350+
// updateGPUNodeStatus conditionally updates GPUNode status fields
351+
// Only updates phase if it's empty, and available resources if they are empty
352+
func updateGPUNodeStatus(
353+
status *tfv1.GPUNodeStatus,
354+
totalTFlops, totalVRAM resource.Quantity,
355+
totalGPUs int32, deviceIDs []string) {
356+
// Always update these fields as they represent current state
357+
status.TotalTFlops = totalTFlops
358+
status.TotalVRAM = totalVRAM
359+
status.TotalGPUs = totalGPUs
360+
status.ManagedGPUs = totalGPUs
361+
status.ManagedGPUDeviceIDs = deviceIDs
362+
status.NodeInfo = tfv1.GPUNodeInfo{
363+
RAMSize: *resource.NewQuantity(getTotalHostRAM(), resource.DecimalSI),
364+
DataDiskSize: *resource.NewQuantity(getDiskInfo(TMP_PATH), resource.DecimalSI),
365+
}
366+
// Only update phase if it's empty (unset)
367+
if status.Phase == "" {
368+
status.Phase = tfv1.TensorFusionGPUNodePhasePending
369+
}
370+
}

cmd/sched/setup.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ func SetupScheduler(
9595
recorderFactory := getRecorderFactory(&cc)
9696
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
9797

98-
// TODO share the same informer with controller, do not use 'cc' to avoid duplicated watch
99-
10098
sched, err := scheduler.New(ctx,
10199
cc.Client,
102100
cc.InformerFactory,
@@ -145,9 +143,7 @@ func RunScheduler(ctx context.Context,
145143
cz.Set(cc.ComponentConfig)
146144
}
147145

148-
// Start events processing pipeline.
149146
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
150-
defer cc.EventBroadcaster.Shutdown()
151147

152148
startInformersAndWaitForSync := func(ctx context.Context) {
153149
// Start all informers.
@@ -176,6 +172,7 @@ func RunScheduler(ctx context.Context,
176172
<-mgr.Elected()
177173
logger.Info("Starting scheduling cycle")
178174
sched.Run(ctx)
175+
cc.EventBroadcaster.Shutdown()
179176
}()
180177
return nil
181178
}
@@ -199,24 +196,24 @@ func preHandleConfig(cfgPath string) (string, error) {
199196
if err != nil {
200197
return "", err
201198
}
202-
var cfgRaw map[string]interface{}
199+
var cfgRaw map[string]any
203200
err = yaml.Unmarshal(cfgBytes, &cfgRaw)
204201
if err != nil {
205202
return "", err
206203
}
207204

208205
// Replace $HOME with actual home directory
209-
if cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey] != "" {
210-
cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey] = strings.ReplaceAll(
211-
cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey].(string),
206+
if cfgRaw[clientConnectionCfgKey].(map[string]any)[kubeConfigCfgKey] != "" {
207+
cfgRaw[clientConnectionCfgKey].(map[string]any)[kubeConfigCfgKey] = strings.ReplaceAll(
208+
cfgRaw[clientConnectionCfgKey].(map[string]any)[kubeConfigCfgKey].(string),
212209
"$HOME",
213210
os.Getenv("HOME"),
214211
)
215212
}
216213

217214
// Replace to KUBECONFIG path if env var exists
218215
if os.Getenv("KUBECONFIG") != "" {
219-
cfgRaw[clientConnectionCfgKey].(map[string]interface{})[kubeConfigCfgKey] = os.Getenv("KUBECONFIG")
216+
cfgRaw[clientConnectionCfgKey].(map[string]any)[kubeConfigCfgKey] = os.Getenv("KUBECONFIG")
220217
}
221218

222219
cfgBytes, err = yaml.Marshal(cfgRaw)

0 commit comments

Comments
 (0)