Skip to content

Commit d4a8219

Browse files
authored
fix: token review permission issue for remote worker, pass annotation to workload, fix allocator mem state bug (#303)
* fix: token review permission issue for remote worker, pass annotation to workload, fix allocator mem state bug * fix: bump helm version
1 parent 59495e5 commit d4a8219

File tree

14 files changed

+155
-85
lines changed

14 files changed

+155
-85
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
"tflops",
151151
"timberio",
152152
"Tmpl",
153+
"tokenreviews",
153154
"Tolerations",
154155
"utilerrors",
155156
"utilruntime",

charts/tensor-fusion/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type: application
1515
# This is the chart version. This version number should be incremented each time you make changes
1616
# to the chart and its templates, including the app version.
1717
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18-
version: 1.5.2
18+
version: 1.5.3
1919

2020
# This is the version number of the application being deployed. This version number should be
2121
# incremented each time you make changes to the application. Versions are not expected to

charts/tensor-fusion/templates/rbac-hypervisor.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ rules:
2626
- watch
2727
- update
2828
- patch
29+
- apiGroups:
30+
- authentication.k8s.io
31+
resources:
32+
- tokenreviews
33+
verbs:
34+
- create
2935
---
3036
apiVersion: rbac.authorization.k8s.io/v1
3137
kind: ClusterRoleBinding

charts/tensor-fusion/templates/rbac.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,12 @@ rules:
182182
- get
183183
- list
184184
- watch
185-
185+
- apiGroups:
186+
- authentication.k8s.io
187+
resources:
188+
- tokenreviews
189+
verbs:
190+
- create
186191
---
187192
apiVersion: rbac.authorization.k8s.io/v1
188193
kind: ClusterRoleBinding

cmd/nodediscovery/main.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,11 @@ func main() {
161161
ctrl.Log.Info("found GPU info from config", "deviceName", deviceName, "FP16 TFlops", tflops, "uuid", uuid)
162162
}
163163

164-
gpu := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpunode, uuid, deviceName, memInfo, tflops)
165-
164+
gpu, err := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpunode, uuid, deviceName, memInfo, tflops)
165+
if err != nil {
166+
ctrl.Log.Error(err, "failed to create or update GPU", "uuid", uuid)
167+
os.Exit(1)
168+
}
166169
totalTFlops.Add(gpu.Status.Capacity.Tflops)
167170
totalVRAM.Add(gpu.Status.Capacity.Vram)
168171
availableTFlops.Add(gpu.Status.Available.Tflops)
@@ -194,13 +197,17 @@ func patchGPUNodeStatus(k8sClient client.Client, ctx context.Context,
194197

195198
func createOrUpdateTensorFusionGPU(
196199
k8sClient client.Client, ctx context.Context, k8sNodeName string, gpunode *tfv1.GPUNode,
197-
uuid string, deviceName string, memInfo nvml.Memory_v2, tflops resource.Quantity) *tfv1.GPU {
200+
uuid string, deviceName string, memInfo nvml.Memory_v2, tflops resource.Quantity) (*tfv1.GPU, error) {
198201
gpu := &tfv1.GPU{
199202
ObjectMeta: metav1.ObjectMeta{
200203
Name: uuid,
201204
},
202205
}
203206

207+
if len(gpunode.OwnerReferences) == 0 {
208+
return nil, fmt.Errorf("GPUNode has no owner references of GPU pool")
209+
}
210+
204211
err := retry.OnError(wait.Backoff{
205212
Steps: 10,
206213
Duration: time.Second,
@@ -213,6 +220,7 @@ func createOrUpdateTensorFusionGPU(
213220
// Set metadata fields
214221
gpu.Labels = map[string]string{
215222
constants.LabelKeyOwner: gpunode.Name,
223+
constants.GpuPoolKey: gpunode.OwnerReferences[0].Name,
216224
}
217225
gpu.Annotations = map[string]string{
218226
constants.LastSyncTimeAnnotationKey: time.Now().Format(time.RFC3339),
@@ -240,7 +248,7 @@ func createOrUpdateTensorFusionGPU(
240248
})
241249
if err != nil {
242250
ctrl.Log.Error(err, "failed to create or update GPU after retries", "gpu", gpu)
243-
os.Exit(1)
251+
return nil, err
244252
}
245253

246254
err = retry.OnError(retry.DefaultBackoff, func(err error) bool {
@@ -272,10 +280,10 @@ func createOrUpdateTensorFusionGPU(
272280
})
273281
if err != nil {
274282
ctrl.Log.Error(err, "failed to update status of GPU after retries", "gpu", gpu)
275-
os.Exit(1)
283+
return nil, err
276284
}
277285

278-
return gpu
286+
return gpu, nil
279287
}
280288

281289
func kubeClient() (client.Client, error) {

cmd/nodediscovery/main_test.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ func TestCreateOrUpdateTensorFusionGPU(t *testing.T) {
3030
gpuNode := &tfv1.GPUNode{
3131
ObjectMeta: metav1.ObjectMeta{
3232
Name: gpuNodeName,
33+
OwnerReferences: []metav1.OwnerReference{
34+
{
35+
Name: "test-gpu-pool",
36+
},
37+
},
3338
},
3439
}
3540

@@ -38,7 +43,8 @@ func TestCreateOrUpdateTensorFusionGPU(t *testing.T) {
3843

3944
k8sClient := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&tfv1.GPU{}).Build()
4045

41-
gpu := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpuNode, uuid, deviceName, memInfo, tflops)
46+
gpu, err := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpuNode, uuid, deviceName, memInfo, tflops)
47+
assert.NoError(t, err)
4248

4349
// Assertions
4450
assert.NotNil(t, gpu, "GPU object should not be nil")
@@ -51,10 +57,13 @@ func TestCreateOrUpdateTensorFusionGPU(t *testing.T) {
5157
gpu.Status.NodeSelector, "Node selector should match")
5258

5359
// Verify labels and annotations
54-
assert.Equal(t, map[string]string{constants.LabelKeyOwner: gpuNodeName}, gpu.Labels, "GPU labels should match")
60+
assert.Equal(t, map[string]string{
61+
constants.LabelKeyOwner: gpuNodeName,
62+
constants.GpuPoolKey: "test-gpu-pool",
63+
}, gpu.Labels, "GPU labels should match")
5564
assert.Contains(t, gpu.Annotations, constants.LastSyncTimeAnnotationKey,
5665
"GPU annotations should contain last report time")
57-
_, err := time.Parse(time.RFC3339, gpu.Annotations[constants.LastSyncTimeAnnotationKey])
66+
_, err = time.Parse(time.RFC3339, gpu.Annotations[constants.LastSyncTimeAnnotationKey])
5867
assert.NoError(t, err, "Last report time annotation should be a valid RFC3339 timestamp")
5968

6069
// Verify the Available field does not change after the update
@@ -64,7 +73,8 @@ func TestCreateOrUpdateTensorFusionGPU(t *testing.T) {
6473
assert.NoError(t, err)
6574

6675
tflops.Add(resource.MustParse("100"))
67-
updatedGpu := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpuNode, uuid, deviceName, memInfo, tflops)
76+
updatedGpu, err := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpuNode, uuid, deviceName, memInfo, tflops)
77+
assert.NoError(t, err)
6878
assert.NotEqual(t, updatedGpu.Status.Capacity, gpu.Status.Capacity, "GPU capacity should not match")
6979
assert.Equal(t, updatedGpu.Status.Available.Tflops, gpu.Status.Available.Tflops, "GPU TFlops should match")
7080
assert.Equal(t, updatedGpu.Status.Available.Vram, gpu.Status.Available.Vram, "GPU VRAM should match")
@@ -84,6 +94,11 @@ func TestGPUControllerReference(t *testing.T) {
8494
ObjectMeta: metav1.ObjectMeta{
8595
Name: gpuNodeName,
8696
UID: "mock-uid",
97+
OwnerReferences: []metav1.OwnerReference{
98+
{
99+
Name: "test-gpu-pool",
100+
},
101+
},
87102
},
88103
}
89104

@@ -92,17 +107,24 @@ func TestGPUControllerReference(t *testing.T) {
92107

93108
k8sClient := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&tfv1.GPU{}).Build()
94109

95-
gpu := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpuNode, uuid, deviceName, memInfo, tflops)
110+
gpu, err := createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, gpuNode, uuid, deviceName, memInfo, tflops)
111+
assert.NoError(t, err)
96112
assert.True(t, metav1.IsControlledBy(gpu, gpuNode))
97113

98114
newGpuNode := &tfv1.GPUNode{
99115
ObjectMeta: metav1.ObjectMeta{
100116
Name: "new-test-gpu-node",
101117
UID: "new-mock-uid",
118+
OwnerReferences: []metav1.OwnerReference{
119+
{
120+
Name: "new-test-gpu-pool",
121+
},
122+
},
102123
},
103124
}
104125

105-
gpu = createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, newGpuNode, uuid, deviceName, memInfo, tflops)
126+
gpu, err = createOrUpdateTensorFusionGPU(k8sClient, ctx, k8sNodeName, newGpuNode, uuid, deviceName, memInfo, tflops)
127+
assert.NoError(t, err)
106128
assert.NotNil(t, gpu.OwnerReferences[0].Kind)
107129
assert.NotNil(t, gpu.OwnerReferences[0].APIVersion)
108130
assert.True(t, metav1.IsControlledBy(gpu, newGpuNode))
@@ -127,6 +149,11 @@ func TestPatchGPUNodeStatus(t *testing.T) {
127149
ObjectMeta: metav1.ObjectMeta{
128150
Name: "test-gpu-node",
129151
Namespace: "default",
152+
OwnerReferences: []metav1.OwnerReference{
153+
{
154+
Name: "test-gpu-pool",
155+
},
156+
},
130157
},
131158
Status: tfv1.GPUNodeStatus{
132159
Phase: "", // Empty phase should be set to pending
@@ -161,6 +188,11 @@ func TestPatchGPUNodeStatus(t *testing.T) {
161188
ObjectMeta: metav1.ObjectMeta{
162189
Name: "test-gpu-node-running",
163190
Namespace: "default",
191+
OwnerReferences: []metav1.OwnerReference{
192+
{
193+
Name: "test-gpu-pool",
194+
},
195+
},
164196
},
165197
Status: tfv1.GPUNodeStatus{
166198
Phase: tfv1.TensorFusionGPUNodePhaseRunning,
@@ -194,6 +226,11 @@ func TestPatchGPUNodeStatus(t *testing.T) {
194226
ObjectMeta: metav1.ObjectMeta{
195227
Name: "test-gpu-node-zero",
196228
Namespace: "default",
229+
OwnerReferences: []metav1.OwnerReference{
230+
{
231+
Name: "test-gpu-pool",
232+
},
233+
},
197234
},
198235
Status: tfv1.GPUNodeStatus{
199236
Phase: "",
@@ -279,6 +316,11 @@ func TestPatchGPUNodeStatus_ErrorScenarios(t *testing.T) {
279316
ObjectMeta: metav1.ObjectMeta{
280317
Name: "nonexistent-gpu-node",
281318
Namespace: "default",
319+
OwnerReferences: []metav1.OwnerReference{
320+
{
321+
Name: "test-gpu-pool",
322+
},
323+
},
282324
},
283325
}
284326
},
@@ -315,6 +357,11 @@ func TestPatchGPUNodeStatus_Integration(t *testing.T) {
315357
ObjectMeta: metav1.ObjectMeta{
316358
Name: "integration-test-node",
317359
Namespace: "default",
360+
OwnerReferences: []metav1.OwnerReference{
361+
{
362+
Name: "test-gpu-pool",
363+
},
364+
},
318365
},
319366
Status: tfv1.GPUNodeStatus{
320367
Phase: "",

config/rbac/role.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ rules:
7676
- patch
7777
- update
7878
- watch
79+
- apiGroups:
80+
- authentication.k8s.io
81+
resources:
82+
- tokenreviews
83+
verbs:
84+
- create
7985
- apiGroups:
8086
- batch
8187
resources:

internal/constants/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,18 @@ const (
7171
SetPendingOwnedWorkloadAnnotation = Domain + "/pending-owned-workload"
7272
PricingAnnotation = Domain + "/hourly-pricing"
7373

74+
WorkloadModeAnnotation = Domain + "/workload-mode"
75+
WorkloadModeDynamic = "dynamic"
76+
WorkloadModeFixed = "fixed"
77+
7478
// Annotations for killer switch: disable features
7579
// ['gpu-opt', 'mem-manager', 'gpu-limiter']
7680
DisableFeaturesAnnotation = Domain + "/disable-features"
7781
BuiltInFeaturesGpuOpt = "gpu-opt"
7882
BuiltInFeaturesGpuLimiter = "gpu-limiter"
7983
BuiltInFeaturesMemManager = "mem-manager"
84+
// For debug purpose only of Remote vGPU, disable start worker to manual start with ad-hoc command inside Pod
85+
BuiltInFeatureStartWorker = "start-worker"
8086

8187
GenHostPortLabel = Domain + "/host-port"
8288
GenHostPortLabelValue = "auto"

internal/controller/gpu_controller.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,10 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22-
"strings"
2322

2423
tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
2524
"github.com/NexusGPU/tensor-fusion/internal/constants"
26-
"github.com/samber/lo"
2725
"k8s.io/apimachinery/pkg/api/errors"
28-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2926
"k8s.io/apimachinery/pkg/runtime"
3027
ctrl "sigs.k8s.io/controller-runtime"
3128
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -52,42 +49,12 @@ func (r *GPUReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
5249
return ctrl.Result{}, err
5350
}
5451

55-
kgvs, _, err := r.Scheme.ObjectKinds(&tfv1.GPUNode{})
56-
if err != nil {
57-
return ctrl.Result{}, fmt.Errorf("get object kinds for GPUNode: %w", err)
58-
}
59-
60-
owner, ok := lo.Find(gpu.OwnerReferences, func(or metav1.OwnerReference) bool {
61-
for _, kvg := range kgvs {
62-
if kvg.Kind == or.Kind && fmt.Sprintf("%s/%s", kvg.Group, kvg.Version) == or.APIVersion {
63-
return true
64-
}
65-
}
66-
return false
67-
})
68-
69-
if !ok {
70-
return ctrl.Result{}, fmt.Errorf("owner node of gpu(%s) not found", gpu.Name)
71-
}
72-
7352
gpunode := &tfv1.GPUNode{}
74-
if err := r.Get(ctx, client.ObjectKey{Name: owner.Name}, gpunode); err != nil {
75-
return ctrl.Result{}, fmt.Errorf("get node %s: %w", owner.Name, err)
76-
}
77-
78-
var poolName string
79-
for labelKey := range gpunode.Labels {
80-
after, ok := strings.CutPrefix(labelKey, constants.GPUNodePoolIdentifierLabelPrefix)
81-
if ok {
82-
poolName = after
83-
break
84-
}
85-
}
86-
87-
if poolName == "" {
88-
return ctrl.Result{}, fmt.Errorf("node %s is not assigned to any pool", gpunode.Name)
53+
if err := r.Get(ctx, client.ObjectKey{Name: gpu.Labels[constants.LabelKeyOwner]}, gpunode); err != nil {
54+
return ctrl.Result{}, fmt.Errorf("can not get node %s: %w", gpu.Labels[constants.LabelKeyOwner], err)
8955
}
9056

57+
// Fix old version issue when discovery job not set UsedBy field
9158
if gpu.Status.UsedBy == "" && gpu.Status.UUID != "" {
9259
patch := client.MergeFrom(gpu.DeepCopy())
9360
gpu.Status.UsedBy = tfv1.UsedByTensorFusion
@@ -97,20 +64,6 @@ func (r *GPUReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
9764
return ctrl.Result{}, nil
9865
}
9966

100-
// No need to calculate patch since GPU's owner pool not changed
101-
if gpu.Labels != nil && gpu.Labels[constants.GpuPoolKey] == poolName {
102-
return ctrl.Result{}, nil
103-
}
104-
105-
patch := client.MergeFrom(gpu.DeepCopy())
106-
if gpu.Labels == nil {
107-
gpu.Labels = make(map[string]string)
108-
}
109-
gpu.Labels[constants.GpuPoolKey] = poolName
110-
if err := r.Patch(ctx, gpu, patch); err != nil {
111-
return ctrl.Result{}, fmt.Errorf("patch gpu %s: %w", gpu.Name, err)
112-
}
113-
11467
return ctrl.Result{}, nil
11568
}
11669

internal/controller/tensorfusionconnection_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type TensorFusionConnectionReconciler struct {
5454
// +kubebuilder:rbac:groups=tensor-fusion.ai,resources=tensorfusionconnections,verbs=get;list;watch;create;update;patch;delete
5555
// +kubebuilder:rbac:groups=tensor-fusion.ai,resources=tensorfusionconnections/status,verbs=get;update;patch
5656
// +kubebuilder:rbac:groups=tensor-fusion.ai,resources=tensorfusionconnections/finalizers,verbs=update
57+
// +kubebuilder:rbac:groups=authentication.k8s.io,resources=tokenreviews,verbs=create
5758

5859
// Add and monitor GPU worker Pod for a TensorFusionConnection
5960
func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -70,14 +71,14 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
7071
// Object not found, could have been deleted after reconcile request, return without error
7172
return ctrl.Result{}, nil
7273
}
73-
log.Error(err, "Failed to get TensorFusionConnection")
74+
log.Error(err, "Failed to get TensorFusionConnection", "name", req.Name)
7475
return ctrl.Result{}, err
7576
}
7677

7778
workloadName := connection.Labels[constants.WorkloadKey]
7879
workload := &tfv1.TensorFusionWorkload{}
7980
if err := r.Get(ctx, client.ObjectKey{Name: workloadName, Namespace: connection.Namespace}, workload); err != nil {
80-
return ctrl.Result{}, fmt.Errorf("can not found TensorFusionWorkload: %w", err)
81+
return ctrl.Result{}, fmt.Errorf("can not found TensorFusionWorkload for connection %s: %w", connection.Name, err)
8182
}
8283

8384
if workload.Spec.IsDynamicReplica() {

0 commit comments

Comments
 (0)