@@ -87,10 +87,8 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
8787 }
8888 }
8989
90- needSelectWorker , shouldReturn , err := r .shouldSelectWorker (ctx , connection )
91- if shouldReturn {
92- // when err is not nil and shouldReturn is true,
93- // it means already cleared the existing workerName and updated status, wait next reconcile loop
90+ needSelectWorker , err := r .shouldSelectWorker (ctx , connection )
91+ if err != nil {
9492 return ctrl.Result {}, err
9593 }
9694
@@ -99,6 +97,7 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
9997 return ctrl.Result {}, nil
10098 }
10199
100+ log .Info ("Selecting worker for connection" , "connection" , connection .Name , "namespace" , connection .Namespace )
102101 if workload .Spec .IsDynamicReplica () {
103102 // 1st MODE: select the dedicated worker if it's running, otherwise wait until it becomes ready
104103 return ctrl.Result {}, r .syncDedicatedWorkerStatus (ctx , connection )
@@ -109,8 +108,8 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
109108}
110109
111110func (r * TensorFusionConnectionReconciler ) syncDedicatedWorkerStatus (ctx context.Context , connection * tfv1.TensorFusionConnection ) error {
112- var pod v1.Pod
113- if err := r .Get (ctx , client.ObjectKey {Name : connection .Name , Namespace : connection .Namespace }, & pod ); err != nil {
111+ pod := & v1.Pod {}
112+ if err := r .Get (ctx , client.ObjectKey {Name : connection .Name , Namespace : connection .Namespace }, pod ); err != nil {
114113 return fmt .Errorf ("failed to get dedicated worker pod for connection %w" , err )
115114 }
116115 if pod .Status .Phase != v1 .PodRunning {
@@ -124,14 +123,18 @@ func (r *TensorFusionConnectionReconciler) syncDedicatedWorkerStatus(ctx context
124123 if revision == "" {
125124 revision = "0"
126125 }
127- connection . Status . ConnectionURL = fmt . Sprintf ( "native+%s+%d+%s-%s" , pod .Status .PodIP , constants . TensorFusionWorkerPortNumber , pod .Name , revision )
126+ setConnectionWorkerURL ( connection , pod .Status .PodIP , pod .Name , revision )
128127 if err := r .Status ().Update (ctx , connection ); err != nil {
129128 return fmt .Errorf ("failed to update connection status: %w" , err )
130129 }
131130 return nil
132131 }
133132}
134133
134+ func setConnectionWorkerURL (connection * tfv1.TensorFusionConnection , podIp string , podName string , revision string ) {
135+ connection .Status .ConnectionURL = fmt .Sprintf ("native+%s+%d+%s-%s" , podIp , constants .TensorFusionWorkerPortNumber , podName , revision )
136+ }
137+
135138func (r * TensorFusionConnectionReconciler ) selectWorkerAndSyncStatusFromWorkerPool (
136139 ctx context.Context ,
137140 connection * tfv1.TensorFusionConnection ,
@@ -162,8 +165,7 @@ func (r *TensorFusionConnectionReconciler) selectWorkerAndSyncStatusFromWorkerPo
162165 if resourceVersion == "" {
163166 resourceVersion = "0"
164167 }
165-
166- connection .Status .ConnectionURL = fmt .Sprintf ("native+%s+%d+%s-%s" , s .WorkerIp , constants .TensorFusionWorkerPortNumber , s .WorkerName , resourceVersion )
168+ setConnectionWorkerURL (connection , s .WorkerIp , s .WorkerName , resourceVersion )
167169 if err := r .Status ().Update (ctx , connection ); err != nil {
168170 return ctrl.Result {}, fmt .Errorf ("update connection status: %w" , err )
169171 }
@@ -202,38 +204,48 @@ func (r *TensorFusionConnectionReconciler) patchMatchedWorkerLabel(ctx context.C
202204
203205func (r * TensorFusionConnectionReconciler ) shouldSelectWorker (
204206 ctx context.Context , connection * tfv1.TensorFusionConnection ,
205- ) (bool , bool , error ) {
206- needSelectWorker := false
207+ ) (needSelectWorker bool , err error ) {
207208 if connection .Status .WorkerName != "" {
208209 // check if worker pod is still running
209210 pod := & v1.Pod {}
210211 if err := r .Get (ctx , client.ObjectKey {Name : connection .Status .WorkerName , Namespace : connection .Namespace }, pod ); err != nil {
211212 if errors .IsNotFound (err ) {
212213 needSelectWorker = true
213214 } else {
214- return false , true , fmt .Errorf ("failed to get worker pod: %w" , err )
215+ return needSelectWorker , fmt .Errorf ("failed to get worker pod: %w" , err )
215216 }
216217 }
218+ // NOTE: no need to handle pod deleting since connection should be deleted at first, sync running status with Pod
217219 if pod .Status .Phase != v1 .PodRunning {
218220 connection .Status .WorkerName = ""
219221 connection .Status .Phase = tfv1 .WorkerFailed
220222 connection .Status .ConnectionURL = ""
221223 // set worker name to empty to trigger select worker again
222224 if updateErr := r .Status ().Update (ctx , connection ); updateErr != nil {
223- return false , true , fmt .Errorf ("failed to update connection status: %w" , updateErr )
225+ return false , fmt .Errorf ("failed to update connection status: %w" , updateErr )
226+ }
227+ // let next reconcile loop to trigger select worker
228+ return false , nil
229+ } else if connection .Status .Phase != tfv1 .WorkerRunning {
230+ // pod is running now, but connection is not running, update connection to running
231+ connection .Status .Phase = tfv1 .WorkerRunning
232+ setConnectionWorkerURL (connection , pod .Status .PodIP , pod .Name , pod .ResourceVersion )
233+ if updateErr := r .Status ().Update (ctx , connection ); updateErr != nil {
234+ return false , fmt .Errorf ("failed to update connection status: %w" , updateErr )
224235 }
225- return false , true , nil
236+ // current worker is working again, no need to select another worker
237+ return false , nil
226238 }
227239 } else {
228240 if connection .Status .Phase == "" {
229241 connection .Status .Phase = tfv1 .WorkerPending
230242 if updateErr := r .Status ().Update (ctx , connection ); updateErr != nil {
231- return false , true , fmt .Errorf ("failed to update connection status: %w" , updateErr )
243+ return false , fmt .Errorf ("failed to update connection status: %w" , updateErr )
232244 }
233245 }
234246 needSelectWorker = true
235247 }
236- return needSelectWorker , false , nil
248+ return needSelectWorker , nil
237249}
238250
239251// SetupWithManager sets up the controller with the Manager.
0 commit comments