@@ -21,6 +21,7 @@ import (
2121 "encoding/json"
2222 "fmt"
2323 "net/http"
24+ "path"
2425
2526 "gomodules.xyz/jsonpatch/v2"
2627 corev1 "k8s.io/api/core/v1"
@@ -34,6 +35,8 @@ import (
3435
3536 "github.com/NexusGPU/tensor-fusion-operator/internal/config"
3637 "github.com/NexusGPU/tensor-fusion-operator/internal/constants"
38+ "github.com/lithammer/shortuuid/v4"
39+ "github.com/samber/lo"
3740)
3841
3942// SetupPodWebhookWithManager registers the webhook for Pod in the manager.
@@ -65,7 +68,7 @@ func (m *TensorFusionPodMutator) Handle(ctx context.Context, req admission.Reque
6568 }
6669
6770 log := log .FromContext (ctx )
68- log .Info ("Mutating pod" , "name " , pod .Name , "namespace" , pod .Namespace )
71+ log .Info ("Mutating pod" , "generateName " , pod .GenerateName , "namespace" , pod .Namespace )
6972
7073 reqs := ParseTFReq (pod )
7174 if len (reqs ) == 0 {
@@ -88,9 +91,11 @@ func (m *TensorFusionPodMutator) InjectDecoder(d admission.Decoder) error {
8891}
8992
9093type TFReq struct {
91- ContainerName string
92- Tflops resource.Quantity
93- Vram resource.Quantity
94+ ContainerName string
95+ ConnectionName string
96+ ConnectionNamespace string
97+ Tflops resource.Quantity
98+ Vram resource.Quantity
9499}
95100
96101func ParseTFReq (pod * corev1.Pod ) []TFReq {
@@ -114,8 +119,25 @@ func ParseTFReq(pod *corev1.Pod) []TFReq {
114119 continue
115120 }
116121
122+ connectionNameEnv , ok := lo .Find (container .Env , func (e corev1.EnvVar ) bool {
123+ return e .Name == constants .ConnectionNameEnv
124+ })
125+
126+ if ! ok {
127+ continue
128+ }
129+ connectionNamespaceEnv , ok := lo .Find (container .Env , func (e corev1.EnvVar ) bool {
130+ return e .Name == constants .ConnectionNamespaceEnv
131+ })
132+
133+ if ! ok {
134+ continue
135+ }
136+
117137 req := TFReq {
118- ContainerName : containerName ,
138+ ContainerName : containerName ,
139+ ConnectionName : connectionNameEnv .Name ,
140+ ConnectionNamespace : connectionNamespaceEnv .Name ,
119141 }
120142
121143 // Parse TFLOPS requirement
@@ -152,6 +174,7 @@ func (m *TensorFusionPodMutator) patchTFClient(pod *corev1.Pod, tfReq []TFReq) (
152174 for i := range pod .Spec .Containers {
153175 container := & pod .Spec .Containers [i ]
154176 if container .Name == req .ContainerName {
177+ // patch from config
155178 containerJSON , err := json .Marshal (container )
156179 if err != nil {
157180 return nil , fmt .Errorf ("marshal container: %v" , err )
@@ -165,10 +188,26 @@ func (m *TensorFusionPodMutator) patchTFClient(pod *corev1.Pod, tfReq []TFReq) (
165188 if err != nil {
166189 return nil , fmt .Errorf ("apply strategic merge patch to container: %v" , err )
167190 }
168-
169191 if err := json .Unmarshal (patchedJSON , container ); err != nil {
170192 return nil , fmt .Errorf ("unmarshal patched container: %v" , err )
171193 }
194+
195+ // add connection env
196+ connectionName := fmt .Sprintf ("%s-tf-worker-%s" , pod .GenerateName + container .Name , shortuuid .New ())
197+ connectionNamespace := pod .Namespace
198+
199+ container .Env = append (container .Env , corev1.EnvVar {
200+ Name : constants .ConnectionNameEnv ,
201+ Value : connectionName ,
202+ })
203+ container .Env = append (container .Env , corev1.EnvVar {
204+ Name : constants .ConnectionNamespaceEnv ,
205+ Value : connectionNamespace ,
206+ })
207+ container .Env = append (container .Env , corev1.EnvVar {
208+ Name : constants .GetConnectionURLEnv ,
209+ Value : path .Join (m .Config .OperatorEndpoint , fmt .Sprintf ("/api/connection?name=%s,namespace=%s" , connectionName , connectionNamespace )),
210+ })
172211 }
173212 }
174213 }
0 commit comments