Skip to content

Commit 4d3bc2d

Browse files
committed
feat(scheduler): implement naive scheduler release with resource tracking
- Update Release method in naive scheduler to properly track and restore resources - Add requeue duration for pending connections - Fix status updates in TensorFusionConnection controller - Update tests to verify partial and full resource releases - Make QosClass optional in TensorFusionConnection status
1 parent 4bc43c4 commit 4d3bc2d

File tree

14 files changed

+259
-57
lines changed

14 files changed

+259
-57
lines changed

api/v1/tensorfusionconnection_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ const (
4848
type TensorFusionConnectionStatus struct {
4949
Phase TensorFusionConnectionPhase `json:"phase"`
5050
ConnectionURL string `json:"connectionURL"`
51-
QosClass string `json:"qosClass"`
51+
QosClass string `json:"qosClass,omitempty"`
5252
Node string `json:"node,omitempty"`
5353
}
5454

cmd/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,16 @@ func main() {
152152

153153
ctx := context.Background()
154154
config := config.NewDefaultConfig()
155+
scheduler := scheduler.NewNaiveScheduler()
155156
if err = (&controller.TensorFusionConnectionReconciler{
156-
Client: mgr.GetClient(),
157-
Scheme: mgr.GetScheme(),
157+
Client: mgr.GetClient(),
158+
Scheme: mgr.GetScheme(),
159+
Scheduler: scheduler,
158160
}).SetupWithManager(mgr); err != nil {
159161
setupLog.Error(err, "unable to create controller", "controller", "TensorFusionConnection")
160162
os.Exit(1)
161163
}
162164

163-
scheduler := scheduler.NewNaiveScheduler()
164165
if err = (&controller.GPUNodeReconciler{
165166
Client: mgr.GetClient(),
166167
Scheme: mgr.GetScheme(),

config/crd/bases/tensor-fusion.ai_tensorfusionconnections.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ spec:
100100
required:
101101
- connectionURL
102102
- phase
103-
- qosClass
104103
type: object
105104
type: object
106105
served: true

config/manager/kustomization.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
namespace: tensor-fusion
2+
13
resources:
24
- manager.yaml

config/samples/v1_gpunode.yaml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,13 @@ metadata:
55
app.kubernetes.io/name: tensor-fusion-operator
66
app.kubernetes.io/managed-by: kustomize
77
name: gpunode-sample
8-
spec:
9-
# TODO(user): Add fields here
8+
namespace: tensor-fusion
9+
status:
10+
capacity:
11+
tflops: '200'
12+
vram: 100Gi
13+
available:
14+
tflops: '200'
15+
vram: 100Gi
16+
devices: []
17+

config/samples/v1_tensorfusionconnection.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,12 @@ metadata:
55
app.kubernetes.io/name: tensor-fusion-operator
66
app.kubernetes.io/managed-by: kustomize
77
name: tensorfusionconnection-sample
8+
namespace: tensor-fusion
89
spec:
9-
# TODO(user): Add fields here
10+
resources:
11+
limit:
12+
tflops: '100'
13+
vram: 8Gi
14+
request:
15+
tflops: '20'
16+
vram: 9Gi

config/webhook/kustomization.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
namespace: tensor-fusion
2+
13
resources:
24
- manifests.yaml
35
- service.yaml
46

57
configurations:
68
- kustomizeconfig.yaml
9+

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/gin-gonic/gin v1.10.0
88
github.com/onsi/ginkgo/v2 v2.19.0
99
github.com/onsi/gomega v1.33.1
10+
gomodules.xyz/jsonpatch/v2 v2.4.0
1011
k8s.io/api v0.31.0
1112
k8s.io/apimachinery v0.31.0
1213
k8s.io/client-go v0.31.0
@@ -97,11 +98,11 @@ require (
9798
golang.org/x/text v0.16.0 // indirect
9899
golang.org/x/time v0.3.0 // indirect
99100
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
100-
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
101101
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
102102
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
103103
google.golang.org/grpc v1.65.0 // indirect
104104
google.golang.org/protobuf v1.34.2 // indirect
105+
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
105106
gopkg.in/inf.v0 v0.9.1 // indirect
106107
gopkg.in/yaml.v2 v2.4.0 // indirect
107108
gopkg.in/yaml.v3 v3.0.1 // indirect

internal/constants/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package constants
22

3+
import "time"
4+
35
const (
46
// TensorFusionDomain is the domain prefix used for all tensor-fusion.ai related annotations and finalizers
57
TensorFusionDomain = "tensor-fusion.ai"
@@ -12,4 +14,6 @@ const (
1214
EnableContainerAnnotationFormat = TensorFusionDomain + "/enable-%s"
1315
TFLOPSContainerAnnotationFormat = TensorFusionDomain + "/tflops-%s"
1416
VRAMContainerAnnotationFormat = TensorFusionDomain + "/vram-%s"
17+
18+
PendingRequeueDuration = time.Second * 3
1519
)

internal/controller/tensorfusionconnection_controller.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,16 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
9696
// If status is not set or pending, try to schedule
9797
if connection.Status.Phase == "" || connection.Status.Phase == tfv1.TensorFusionConnectionPending {
9898
// Try to get an available node from scheduler
99-
node, err := r.Scheduler.Schedule(connection.Spec.Resources.Request)
99+
var err error
100+
node, err = r.Scheduler.Schedule(connection.Spec.Resources.Request)
100101
if err != nil {
101-
log.Error(err, "Failed to schedule connection")
102+
log.Info(err.Error())
102103
connection.Status.Phase = tfv1.TensorFusionConnectionPending
103104
} else if node != nil {
104105
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
105106
connection.Status.ConnectionURL = worker.GenerateConnectionURL(node, connection)
106-
connection.Status.Node = node.Name // Store the node name for cleanup
107+
// Store the node name for cleanup
108+
connection.Status.Node = node.Name
107109
} else {
108110
connection.Status.Phase = tfv1.TensorFusionConnectionPending
109111
}
@@ -113,6 +115,9 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
113115
return ctrl.Result{}, err
114116
}
115117

118+
if connection.Status.Phase == tfv1.TensorFusionConnectionPending {
119+
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
120+
}
116121
return ctrl.Result{}, nil
117122
}
118123

@@ -124,7 +129,7 @@ func (r *TensorFusionConnectionReconciler) handleDeletion(ctx context.Context, c
124129

125130
// Get the node
126131
node := &tfv1.GPUNode{}
127-
if err := r.Get(ctx, client.ObjectKey{Name: connection.Status.Node}, node); err != nil {
132+
if err := r.Get(ctx, client.ObjectKey{Name: connection.Status.Node, Namespace: connection.Namespace}, node); err != nil {
128133
if errors.IsNotFound(err) {
129134
// Node is already gone, nothing to do
130135
return nil
@@ -133,11 +138,11 @@ func (r *TensorFusionConnectionReconciler) handleDeletion(ctx context.Context, c
133138
}
134139

135140
// Release the resources
136-
if err := r.Scheduler.Release(node); err != nil {
141+
if err := r.Scheduler.Release(connection.Spec.Resources.Request, node); err != nil {
137142
return err
138143
}
139144

140-
return nil
145+
return r.MustUpdateStatus(ctx, connection, node)
141146
}
142147

143148
// Helper functions to handle finalizers
@@ -172,8 +177,7 @@ func (r *TensorFusionConnectionReconciler) MustUpdateStatus(ctx context.Context,
172177
}
173178

174179
// Update the status fields we care about
175-
latestConnection.Status.Phase = connection.Status.Phase
176-
latestConnection.Status.ConnectionURL = connection.Status.ConnectionURL
180+
latestConnection.Status = connection.Status
177181

178182
// Update the connection status
179183
if err := r.Status().Update(ctx, latestConnection); err != nil {

0 commit comments

Comments
 (0)