@@ -40,6 +40,7 @@ import (
 	"github.com/NexusGPU/tensor-fusion/internal/portallocator"
 	"github.com/NexusGPU/tensor-fusion/internal/utils"
 	"github.com/NexusGPU/tensor-fusion/internal/worker"
+	"github.com/samber/lo"
 )
 
 // TensorFusionWorkloadReconciler reconciles a TensorFusionWorkload object
@@ -78,6 +79,10 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
7879 client.MatchingLabels {constants .WorkloadKey : workload .Name }); err != nil {
7980 return ctrl.Result {}, fmt .Errorf ("list pods: %w" , err )
8081 }
82+ // only calculate state based on not deleted pods, otherwise will cause wrong total replica count
83+ podList .Items = lo .Filter (podList .Items , func (pod corev1.Pod , _ int ) bool {
84+ return pod .DeletionTimestamp .IsZero ()
85+ })
8186
8287 // handle finalizer
8388 shouldReturn , err := utils .HandleFinalizer (ctx , workload , r .Client , func (ctx context.Context , workload * tfv1.TensorFusionWorkload ) (bool , error ) {
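For readers unfamiliar with `samber/lo`, here is a minimal, self-contained sketch of the filtering pattern above; the pod names and the timestamp are illustrative, not taken from this repo:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	now := metav1.Now()
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "live"}},
		// A non-nil DeletionTimestamp marks a pod as terminating.
		{ObjectMeta: metav1.ObjectMeta{Name: "terminating", DeletionTimestamp: &now}},
	}
	// lo.Filter keeps only the items for which the predicate returns true.
	alive := lo.Filter(pods, func(pod corev1.Pod, _ int) bool {
		return pod.DeletionTimestamp.IsZero()
	})
	fmt.Println(len(alive)) // 1: the terminating pod no longer counts toward replicas
}
```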
@@ -130,15 +135,16 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 		if err := r.Status().Update(ctx, workload); err != nil {
 			return ctrl.Result{}, fmt.Errorf("update status: %w", err)
 		}
+		return ctrl.Result{}, nil
 	}
 
 	// When the workload is not dynamic-replica, it maintains worker replicas by
 	// itself. In this mode, any Pod selecting a connection may connect to any
 	// worker, forming a sub-pool that lowers cost when the CPU side scales often.
 	if !workload.Spec.IsDynamicReplica() {
-		result, err := r.reconcileScaling(ctx, workload, podList, workerGenerator, podTemplateHash)
-		if err != nil || !result.IsZero() {
-			return result, err
+		err := r.reconcileScaling(ctx, workload, podList, workerGenerator, podTemplateHash)
+		if err != nil {
+			return ctrl.Result{}, err
 		}
 	}
 
@@ -156,10 +162,15 @@ func (r *TensorFusionWorkloadReconciler) reconcileScaling(
 	podList *corev1.PodList,
 	workerGenerator *worker.WorkerGenerator,
 	podTemplateHash string,
-) (ctrl.Result, error) {
+) error {
 	log := log.FromContext(ctx)
 	// Check if there are any Pods using the old podTemplateHash and delete them if any
 	if len(podList.Items) > 0 {
+		// Sort oldest first, so outdated pods are deleted from oldest to newest.
+		sort.Slice(podList.Items, func(i, j int) bool {
+			return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
+		})
+
 		var outdatedPods []corev1.Pod
 		for i := range podList.Items {
 			pod := &podList.Items[i]
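A standalone sketch of that ordering, with illustrative names and timestamps: `metav1.Time.Before` returns true when the receiver predates the argument, so after sorting, index 0 holds the oldest pod. Combined with the `podList.Items[:podsToRemove]` slice further down, this makes scale-down prefer the oldest pods.

```go
package main

import (
	"fmt"
	"sort"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	older := metav1.NewTime(time.Now().Add(-time.Hour))
	newer := metav1.NewTime(time.Now())
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "pod-new", CreationTimestamp: newer}},
		{ObjectMeta: metav1.ObjectMeta{Name: "pod-old", CreationTimestamp: older}},
	}
	// Oldest first.
	sort.Slice(pods, func(i, j int) bool {
		return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp)
	})
	fmt.Println(pods[0].Name) // pod-old
}
```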
@@ -171,10 +182,10 @@ func (r *TensorFusionWorkloadReconciler) reconcileScaling(
 		if len(outdatedPods) > 0 {
 			log.Info("Found outdated pods with different template hash", "count", len(outdatedPods))
 			if err := r.scaleDownWorkers(ctx, workload, outdatedPods); err != nil {
-				return ctrl.Result{}, err
+				return err
 			}
-			// After deletion, requeue, and the next reconcile will create a new pod
-			return ctrl.Result{Requeue: true}, nil
+			// No explicit requeue needed: the deleted Pods' watch events trigger the next reconcile.
+			return nil
 		}
 	}
 
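Dropping `Requeue: true` is only safe if the controller watches the Pods it owns, so that a Pod deletion event re-queues the workload. Here is a sketch of that wiring using the standard controller-runtime builder; it is an assumption about this controller's setup, not a copy of the repo's actual `SetupWithManager`, and the `tfv1` import path is inferred:

```go
import (
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"

	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" // assumed path; may differ in the repo
)

func (r *TensorFusionWorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&tfv1.TensorFusionWorkload{}). // reconcile on workload changes...
		Owns(&corev1.Pod{}).               // ...and on create/update/delete of owned Pods
		Complete(r)
}
```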
@@ -189,10 +200,10 @@ func (r *TensorFusionWorkloadReconciler) reconcileScaling(
 	log.Info("Current replicas", "count", currentReplicas, "desired", desiredReplicas)
 
 	// Update workload status
-	if workload.Status.Replicas != currentReplicas {
-		workload.Status.Replicas = currentReplicas
+	if workload.Status.WorkerCount != currentReplicas {
+		workload.Status.WorkerCount = currentReplicas
 		if err := r.Status().Update(ctx, workload); err != nil {
-			return ctrl.Result{}, fmt.Errorf("update status: %w", err)
+			return fmt.Errorf("update status: %w", err)
 		}
 	}
 
@@ -203,7 +214,7 @@ func (r *TensorFusionWorkloadReconciler) reconcileScaling(
 		// Calculate how many pods need to be added
 		podsToAdd := int(desiredReplicas - currentReplicas)
 		if err := r.scaleUpWorkers(ctx, workerGenerator, workload, podsToAdd, podTemplateHash); err != nil {
-			return ctrl.Result{}, fmt.Errorf("scale up workers: %w", err)
+			return fmt.Errorf("scale up workers: %w", err)
 		}
 	} else if currentReplicas > desiredReplicas {
 		log.Info("Scaling down workers", "from", currentReplicas, "to", desiredReplicas)
@@ -216,11 +227,11 @@ func (r *TensorFusionWorkloadReconciler) reconcileScaling(
 		// Calculate how many pods need to be removed
 		podsToRemove := int(currentReplicas - desiredReplicas)
 		if err := r.scaleDownWorkers(ctx, workload, podList.Items[:podsToRemove]); err != nil {
-			return ctrl.Result{}, err
+			return err
 		}
 	}
 
-	return ctrl.Result{}, nil
+	return nil
 }
 
 func handleMetricsRecorder(podList *corev1.PodList, workload *tfv1.TensorFusionWorkload) {
@@ -260,7 +271,6 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 // scaleDownWorkers handles the scaling down of worker pods
 func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, workload *tfv1.TensorFusionWorkload, pods []corev1.Pod) error {
 	log := log.FromContext(ctx)
-
 	for i := range pods {
 		podToDelete := &pods[i]
 		log.Info("Scaling down worker pod", "name", podToDelete.Name, "workload", workload.Name)
@@ -375,15 +385,20 @@ func (r *TensorFusionWorkloadReconciler) updateStatus(
 	conditions = append(conditions, readyCondition)
 
 	// Check if we need to update status
-	statusChanged := workload.Status.ReadyReplicas != readyReplicas ||
+	totalReplicasChangedInDynamicReplicaMode :=
+		workload.Status.WorkerCount != int32(len(pods)) && workload.Spec.IsDynamicReplica()
+	if totalReplicasChangedInDynamicReplicaMode {
+		workload.Status.WorkerCount = int32(len(pods))
+	}
+	statusChanged := totalReplicasChangedInDynamicReplicaMode || workload.Status.ReadyWorkers != readyReplicas ||
 		workload.Status.Phase != phase ||
 		!utils.EqualConditionsDisregardTransitionTime(workload.Status.Conditions, conditions)
 
 	if statusChanged {
 		log.Info("Updating workload status", "phase", phase, "readyReplicas", readyReplicas)
 		workload.Status.Phase = phase
 		workload.Status.Conditions = conditions
-		workload.Status.ReadyReplicas = readyReplicas
+		workload.Status.ReadyWorkers = readyReplicas
 		if err := r.Status().Update(ctx, workload); err != nil {
 			return fmt.Errorf("update workload status: %w", err)
 		}
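The compare-then-update shape used here is worth calling out: status is written only when a field actually drifted, which avoids no-op API calls and reduces conflict retries on hot objects. A trimmed-down sketch with a hypothetical status struct, not the real `tfv1` type:

```go
package main

import "fmt"

// workloadStatus is a hypothetical stand-in for the relevant status fields.
type workloadStatus struct {
	WorkerCount  int32
	ReadyWorkers int32
	Phase        string
}

// syncStatus mutates the status and reports whether an Update call is needed.
func syncStatus(cur *workloadStatus, workers, ready int32, phase string) bool {
	changed := cur.WorkerCount != workers || cur.ReadyWorkers != ready || cur.Phase != phase
	if changed {
		cur.WorkerCount, cur.ReadyWorkers, cur.Phase = workers, ready, phase
	}
	return changed // caller issues r.Status().Update(ctx, workload) only when true
}

func main() {
	st := &workloadStatus{WorkerCount: 2, ReadyWorkers: 2, Phase: "Running"}
	fmt.Println(syncStatus(st, 2, 2, "Running")) // false: nothing drifted, skip the write
	fmt.Println(syncStatus(st, 3, 2, "Running")) // true: WorkerCount changed, write status
}
```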