Skip to content

Commit a808fed

Browse files
authored
fix: prevent worker start in pending phase and correct port order (#20)
1 parent 19c377b commit a808fed

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

internal/controller/tensorfusionconnection_controller.go

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -116,18 +116,20 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
116116
}
117117
}
118118

119-
// Start worker job
120-
workerPod, err := r.tryStartWorker(ctx, connection, types.NamespacedName{Name: connection.Name, Namespace: connection.Namespace})
121-
if err != nil {
122-
log.Error(err, "Failed to start worker pod")
123-
return ctrl.Result{}, err
124-
}
119+
if connection.Status.Phase != tfv1.TensorFusionConnectionPending {
120+
// Start worker job
121+
workerPod, err := r.tryStartWorker(ctx, connection, types.NamespacedName{Name: connection.Name, Namespace: connection.Namespace})
122+
if err != nil {
123+
log.Error(err, "Failed to start worker pod")
124+
return ctrl.Result{}, err
125+
}
125126

126-
if workerPod.Status.Phase == corev1.PodRunning {
127-
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
128-
connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(gpu, connection, workerPod)
127+
if workerPod.Status.Phase == corev1.PodRunning {
128+
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
129+
connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(gpu, connection, workerPod)
130+
}
131+
// TODO: Handle PodFailure
129132
}
130-
// TODO: Handle PodFailure
131133

132134
if err := r.mustUpdateStatus(ctx, connection, gpu); err != nil {
133135
return ctrl.Result{}, err

internal/worker/worker.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ type WorkerGenerator struct {
1515
}
1616

1717
func (wg *WorkerGenerator) GenerateConnectionURL(_gpu *tfv1.GPU, connection *tfv1.TensorFusionConnection, pod *corev1.Pod) string {
18-
return fmt.Sprintf("native+%s+%d+%d", pod.Status.PodIP, wg.WorkerConfig.SendPort, wg.WorkerConfig.ReceivePort)
18+
return fmt.Sprintf("native+%s+%d+%d", pod.Status.PodIP, wg.WorkerConfig.ReceivePort, wg.WorkerConfig.SendPort)
1919
}
2020

2121
func (wg *WorkerGenerator) GenerateWorkerPod(

0 commit comments

Comments
 (0)