@@ -26,7 +26,9 @@ import (
2626 "k8s.io/client-go/tools/record"
2727 ctrl "sigs.k8s.io/controller-runtime"
2828 "sigs.k8s.io/controller-runtime/pkg/client"
29+ "sigs.k8s.io/controller-runtime/pkg/handler"
2930 "sigs.k8s.io/controller-runtime/pkg/log"
31+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
3032
3133 tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
3234 "github.com/NexusGPU/tensor-fusion-operator/internal/constants"
@@ -109,6 +111,37 @@ func (r *TensorFusionConnectionReconciler) needReSelectWorker(connection *tfv1.T
109111func (r * TensorFusionConnectionReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
110112 return ctrl .NewControllerManagedBy (mgr ).
111113 For (& tfv1.TensorFusionConnection {}).
114+ Watches (
115+ & tfv1.TensorFusionWorkload {},
116+ handler .EnqueueRequestsFromMapFunc (r .findConnectionsForWorkload ),
117+ ).
112118 Named ("tensorfusionconnection" ).
113119 Complete (r )
114120}
121+
122+ // findConnectionsForWorkload maps a TensorFusionWorkload to its associated TensorFusionConnections
123+ func (r * TensorFusionConnectionReconciler ) findConnectionsForWorkload (ctx context.Context , obj client.Object ) []reconcile.Request {
124+ workload , ok := obj .(* tfv1.TensorFusionWorkload )
125+ if ! ok {
126+ return nil
127+ }
128+
129+ // Get the list of connections associated with this workload
130+ connectionList := & tfv1.TensorFusionConnectionList {}
131+ if err := r .List (ctx , connectionList ,
132+ client .InNamespace (workload .Namespace ),
133+ client.MatchingLabels {constants .WorkloadKey : workload .Name }); err != nil {
134+ return nil
135+ }
136+ requests := []reconcile.Request {}
137+ for i := range connectionList .Items {
138+ connection := & connectionList .Items [i ]
139+ requests = append (requests , reconcile.Request {
140+ NamespacedName : client.ObjectKey {
141+ Name : connection .Name ,
142+ Namespace : connection .Namespace ,
143+ },
144+ })
145+ }
146+ return requests
147+ }
0 commit comments