@@ -18,9 +18,11 @@ package controller
1818
1919import (
2020 "context"
21+ "fmt"
2122
2223 "k8s.io/apimachinery/pkg/api/errors"
2324 "k8s.io/apimachinery/pkg/runtime"
25+ "k8s.io/apimachinery/pkg/types"
2426 "k8s.io/client-go/util/retry"
2527 ctrl "sigs.k8s.io/controller-runtime"
2628 "sigs.k8s.io/controller-runtime/pkg/client"
@@ -30,13 +32,15 @@ import (
3032 "github.com/NexusGPU/tensor-fusion-operator/internal/constants"
3133 scheduler "github.com/NexusGPU/tensor-fusion-operator/internal/scheduler"
3234 "github.com/NexusGPU/tensor-fusion-operator/internal/worker"
35+ corev1 "k8s.io/api/core/v1"
3336)
3437
3538// TensorFusionConnectionReconciler reconciles a TensorFusionConnection object
3639type TensorFusionConnectionReconciler struct {
3740 client.Client
38- Scheme * runtime.Scheme
39- Scheduler scheduler.Scheduler
41+ Scheme * runtime.Scheme
42+ Scheduler scheduler.Scheduler
43+ WorkerGenerator * worker.WorkerGenerator
4044}
4145
4246var (
@@ -102,25 +106,59 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
102106 log .Info (err .Error ())
103107 connection .Status .Phase = tfv1 .TensorFusionConnectionPending
104108 } else if gpu != nil {
105- connection .Status .Phase = tfv1 .TensorFusionConnectionRunning
106- connection .Status .ConnectionURL = worker .GenerateConnectionURL (gpu , connection )
109+ connection .Status .Phase = tfv1 .TensorFusionConnectionStarting
107110 // Store the gpu name for cleanup
108111 connection .Status .GPU = gpu .Name
109112 } else {
113+ // Init status
110114 connection .Status .Phase = tfv1 .TensorFusionConnectionPending
111115 }
112116 }
113117
118+ // Start worker job
119+ phase , err := r .TryStartWorker (ctx , connection , types.NamespacedName {Name : connection .Name , Namespace : connection .Namespace })
120+ if err != nil {
121+ log .Error (err , "Failed to start worker pod" )
122+ return ctrl.Result {}, err
123+ }
124+
125+ if phase == corev1 .PodRunning {
126+ connection .Status .Phase = tfv1 .TensorFusionConnectionRunning
127+ connection .Status .ConnectionURL = r .WorkerGenerator .GenerateConnectionURL (gpu , connection )
128+ }
129+ // TODO: Handle PodFailure
130+
114131 if err := r .MustUpdateStatus (ctx , connection , gpu ); err != nil {
115132 return ctrl.Result {}, err
116133 }
117134
118135 if connection .Status .Phase == tfv1 .TensorFusionConnectionPending {
136+ // requeue
119137 return ctrl.Result {RequeueAfter : constants .PendingRequeueDuration }, nil
120138 }
139+
121140 return ctrl.Result {}, nil
122141}
123142
143+ func (r * TensorFusionConnectionReconciler ) TryStartWorker (ctx context.Context , connection * tfv1.TensorFusionConnection , namespacedName types.NamespacedName ) (corev1.PodPhase , error ) {
144+ // Try to get the Pod
145+ pod := & corev1.Pod {}
146+ if err := r .Get (ctx , namespacedName , pod ); err != nil {
147+ if errors .IsNotFound (err ) {
148+ // Pod doesn't exist, create a new one
149+ pod = r .WorkerGenerator .GenerateWorkerPod (connection , namespacedName )
150+ if err := ctrl .SetControllerReference (connection , pod , r .Scheme ); err != nil {
151+ return "" , fmt .Errorf ("set owner reference %w" , err )
152+ }
153+ if err := r .Create (ctx , pod ); err != nil {
154+ return "" , fmt .Errorf ("create pod %w" , err )
155+ }
156+ return corev1 .PodPending , nil
157+ }
158+ }
159+ return pod .Status .Phase , nil
160+ }
161+
124162// handleDeletion handles cleanup of external dependencies
125163func (r * TensorFusionConnectionReconciler ) handleDeletion (ctx context.Context , connection * tfv1.TensorFusionConnection ) error {
126164 if connection .Status .GPU == "" {
@@ -209,6 +247,7 @@ func (r *TensorFusionConnectionReconciler) MustUpdateStatus(ctx context.Context,
209247func (r * TensorFusionConnectionReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
210248 return ctrl .NewControllerManagedBy (mgr ).
211249 For (& tfv1.TensorFusionConnection {}).
250+ Owns (& corev1.Pod {}).
212251 Named ("tensorfusionconnection" ).
213252 Complete (r )
214253}
0 commit comments