Commit d5292a7

feat: output worker metrics (#67)
1 parent 985f2d2 commit d5292a7

3 files changed, +33 −27 lines

internal/controller/pod_controller.go

Lines changed: 0 additions & 20 deletions

@@ -22,9 +22,6 @@ import (
 
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
-	"github.com/NexusGPU/tensor-fusion/internal/metrics"
-	webhookv1 "github.com/NexusGPU/tensor-fusion/internal/webhook/v1"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samber/lo"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -60,10 +57,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		log.Error(err, "Failed to get Pod")
 		return ctrl.Result{}, err
 	}
-	tfInfo, err := webhookv1.ParseTensorFusionInfo(ctx, r.Client, pod)
-	if err != nil {
-		return ctrl.Result{}, fmt.Errorf("parse tf resources: %w", err)
-	}
 
 	// generate tensor fusion connections and apply to cluster
 	tfConnection := generateTensorFusionConnection(pod)
@@ -76,19 +69,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.R
 		}
 	}
 
-	// update metrics
-	for _, container := range tfInfo.ContainerNames {
-		labels := prometheus.Labels{
-			"pod":       pod.Name,
-			"namespace": pod.Namespace,
-			"container": container,
-		}
-		metrics.GpuTflopsRequest.With(labels).Set(tfInfo.Profile.Resources.Requests.Tflops.AsApproximateFloat64())
-		metrics.GpuTflopsLimit.With(labels).Set(tfInfo.Profile.Resources.Limits.Tflops.AsApproximateFloat64())
-		metrics.VramBytesRequest.With(labels).Set(tfInfo.Profile.Resources.Requests.Vram.AsApproximateFloat64())
-		metrics.VramBytesLimit.With(labels).Set(tfInfo.Profile.Resources.Limits.Vram.AsApproximateFloat64())
-	}
-
 	return ctrl.Result{}, nil
 }
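Both the block removed above and its replacement in the workload controller (next file) feed Prometheus gauges from Kubernetes resource quantities via AsApproximateFloat64. A minimal standalone sketch of that conversion, using hypothetical values in place of the profile's Tflops and Vram fields:

// Sketch only: shows why the controllers call AsApproximateFloat64 —
// Prometheus gauges take float64, while Kubernetes quantities are
// fixed-point resource.Quantity values. Values below are made up.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Hypothetical stand-ins for the profile's Tflops and Vram quantities.
	tflops := resource.MustParse("19500m") // 19.5 TFLOPS
	vram := resource.MustParse("16Gi")     // 16 GiB of VRAM

	// The same lossy conversion used before Gauge.Set in the diff.
	fmt.Println(tflops.AsApproximateFloat64()) // 19.5
	fmt.Println(vram.AsApproximateFloat64())   // 1.7179869184e+10
}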

internal/controller/tensorfusionworkload_controller.go

Lines changed: 25 additions & 3 deletions

@@ -36,9 +36,11 @@ import (
 	tensorfusionaiv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
+	"github.com/NexusGPU/tensor-fusion/internal/metrics"
 	scheduler "github.com/NexusGPU/tensor-fusion/internal/scheduler"
 	"github.com/NexusGPU/tensor-fusion/internal/utils"
 	"github.com/NexusGPU/tensor-fusion/internal/worker"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // TensorFusionWorkloadReconciler reconciles a TensorFusionWorkload object
@@ -141,7 +143,7 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 
 		// Calculate how many pods need to be removed
 		podsToRemove := int(currentReplicas - desiredReplicas)
-		if err := r.scaleDownWorkers(ctx, podList.Items[:podsToRemove]); err != nil {
+		if err := r.scaleDownWorkers(ctx, workload, podList.Items[:podsToRemove]); err != nil {
 			return ctrl.Result{}, err
 		}
 	}
@@ -185,7 +187,7 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 }
 
 // scaleDownWorkers handles the scaling down of worker pods
-func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, pods []corev1.Pod) error {
+func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, workload *tfv1.TensorFusionWorkload, pods []corev1.Pod) error {
 	log := log.FromContext(ctx)
 
 	for i := range pods {
@@ -196,6 +198,16 @@ func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, p
 		if err := r.deletePod(ctx, podToDelete); err != nil {
 			return err
 		}
+
+		labels := prometheus.Labels{
+			"worker":    podToDelete.Name,
+			"namespace": podToDelete.Namespace,
+			"pool":      workload.Spec.PoolName,
+		}
+		metrics.GpuTflopsRequest.Delete(labels)
+		metrics.GpuTflopsLimit.Delete(labels)
+		metrics.VramBytesRequest.Delete(labels)
+		metrics.VramBytesLimit.Delete(labels)
 	}
 	return nil
 }
@@ -271,7 +283,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 			return fmt.Errorf("schedule GPU: %w", err)
 		}
 
-		_, err = r.tryStartWorker(ctx, workerGenerator, gpu, workload)
+		pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload)
 		if err != nil {
 			// Try to release the GPU resource if pod creation fails
 			releaseErr := r.Scheduler.Release(ctx, workload.Spec.Resources.Requests, gpu)
@@ -280,6 +292,16 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 			}
 			return fmt.Errorf("create worker pod: %w", err)
 		}
+
+		labels := prometheus.Labels{
+			"worker":    pod.Name,
+			"namespace": pod.Namespace,
+			"pool":      workload.Spec.PoolName,
+		}
+		metrics.GpuTflopsRequest.With(labels).Set(workload.Spec.Resources.Requests.Tflops.AsApproximateFloat64())
+		metrics.GpuTflopsLimit.With(labels).Set(workload.Spec.Resources.Limits.Tflops.AsApproximateFloat64())
+		metrics.VramBytesRequest.With(labels).Set(workload.Spec.Resources.Requests.Vram.AsApproximateFloat64())
+		metrics.VramBytesLimit.With(labels).Set(workload.Spec.Resources.Limits.Vram.AsApproximateFloat64())
 	}
 
 	return nil
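Together, the scale-up and scale-down hunks above give each worker's metric series a full lifecycle: Set creates or updates the series labeled with the worker pod, and Delete drops it when the pod is removed, so scrapes stop reporting resources for workers that no longer exist. A self-contained sketch of that pattern (the gauge, label values, and the 19.5 figure are illustrative, not the repo's wiring):

// Sketch of the Set-on-scale-up / Delete-on-scale-down gauge lifecycle
// this commit implements. Names and values are hypothetical.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var gpuTflopsRequest = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "gpu_tflops_request"},
	[]string{"namespace", "worker", "pool"},
)

func main() {
	labels := prometheus.Labels{
		"namespace": "default",
		"worker":    "workload-abc-0", // hypothetical worker pod name
		"pool":      "pool-a",         // hypothetical pool name
	}

	// Scale up: publish the worker's requested TFLOPS as a labeled series.
	gpuTflopsRequest.With(labels).Set(19.5)

	// Scale down: remove the series entirely; Delete reports whether a
	// series with exactly these labels existed.
	fmt.Println(gpuTflopsRequest.Delete(labels)) // true
	fmt.Println(gpuTflopsRequest.Delete(labels)) // false (already gone)
}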

internal/metrics/connection.go renamed to internal/metrics/worker.go

Lines changed: 8 additions & 4 deletions

@@ -6,32 +6,36 @@ import (
 )
 
 var (
+	labels = []string{
+		"namespace", "worker", "pool",
+	}
+
 	GpuTflopsRequest = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "gpu_tflops_request",
 		},
-		[]string{"namespace", "pod", "container"},
+		labels,
 	)
 
 	GpuTflopsLimit = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "gpu_tflops_limit",
 		},
-		[]string{"namespace", "pod", "container"},
+		labels,
 	)
 
 	VramBytesRequest = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "vram_bytes_request",
 		},
-		[]string{"namespace", "pod", "container"},
+		labels,
	)
 
 	VramBytesLimit = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "vram_bytes_limit",
 		},
-		[]string{"namespace", "pod", "container"},
+		labels,
 	)
 )
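This diff doesn't show where the gauges are registered or exposed; that wiring lives elsewhere in the project (a controller-runtime operator would typically register them with sigs.k8s.io/controller-runtime/pkg/metrics.Registry). As a standalone sketch only, here is one of the renamed file's gauges registered against a fresh registry and served over /metrics:

// Sketch: register a gauge with the new label set and expose it via
// promhttp. Registry, port, and label values are assumptions, not the
// repo's actual setup.
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()

	vramBytesRequest := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "vram_bytes_request"},
		[]string{"namespace", "worker", "pool"}, // shared label set from worker.go
	)
	reg.MustRegister(vramBytesRequest)

	vramBytesRequest.With(prometheus.Labels{
		"namespace": "default",
		"worker":    "workload-abc-0", // hypothetical
		"pool":      "pool-a",         // hypothetical
	}).Set(16 * 1024 * 1024 * 1024) // 16 GiB in bytes

	// Scrape target: curl localhost:8080/metrics
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":8080", nil)
}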
