Skip to content

Commit 4718536

Browse files
committed
fix: alloc worker port to avoid port conflicts
1 parent d24f789 commit 4718536

File tree

8 files changed

+52
-24
lines changed

8 files changed

+52
-24
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/onsi/ginkgo/v2 v2.22.1
1010
github.com/onsi/gomega v1.36.2
1111
github.com/samber/lo v1.47.0
12+
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
1213
gomodules.xyz/jsonpatch/v2 v2.4.0
1314
k8s.io/api v0.32.0
1415
k8s.io/apimachinery v0.32.0
@@ -94,7 +95,6 @@ require (
9495
go.uber.org/zap v1.27.0 // indirect
9596
golang.org/x/arch v0.12.0 // indirect
9697
golang.org/x/crypto v0.31.0 // indirect
97-
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
9898
golang.org/x/net v0.33.0 // indirect
9999
golang.org/x/oauth2 v0.24.0 // indirect
100100
golang.org/x/sync v0.10.0 // indirect

internal/config/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ type Config struct {
1515

1616
type Worker struct {
1717
corev1.PodTemplate
18-
Port int16 `json:"port"`
1918
}
2019

2120
type PodMutation struct {
@@ -40,7 +39,6 @@ func LoadConfig(filename string) (*Config, error) {
4039
func NewDefaultConfig() *Config {
4140
return &Config{
4241
Worker: Worker{
43-
Port: 1234,
4442
PodTemplate: corev1.PodTemplate{
4543
Template: corev1.PodTemplateSpec{
4644
Spec: corev1.PodSpec{

internal/constants/constants.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@ package constants
33
import "time"
44

55
const (
6-
// TensorFusionDomain is the domain prefix used for all tensor-fusion.ai related annotations and finalizers
7-
TensorFusionDomain = "tensor-fusion.ai"
6+
// Domain is the domain prefix used for all tensor-fusion.ai related annotations and finalizers
7+
Domain = "tensor-fusion.ai"
88

99
// Finalizer constants
10-
TensorFusionFinalizerSuffix = "finalizer"
11-
TensorFusionFinalizer = TensorFusionDomain + "/" + TensorFusionFinalizerSuffix
10+
FinalizerSuffix = "finalizer"
11+
Finalizer = Domain + "/" + FinalizerSuffix
1212

1313
// Annotation key constants
14-
EnableContainerAnnotationFormat = TensorFusionDomain + "/enable-%s"
15-
TFLOPSContainerAnnotationFormat = TensorFusionDomain + "/tflops-%s"
16-
VRAMContainerAnnotationFormat = TensorFusionDomain + "/vram-%s"
14+
EnableContainerAnnotationFormat = Domain + "/enable-%s"
15+
TFLOPSContainerAnnotationFormat = Domain + "/tflops-%s"
16+
VRAMContainerAnnotationFormat = Domain + "/vram-%s"
1717

1818
PendingRequeueDuration = time.Second * 3
1919

2020
GetConnectionURLEnv = "TENSOR_FUSION_OPERATOR_GET_CONNECTION_URL"
2121
ConnectionNameEnv = "TENSOR_FUSION_CONNECTION_NAME"
2222
ConnectionNamespaceEnv = "TENSOR_FUSION_CONNECTION_NAMESPACE"
23+
24+
WorkerPortEnv = "TENSOR_FUSION_WORKER_PORT"
2325
)

internal/controller/pod_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func GenerateTensorFusionConnection(pod *corev1.Pod, tfReq []webhookv1.TFReq) []
119119
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
120120
p, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
121121
MatchLabels: map[string]string{
122-
constants.TensorFusionDomain + "/enabled": "true",
122+
constants.Domain + "/enabled": "true",
123123
},
124124
})
125125
if err != nil {

internal/controller/tensorfusioncluster_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
)
3030

3131
var (
32-
tensorFusionClusterFinalizer = constants.TensorFusionFinalizer
32+
tensorFusionClusterFinalizer = constants.Finalizer
3333
)
3434

3535
// TensorFusionClusterReconciler reconciles a TensorFusionCluster object

internal/controller/tensorfusionconnection_controller.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type TensorFusionConnectionReconciler struct {
4444
}
4545

4646
var (
47-
tensorFusionConnectionFinalizer = constants.TensorFusionFinalizer
47+
tensorFusionConnectionFinalizer = constants.Finalizer
4848
)
4949

5050
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
@@ -134,7 +134,10 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
134134

135135
if workerPod.Status.Phase == corev1.PodRunning {
136136
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
137-
connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(connection, workerPod)
137+
connection.Status.ConnectionURL, err = r.WorkerGenerator.GenerateConnectionURL(connection, workerPod)
138+
if err != nil {
139+
return ctrl.Result{}, err
140+
}
138141
}
139142
// TODO: Handle PodFailure
140143
}
@@ -157,7 +160,8 @@ func (r *TensorFusionConnectionReconciler) tryStartWorker(ctx context.Context, g
157160
if err := r.Get(ctx, namespacedName, pod); err != nil {
158161
if errors.IsNotFound(err) {
159162
// Pod doesn't exist, create a new one
160-
pod = r.WorkerGenerator.GenerateWorkerPod(gpu, connection, namespacedName)
163+
port := r.WorkerGenerator.AllocPort()
164+
pod = r.WorkerGenerator.GenerateWorkerPod(gpu, connection, namespacedName, port)
161165
if err := ctrl.SetControllerReference(connection, pod, r.Scheme); err != nil {
162166
return nil, fmt.Errorf("set owner reference %w", err)
163167
}

internal/webhook/v1/pod_webhook_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ var _ = Describe("TensorFusionPodMutator", func() {
6969
Name: "test-pod",
7070
Namespace: "default",
7171
Annotations: map[string]string{
72-
constants.TensorFusionDomain + "/tflops-main": "100",
73-
constants.TensorFusionDomain + "/vram-main": "16Gi",
72+
constants.Domain + "/tflops-main": "100",
73+
constants.Domain + "/vram-main": "16Gi",
7474
},
7575
},
7676
Spec: corev1.PodSpec{
@@ -165,8 +165,8 @@ var _ = Describe("TensorFusionPodMutator", func() {
165165
pod := &corev1.Pod{
166166
ObjectMeta: metav1.ObjectMeta{
167167
Annotations: map[string]string{
168-
constants.TensorFusionDomain + "/tflops-test-container": "100",
169-
constants.TensorFusionDomain + "/vram-test-container": "16Gi",
168+
constants.Domain + "/tflops-test-container": "100",
169+
constants.Domain + "/vram-test-container": "16Gi",
170170
},
171171
},
172172
Spec: corev1.PodSpec{

internal/worker/worker.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,51 @@ package worker
22

33
import (
44
"fmt"
5+
"strconv"
6+
"time"
57

68
tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
79
"github.com/NexusGPU/tensor-fusion-operator/internal/config"
10+
"github.com/NexusGPU/tensor-fusion-operator/internal/constants"
11+
"github.com/samber/lo"
12+
"golang.org/x/exp/rand"
813
corev1 "k8s.io/api/core/v1"
914
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1015
"k8s.io/apimachinery/pkg/types"
1116
)
1217

18+
func init() {
19+
rand.Seed(uint64(time.Now().UnixNano()))
20+
}
21+
1322
type WorkerGenerator struct {
1423
WorkerConfig *config.Worker
1524
}
1625

17-
func (wg *WorkerGenerator) GenerateConnectionURL(connection *tfv1.TensorFusionConnection, pod *corev1.Pod) string {
18-
return fmt.Sprintf("native+%s+%d", pod.Status.PodIP, wg.WorkerConfig.Port)
26+
func (wg *WorkerGenerator) GenerateConnectionURL(connection *tfv1.TensorFusionConnection, pod *corev1.Pod) (string, error) {
27+
port, ok := lo.Find(pod.Spec.Containers[0].Env, func(env corev1.EnvVar) bool {
28+
return env.Name == constants.WorkerPortEnv
29+
})
30+
31+
if !ok {
32+
return "", fmt.Errorf("worker port not found in pod %s", pod.Name)
33+
}
34+
return fmt.Sprintf("native+%s+%d", pod.Status.PodIP, port.Value), nil
35+
}
36+
37+
func (wg *WorkerGenerator) AllocPort() int16 {
38+
min := 30000
39+
max := 65535
40+
return int16(rand.Intn(max-min+1) + min)
1941
}
2042

2143
func (wg *WorkerGenerator) GenerateWorkerPod(
2244
gpu *tfv1.GPU,
2345
connection *tfv1.TensorFusionConnection,
2446
namespacedName types.NamespacedName,
47+
port int16,
2548
) *corev1.Pod {
26-
2749
spec := wg.WorkerConfig.Template.Spec.DeepCopy()
28-
2950
if spec.NodeSelector == nil {
3051
spec.NodeSelector = make(map[string]string)
3152
}
@@ -34,13 +55,16 @@ func (wg *WorkerGenerator) GenerateWorkerPod(
3455
spec.Containers[0].Env = append(spec.Containers[0].Env, corev1.EnvVar{
3556
Name: "NVIDIA_VISIBLE_DEVICES",
3657
Value: gpu.Status.UUID,
58+
}, corev1.EnvVar{
59+
Name: constants.WorkerPortEnv,
60+
Value: strconv.Itoa(int(port)),
3761
})
3862

3963
return &corev1.Pod{
4064
ObjectMeta: metav1.ObjectMeta{
4165
Name: namespacedName.Name,
4266
Namespace: namespacedName.Namespace,
4367
},
44-
Spec: spec,
68+
Spec: *spec,
4569
}
4670
}

0 commit comments

Comments
 (0)