Skip to content

Commit bbd1835

Browse files
committed
feat: add GPU metrics for TFLOPS and VRAM requests and limits
1 parent ca7673d commit bbd1835

File tree

7 files changed

+143
-56
lines changed

7 files changed

+143
-56
lines changed

cmd/main.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ import (
2626
// to ensure that exec-entrypoint and run can make use of them.
2727
_ "k8s.io/client-go/plugin/pkg/client/auth"
2828

29+
tensorfusionaiv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
30+
"github.com/NexusGPU/tensor-fusion-operator/internal/config"
31+
"github.com/NexusGPU/tensor-fusion-operator/internal/controller"
32+
"github.com/NexusGPU/tensor-fusion-operator/internal/scheduler"
33+
"github.com/NexusGPU/tensor-fusion-operator/internal/server"
34+
"github.com/NexusGPU/tensor-fusion-operator/internal/server/router"
35+
webhookcorev1 "github.com/NexusGPU/tensor-fusion-operator/internal/webhook/v1"
36+
"github.com/NexusGPU/tensor-fusion-operator/internal/worker"
2937
"k8s.io/apimachinery/pkg/runtime"
3038
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3139
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -36,15 +44,6 @@ import (
3644
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
3745
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3846
"sigs.k8s.io/controller-runtime/pkg/webhook"
39-
40-
tensorfusionaiv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
41-
"github.com/NexusGPU/tensor-fusion-operator/internal/config"
42-
"github.com/NexusGPU/tensor-fusion-operator/internal/controller"
43-
"github.com/NexusGPU/tensor-fusion-operator/internal/scheduler"
44-
"github.com/NexusGPU/tensor-fusion-operator/internal/server"
45-
"github.com/NexusGPU/tensor-fusion-operator/internal/server/router"
46-
webhookcorev1 "github.com/NexusGPU/tensor-fusion-operator/internal/webhook/v1"
47-
"github.com/NexusGPU/tensor-fusion-operator/internal/worker"
4847
// +kubebuilder:scaffold:imports
4948
)
5049

@@ -76,7 +75,7 @@ func main() {
7675
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
7776
"Enable leader election for controller manager. "+
7877
"Enabling this will ensure there is only one active controller manager.")
79-
flag.BoolVar(&secureMetrics, "metrics-secure", true,
78+
flag.BoolVar(&secureMetrics, "metrics-secure", false,
8079
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
8180
flag.BoolVar(&enableHTTP2, "enable-http2", false,
8281
"If set, HTTP/2 will be enabled for the metrics and webhook servers")

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/lithammer/shortuuid/v4 v4.2.0
99
github.com/onsi/ginkgo/v2 v2.22.1
1010
github.com/onsi/gomega v1.36.2
11+
github.com/prometheus/client_golang v1.20.5
1112
github.com/samber/lo v1.47.0
1213
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
1314
gomodules.xyz/jsonpatch/v2 v2.4.0
@@ -72,7 +73,6 @@ require (
7273
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
7374
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
7475
github.com/pkg/errors v0.9.1 // indirect
75-
github.com/prometheus/client_golang v1.20.5 // indirect
7676
github.com/prometheus/client_model v0.6.1 // indirect
7777
github.com/prometheus/common v0.61.0 // indirect
7878
github.com/prometheus/procfs v0.15.1 // indirect

internal/constants/constants.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ const (
1111
Finalizer = Domain + "/" + FinalizerSuffix
1212

1313
// Annotation key constants
14-
EnableContainerAnnotationFormat = Domain + "/enable-%s"
15-
TFLOPSContainerAnnotationFormat = Domain + "/tflops-%s"
16-
VRAMContainerAnnotationFormat = Domain + "/vram-%s"
14+
EnableAnnotationFormat = Domain + "/enable-%s"
15+
TFLOPSRequestAnnotationFormat = Domain + "/tflops-request-%s"
16+
VRAMRequestAnnotationFormat = Domain + "/vram-request-%s"
17+
TFLOPSLimitAnnotationFormat = Domain + "/tflops-limit-%s"
18+
VRAMLimitAnnotationFormat = Domain + "/vram-limit-%s"
1719

1820
PendingRequeueDuration = time.Second * 3
1921

internal/controller/pod_controller.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import (
2222

2323
tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
2424
"github.com/NexusGPU/tensor-fusion-operator/internal/constants"
25+
"github.com/NexusGPU/tensor-fusion-operator/internal/metrics"
2526
webhookv1 "github.com/NexusGPU/tensor-fusion-operator/internal/webhook/v1"
27+
"github.com/prometheus/client_golang/prometheus"
2628
corev1 "k8s.io/api/core/v1"
2729
"k8s.io/apimachinery/pkg/api/errors"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,13 +57,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
5557
log.Error(err, "Failed to get Pod")
5658
return ctrl.Result{}, err
5759
}
58-
reqs := webhookv1.ParseTFReq(pod)
59-
if len(reqs) == 0 {
60+
resources := webhookv1.ParseTFResources(pod)
61+
if len(resources) == 0 {
6062
return ctrl.Result{}, nil
6163
}
6264

6365
// generate tensor fusion connections and apply to cluster
64-
tfConnections := GenerateTensorFusionConnection(pod, reqs)
66+
tfConnections := GenerateTensorFusionConnection(pod, resources)
6567

6668
for _, tfConnection := range tfConnections {
6769
existConn := &tfv1.TensorFusionConnection{}
@@ -73,10 +75,24 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
7375
}
7476
}
7577
}
78+
79+
// update metrics
80+
for _, res := range resources {
81+
labels := prometheus.Labels{
82+
"pod": pod.Name,
83+
"namespace": pod.Namespace,
84+
"container": res.ContainerName,
85+
}
86+
metrics.GpuTflopsRequest.With(labels).Set(res.TflopsRequest.AsApproximateFloat64())
87+
metrics.GpuTflopsLimit.With(labels).Set(res.TflopsLimit.AsApproximateFloat64())
88+
metrics.VramBytesRequest.With(labels).Set(res.VramRequest.AsApproximateFloat64())
89+
metrics.VramBytesLimit.With(labels).Set(res.VramLimit.AsApproximateFloat64())
90+
}
91+
7692
return ctrl.Result{}, nil
7793
}
7894

79-
func GenerateTensorFusionConnection(pod *corev1.Pod, tfReq []webhookv1.TFReq) []*tfv1.TensorFusionConnection {
95+
func GenerateTensorFusionConnection(pod *corev1.Pod, tfReq []webhookv1.TFResource) []*tfv1.TensorFusionConnection {
8096
connections := make([]*tfv1.TensorFusionConnection, 0, len(tfReq))
8197

8298
for _, req := range tfReq {
@@ -96,12 +112,12 @@ func GenerateTensorFusionConnection(pod *corev1.Pod, tfReq []webhookv1.TFReq) []
96112
Spec: tfv1.TensorFusionConnectionSpec{
97113
Resources: tfv1.Resources{
98114
Requests: tfv1.Resource{
99-
Tflops: req.Tflops,
100-
Vram: req.Vram,
115+
Tflops: req.TflopsRequest,
116+
Vram: req.VramRequest,
101117
},
102118
Limits: tfv1.Resource{
103-
Tflops: req.Tflops,
104-
Vram: req.Vram,
119+
Tflops: req.TflopsLimit,
120+
Vram: req.VramLimit,
105121
},
106122
},
107123
},

internal/metrics/connection.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"sigs.k8s.io/controller-runtime/pkg/metrics"
6+
)
7+
8+
var (
9+
GpuTflopsRequest = prometheus.NewGaugeVec(
10+
prometheus.GaugeOpts{
11+
Name: "gpu_tflops_request",
12+
},
13+
[]string{"namespace", "pod", "container"},
14+
)
15+
16+
GpuTflopsLimit = prometheus.NewGaugeVec(
17+
prometheus.GaugeOpts{
18+
Name: "gpu_tflops_limit",
19+
},
20+
[]string{"namespace", "pod", "container"},
21+
)
22+
23+
VramBytesRequest = prometheus.NewGaugeVec(
24+
prometheus.GaugeOpts{
25+
Name: "vram_bytes_request",
26+
},
27+
[]string{"namespace", "pod", "container"},
28+
)
29+
30+
VramBytesLimit = prometheus.NewGaugeVec(
31+
prometheus.GaugeOpts{
32+
Name: "vram_bytes_limit",
33+
},
34+
[]string{"namespace", "pod", "container"},
35+
)
36+
)
37+
38+
func init() {
39+
metrics.Registry.MustRegister(GpuTflopsRequest, GpuTflopsLimit, VramBytesRequest, VramBytesLimit)
40+
}

internal/webhook/v1/pod_webhook.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque
6969
log := log.FromContext(ctx)
7070
log.Info("Mutating pod", "generateName", pod.GenerateName, "namespace", pod.Namespace)
7171

72-
reqs := ParseTFReq(pod)
73-
if len(reqs) == 0 {
72+
resources := ParseTFResources(pod)
73+
if len(resources) == 0 {
7474
return admission.Allowed("no tensor fusion requirements found")
7575
}
7676

7777
// 1. Inject initContainer and env variables
78-
patches, err := m.patchTFClient(pod, reqs)
78+
patches, err := m.patchTFClient(pod, resources)
7979
if err != nil {
8080
return admission.Errored(http.StatusInternalServerError, err)
8181
}
@@ -89,36 +89,43 @@ func (m *TensorFusionPodMutator) InjectDecoder(d admission.Decoder) error {
8989
return nil
9090
}
9191

92-
type TFReq struct {
92+
type TFResource struct {
9393
ContainerName string
9494
ConnectionName string
9595
ConnectionNamespace string
96-
Tflops resource.Quantity
97-
Vram resource.Quantity
96+
TflopsRequest resource.Quantity
97+
VramRequest resource.Quantity
98+
TflopsLimit resource.Quantity
99+
VramLimit resource.Quantity
98100
}
99101

100-
func ParseTFReq(pod *corev1.Pod) []TFReq {
102+
func ParseTFResources(pod *corev1.Pod) []TFResource {
101103
if pod.Annotations == nil {
102104
return nil
103105
}
104106

105-
reqs := make([]TFReq, 0, len(pod.Spec.Containers))
107+
reqs := make([]TFResource, 0, len(pod.Spec.Containers))
106108

107109
for _, container := range pod.Spec.Containers {
108110
containerName := container.Name
109111

110112
// Check if TF requirements exist for this container
111-
tflopsKey := fmt.Sprintf(constants.TFLOPSContainerAnnotationFormat, containerName)
112-
vramKey := fmt.Sprintf(constants.VRAMContainerAnnotationFormat, containerName)
113+
tflopsReqKey := fmt.Sprintf(constants.TFLOPSRequestAnnotationFormat, containerName)
114+
vramReqKey := fmt.Sprintf(constants.VRAMRequestAnnotationFormat, containerName)
115+
tflopsLimitKey := fmt.Sprintf(constants.TFLOPSLimitAnnotationFormat, containerName)
116+
vramLimitKey := fmt.Sprintf(constants.VRAMLimitAnnotationFormat, containerName)
113117

114-
tflopsStr, hasTflops := pod.Annotations[tflopsKey]
115-
vramStr, hasVram := pod.Annotations[vramKey]
118+
tflopsReqStr, hasTflopsReq := pod.Annotations[tflopsReqKey]
119+
vramReqStr, hasVramReq := pod.Annotations[vramReqKey]
116120

117-
if !hasTflops && !hasVram {
121+
tflopsLimitStr, hasTflopsLimit := pod.Annotations[tflopsLimitKey]
122+
vramLimitStr, hasVramLimit := pod.Annotations[vramLimitKey]
123+
124+
if !hasTflopsReq && !hasVramReq && !hasTflopsLimit && !hasVramLimit {
118125
continue
119126
}
120127

121-
req := TFReq{
128+
req := TFResource{
122129
ContainerName: containerName,
123130
}
124131
connectionNameEnv, ok := lo.Find(container.Env, func(e corev1.EnvVar) bool {
@@ -133,19 +140,35 @@ func ParseTFReq(pod *corev1.Pod) []TFReq {
133140
if ok {
134141
req.ConnectionNamespace = connectionNamespaceEnv.Value
135142
}
136-
// Parse TFLOPS requirement
137-
if hasTflops {
138-
tflops, err := resource.ParseQuantity(tflopsStr)
143+
// Parse TFLOPS request
144+
if hasTflopsReq {
145+
tflops, err := resource.ParseQuantity(tflopsReqStr)
146+
if err == nil {
147+
req.TflopsRequest = tflops
148+
}
149+
}
150+
151+
// Parse VRAM request
152+
if hasVramReq {
153+
vram, err := resource.ParseQuantity(vramReqStr)
154+
if err == nil {
155+
req.VramRequest = vram
156+
}
157+
}
158+
159+
// Parse TFLOPS limit
160+
if hasTflopsLimit {
161+
tflops, err := resource.ParseQuantity(tflopsLimitStr)
139162
if err == nil {
140-
req.Tflops = tflops
163+
req.TflopsLimit = tflops
141164
}
142165
}
143166

144-
// Parse VRAM requirement
145-
if hasVram {
146-
vram, err := resource.ParseQuantity(vramStr)
167+
// Parse VRAM limit
168+
if hasVramLimit {
169+
vram, err := resource.ParseQuantity(vramLimitStr)
147170
if err == nil {
148-
req.Vram = vram
171+
req.VramLimit = vram
149172
}
150173
}
151174

@@ -155,7 +178,7 @@ func ParseTFReq(pod *corev1.Pod) []TFReq {
155178
return reqs
156179
}
157180

158-
func (m *TensorFusionPodMutator) patchTFClient(pod *corev1.Pod, tfReq []TFReq) ([]jsonpatch.JsonPatchOperation, error) {
181+
func (m *TensorFusionPodMutator) patchTFClient(pod *corev1.Pod, tfReq []TFResource) ([]jsonpatch.JsonPatchOperation, error) {
159182
// Convert the current pod to JSON
160183
currentBytes, err := json.Marshal(pod)
161184
if err != nil {

internal/webhook/v1/pod_webhook_test.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package v1
1919
import (
2020
"context"
2121
"encoding/json"
22+
"fmt"
2223
"net/http"
2324

2425
tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
@@ -63,14 +64,16 @@ var _ = Describe("TensorFusionPodMutator", func() {
6364
})
6465

6566
Context("Handle", func() {
66-
It("should successfully mutate a pod with TF requirements", func() {
67+
It("should successfully mutate a pod with TF resources", func() {
6768
pod := &corev1.Pod{
6869
ObjectMeta: metav1.ObjectMeta{
6970
Name: "test-pod",
7071
Namespace: "default",
7172
Annotations: map[string]string{
72-
constants.Domain + "/tflops-main": "100",
73-
constants.Domain + "/vram-main": "16Gi",
73+
fmt.Sprintf(constants.TFLOPSRequestAnnotationFormat, "main"): "10",
74+
fmt.Sprintf(constants.VRAMRequestAnnotationFormat, "main"): "1Gi",
75+
fmt.Sprintf(constants.TFLOPSLimitAnnotationFormat, "main"): "100",
76+
fmt.Sprintf(constants.VRAMLimitAnnotationFormat, "main"): "16Gi",
7477
},
7578
},
7679
Spec: corev1.PodSpec{
@@ -165,8 +168,10 @@ var _ = Describe("TensorFusionPodMutator", func() {
165168
pod := &corev1.Pod{
166169
ObjectMeta: metav1.ObjectMeta{
167170
Annotations: map[string]string{
168-
constants.Domain + "/tflops-test-container": "100",
169-
constants.Domain + "/vram-test-container": "16Gi",
171+
fmt.Sprintf(constants.TFLOPSRequestAnnotationFormat, "test-container"): "10",
172+
fmt.Sprintf(constants.VRAMRequestAnnotationFormat, "test-container"): "1Gi",
173+
fmt.Sprintf(constants.TFLOPSLimitAnnotationFormat, "test-container"): "100",
174+
fmt.Sprintf(constants.VRAMLimitAnnotationFormat, "test-container"): "16Gi",
170175
},
171176
},
172177
Spec: corev1.PodSpec{
@@ -188,11 +193,13 @@ var _ = Describe("TensorFusionPodMutator", func() {
188193
},
189194
}
190195

191-
reqs := ParseTFReq(pod)
192-
Expect(reqs).To(HaveLen(1))
193-
Expect(reqs[0].ContainerName).To(Equal("test-container"))
194-
Expect(reqs[0].Tflops.String()).To(Equal("100"))
195-
Expect(reqs[0].Vram.String()).To(Equal("16Gi"))
196+
resources := ParseTFResources(pod)
197+
Expect(resources).To(HaveLen(1))
198+
Expect(resources[0].ContainerName).To(Equal("test-container"))
199+
Expect(resources[0].TflopsRequest.String()).To(Equal("10"))
200+
Expect(resources[0].VramRequest.String()).To(Equal("1Gi"))
201+
Expect(resources[0].TflopsLimit.String()).To(Equal("100"))
202+
Expect(resources[0].VramLimit.String()).To(Equal("16Gi"))
196203
})
197204
})
198205

@@ -207,7 +214,7 @@ var _ = Describe("TensorFusionPodMutator", func() {
207214
},
208215
},
209216
}
210-
patch, err := mutator.patchTFClient(pod, []TFReq{{ContainerName: "test-container", Tflops: resource.MustParse("100"), Vram: resource.MustParse("16Gi")}})
217+
patch, err := mutator.patchTFClient(pod, []TFResource{{ContainerName: "test-container", TflopsRequest: resource.MustParse("100"), VramRequest: resource.MustParse("16Gi")}})
211218
Expect(err).NotTo(HaveOccurred())
212219
Expect(patch).NotTo(BeEmpty())
213220
Expect(patch).To(HaveLen(2))

0 commit comments

Comments
 (0)