
Commit 7d147df

0x5457 and Code2Life authored
fix: Skip deleted workers during allocation state reconciliation (#239)
* fix: Skip deleted workers during allocation state reconciliation
* fix: tsdb SQL schema typing issue

Co-authored-by: Joey <[email protected]>
1 parent: d26a533

File tree

internal/controller/tensorfusionworkload_controller.go
internal/gpuallocator/gpuallocator.go
internal/gpuallocator/gpuallocator_test.go
internal/metrics/recorder.go
internal/metrics/tag_parser.go
internal/metrics/types.go

6 files changed: +32 −25 lines


internal/controller/tensorfusionworkload_controller.go

Lines changed: 2 additions & 9 deletions

@@ -359,10 +359,7 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context
 		return types.NamespacedName{Name: gpuName}
 	})
 	// Release GPU resources
-	if err := r.Allocator.Dealloc(ctx, tfv1.NameNamespace{Name: workload.Name, Namespace: workload.Namespace}, workload.Spec.Resources.Requests, gpus); err != nil {
-		log.Error(err, "Failed to release GPU resources, will retry", "gpus", gpus, "pod", pod.Name)
-		return false, err
-	}
+	r.Allocator.Dealloc(ctx, tfv1.NameNamespace{Name: workload.Name, Namespace: workload.Namespace}, workload.Spec.Resources.Requests, gpus)
 	log.Info("Released GPU resources via finalizer", "gpus", gpus, "pod", pod.Name)

 	return true, nil
@@ -383,7 +380,6 @@ func (r *TensorFusionWorkloadReconciler) deletePod(ctx context.Context, pod *cor

 // scaleUpWorkers handles the scaling up of worker pods
 func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, workerGenerator *worker.WorkerGenerator, workload *tfv1.TensorFusionWorkload, count int, hash string) (ctrl.Result, error) {
-	log := log.FromContext(ctx)
 	workloadNameNs := tfv1.NameNamespace{Namespace: workload.Namespace, Name: workload.Name}
 	// Create worker pods
 	for range count {
@@ -408,10 +404,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 	gpus := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName {
 		return client.ObjectKeyFromObject(gpu)
 	})
-	releaseErr := r.Allocator.Dealloc(ctx, workloadNameNs, workload.Spec.Resources.Requests, gpus)
-	if releaseErr != nil {
-		log.Error(releaseErr, "Failed to release GPU after pod creation failure", "gpus", gpus)
-	}
+	r.Allocator.Dealloc(ctx, workloadNameNs, workload.Spec.Resources.Requests, gpus)
 	return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err)
 }
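
Both hunks are the same simplification: Dealloc becomes infallible (its error return is dropped in gpuallocator.go below), so the callers' error-handling branches go away, along with the log variable whose only remaining use was in one of them. A self-contained sketch of the resulting best-effort-cleanup shape, with hypothetical allocator and cleanup names standing in for the real reconciler:

package main

import "fmt"

// allocator is a hypothetical stand-in for GpuAllocator: after the change,
// releasing is a pure in-memory bookkeeping update with no failure mode.
type allocator struct{ available int }

func (a *allocator) dealloc(n int) { a.available += n }

// cleanup mirrors the patched finalizer path: the release is best-effort,
// so there is no error branch left to requeue the reconcile on.
func cleanup(a *allocator, n int) (done bool, err error) {
	a.dealloc(n)
	return true, nil
}

func main() {
	a := &allocator{}
	done, err := cleanup(a, 2)
	fmt.Println(done, err, a.available) // true <nil> 2
}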

internal/gpuallocator/gpuallocator.go

Lines changed: 14 additions & 5 deletions

@@ -150,7 +150,7 @@ func (s *GpuAllocator) Alloc(ctx context.Context, req AllocRequest) ([]*tfv1.GPU
 }

 // Dealloc a request from gpu to release available resources on it.
-func (s *GpuAllocator) Dealloc(ctx context.Context, workloadNameNamespace tfv1.NameNamespace, request tfv1.Resource, gpus []types.NamespacedName) error {
+func (s *GpuAllocator) Dealloc(ctx context.Context, workloadNameNamespace tfv1.NameNamespace, request tfv1.Resource, gpus []types.NamespacedName) {
 	log := log.FromContext(ctx)
 	s.storeMutex.Lock()
 	defer s.storeMutex.Unlock()
@@ -175,7 +175,6 @@ func (s *GpuAllocator) Dealloc(ctx context.Context, workloadNameNamespace tfv1.N
 		s.markGPUDirty(gpu)
 	}

-	return nil
 }

 func NewGpuAllocator(ctx context.Context, client client.Client, syncInterval time.Duration) *GpuAllocator {
@@ -452,7 +451,7 @@ func (s *GpuAllocator) syncToK8s(ctx context.Context) {
 	if node.Annotations == nil {
 		// Create annotations if they don't exist
 		patch = []byte(`[{
-	"op": "add",
+		"op": "add",
 		"path": "/metadata/annotations",
 		"value": {
 			"` + constants.GPULastReportTimeAnnotationKey + `": "` + timeValue + `"
@@ -464,7 +463,7 @@ func (s *GpuAllocator) syncToK8s(ctx context.Context) {
 		"op": "add",
 		"path": "/metadata/annotations/` + encodedKey + `",
 		"value": "` + timeValue + `"
-	}]`)
+		}]`)
 	}

 	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
@@ -501,6 +500,10 @@ func (s *GpuAllocator) markGPUDirty(key types.NamespacedName) {
 	s.dirtyQueue[key] = struct{}{}
 }

+func (s *GpuAllocator) markGPUDirtyLoced(key types.NamespacedName) {
+	s.dirtyQueue[key] = struct{}{}
+}
+
 // When it's leader, should reconcile state based on existing workers
 // this function is run inside storeMutex lock
 func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
@@ -517,6 +520,9 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
 	vramCapacityMap := make(map[types.NamespacedName]resource.Quantity)
 	gpuMap := make(map[types.NamespacedName]*tfv1.GPU)

+	defer s.storeMutex.Unlock()
+	s.storeMutex.Lock()
+
 	for gpuKey, gpu := range s.gpuStore {
 		if gpu.Status.Capacity != nil {
 			tflopsCapacityMap[gpuKey] = gpu.Status.Capacity.Tflops
@@ -527,6 +533,9 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
 	}

 	for _, worker := range workers.Items {
+		if !worker.DeletionTimestamp.IsZero() {
+			continue
+		}
 		tflopsRequest, _ := resource.ParseQuantity(worker.Annotations[constants.TFLOPSRequestAnnotation])
 		vramRequest, _ := resource.ParseQuantity(worker.Annotations[constants.VRAMRequestAnnotation])
 		gpuIds := worker.Annotations[constants.GpuKey]
@@ -559,7 +568,7 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
 	if !sameTflops || !sameVRAM {
 		gpu.Status.Available.Tflops = tflopsCapacityMap[gpuKey]
 		gpu.Status.Available.Vram = vramCapacityMap[gpuKey]
-		s.markGPUDirty(gpuKey)
+		s.markGPUDirtyLoced(gpuKey)
 		log.FromContext(ctx).Info("Correcting gpu available resources", "gpu", gpuKey.Name, "tflops", gpu.Status.Available.Tflops.String(), "vram", gpu.Status.Available.Vram.String())
 	}
 }
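
The worker loop carries the headline fix: a worker whose DeletionTimestamp is set is already being torn down, and counting its requests while the finalizer path concurrently releases them is what skewed the reconciled availability numbers. A minimal sketch of the same guard over a pod list, using only the standard Kubernetes API types:

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	now := metav1.NewTime(time.Now())
	workers := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "live"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "terminating", DeletionTimestamp: &now}},
	}

	for _, w := range workers {
		// Same guard as the patch: a non-zero DeletionTimestamp means the
		// worker is already being deleted, so its requests must not be
		// subtracted from GPU availability again.
		if !w.DeletionTimestamp.IsZero() {
			continue
		}
		fmt.Println("counting", w.Name) // only "live" is counted
	}
}

markGPUDirtyLoced appears to be the variant for callers that already hold the necessary lock, matching the new storeMutex acquisition at the top of reconcileAllocationState. Note the commit registers defer s.storeMutex.Unlock() before calling Lock; Go permits this, since deferred calls only run at function return.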

internal/gpuallocator/gpuallocator_test.go

Lines changed: 1 addition & 2 deletions

@@ -48,10 +48,9 @@ var _ = Describe("GPU Allocator", func() {
 	}

 	deallocateAndSync := func(gpus []*tfv1.GPU, request tfv1.Resource) {
-		err := allocator.Dealloc(ctx, workloadNameNs, request, lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName {
+		allocator.Dealloc(ctx, workloadNameNs, request, lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName {
 			return client.ObjectKeyFromObject(gpu)
 		}))
-		Expect(err).NotTo(HaveOccurred())
 		allocator.syncToK8s(ctx)
 	}

internal/metrics/recorder.go

Lines changed: 6 additions & 3 deletions

@@ -40,17 +40,20 @@ type ActiveNodeAndWorker struct {
 }

 func RemoveWorkerMetrics(workerName string, deletionTime time.Time) {
+	defer workerMetricsLock.Unlock()
 	workerMetricsLock.Lock()
+
 	// to get more accurate metrics, should record the deletion timestamp to calculate duration for the last metrics
-	workerMetricsMap[workerName].deletionTimestamp = &deletionTime
-	workerMetricsLock.Unlock()
+	if _, ok := workerMetricsMap[workerName]; ok {
+		workerMetricsMap[workerName].deletionTimestamp = &deletionTime
+	}
 }

 func RemoveNodeMetrics(nodeName string) {
+	defer nodeMetricsLock.Unlock()
 	nodeMetricsLock.Lock()
 	// Node lifecycle is much longer than worker, so just delete the metrics, 1 minute metrics interval is enough
 	delete(nodeMetricsMap, nodeName)
-	nodeMetricsLock.Unlock()
 }

 func SetWorkerMetricsByWorkload(pod *corev1.Pod, workload *tfv1.TensorFusionWorkload, now time.Time) {
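
Two separate hardening fixes here: moving Unlock into a defer keeps the mutex released even if the body panics, and the comma-ok lookup stops RemoveWorkerMetrics from writing through a nil map entry when the worker was never recorded. The same shape in a self-contained sketch with hypothetical metric types:

package main

import (
	"fmt"
	"sync"
	"time"
)

type workerMetrics struct{ deletionTimestamp *time.Time }

var (
	mu      sync.Mutex
	metrics = map[string]*workerMetrics{"w1": {}}
)

func removeWorker(name string, deletionTime time.Time) {
	mu.Lock()
	defer mu.Unlock() // released even if the body panics

	// Comma-ok guard: writing through metrics[name] when the entry is
	// absent would dereference a nil *workerMetrics and panic.
	if m, ok := metrics[name]; ok {
		m.deletionTimestamp = &deletionTime
	}
}

func main() {
	removeWorker("w1", time.Now())      // records the deletion timestamp
	removeWorker("missing", time.Now()) // safely a no-op
	fmt.Println(metrics["w1"].deletionTimestamp != nil) // true
}

The commit registers each defer before calling Lock rather than after; Go allows this (deferred calls run at function return, by which point Lock has succeeded), though the conventional ordering is Lock first, then defer Unlock, as sketched above.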

internal/metrics/tag_parser.go

Lines changed: 8 additions & 5 deletions

@@ -41,18 +41,19 @@ func getInitTableSQL(model schema.Tabler, ttl string) string {
 	var indexClass string
 	var isIndex bool
 	var extraOption string
+	timePrecision := "ns"

 	// Split by semicolon first
-	parts := strings.Split(gormTag, ";")
-	for _, part := range parts {
+	parts := strings.SplitSeq(gormTag, ";")
+	for part := range parts {
 		if part == "" {
 			continue
 		}

 		// Split by colon
-		keyValue := strings.Split(part, ",")
+		keyValue := strings.SplitSeq(part, ",")

-		for _, key := range keyValue {
+		for key := range keyValue {
 			if strings.HasPrefix(key, "column:") {
 				columnName = strings.TrimPrefix(key, "column:")
 			} else if strings.HasPrefix(key, "index:") {
@@ -61,6 +62,8 @@ func getInitTableSQL(model schema.Tabler, ttl string) string {
 			indexClass = strings.TrimPrefix(key, "class:")
 		} else if strings.HasPrefix(key, "option:") {
 			extraOption = strings.TrimPrefix(key, "option:")
+		} else if strings.HasPrefix(key, "precision:") {
+			timePrecision = strings.TrimPrefix(key, "precision:")
 		}
 	}
 }
@@ -85,7 +88,7 @@ func getInitTableSQL(model schema.Tabler, ttl string) string {
 	default:
 		// Check if it's time.Time
 		if field.Type == reflect.TypeOf(time.Time{}) {
-			dbType = "Timestamp_ms"
+			dbType = fmt.Sprintf("Timestamp_%s", timePrecision)
 			isNullable = false
 		} else {
 			// Default to String for unknown types
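
strings.SplitSeq, added in Go 1.24, yields the split pieces as an iterator instead of allocating a slice, so the loops range over it directly. The new precision: key is what fixes the "tsdb SQL schema typing issue": a model can now choose its Timestamp_<precision> column type instead of every time.Time field being hard-coded to Timestamp_ms. A runnable sketch of the same parsing shape (requires Go 1.24+), using a tag string copied from the types.go change below:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Tag value taken from the TFSystemLog field in types.go below.
	tag := "column:greptime_timestamp;index:,class:TIME;precision:ms"

	timePrecision := "ns" // parser default when no precision: key is set
	for part := range strings.SplitSeq(tag, ";") {
		if part == "" {
			continue
		}
		for key := range strings.SplitSeq(part, ",") {
			if strings.HasPrefix(key, "precision:") {
				timePrecision = strings.TrimPrefix(key, "precision:")
			}
		}
	}
	fmt.Printf("Timestamp_%s\n", timePrecision) // prints: Timestamp_ms
}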

internal/metrics/types.go

Lines changed: 1 addition & 1 deletion

@@ -110,7 +110,7 @@ type TFSystemLog struct {

 	// NOTE: make sure new fields will be migrated in SetupTable function

-	GreptimeTimestamp time.Time `json:"greptime_timestamp" gorm:"column:greptime_timestamp;index:,class:TIME"`
+	GreptimeTimestamp time.Time `json:"greptime_timestamp" gorm:"column:greptime_timestamp;index:,class:TIME;precision:ms"`
 }

 func (sl TFSystemLog) TableName() string {
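
The precision:ms entry keeps this column's previous Timestamp_ms type now that getInitTableSQL defaults time.Time fields to Timestamp_ns; it is read off the struct field via reflection like any other gorm tag key. A minimal sketch of that tag lookup, with a hypothetical Row type mirroring the field above:

package main

import (
	"fmt"
	"reflect"
	"time"
)

// Row is a hypothetical model mirroring TFSystemLog's timestamp field.
type Row struct {
	GreptimeTimestamp time.Time `gorm:"column:greptime_timestamp;index:,class:TIME;precision:ms"`
}

func main() {
	field, _ := reflect.TypeOf(Row{}).FieldByName("GreptimeTimestamp")
	// getInitTableSQL reads this string and feeds it to the tag parser.
	fmt.Println(field.Tag.Get("gorm"))
}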
