@@ -21,6 +21,7 @@ import (
2121 "sigs.k8s.io/controller-runtime/pkg/builder"
2222 "sigs.k8s.io/controller-runtime/pkg/client"
2323 "sigs.k8s.io/controller-runtime/pkg/predicate"
24+ workcontrollers "sigs.k8s.io/work-api/pkg/controllers"
2425
2526 "go.goms.io/fleet/apis"
2627 fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
@@ -31,26 +32,35 @@ import (
3132type Reconciler struct {
3233 hubClient client.Client
3334 memberClient client.Client
34- recorder record.EventRecorder
35+
36+ // the join/leave agent maintains the list of controllers in the member cluster
37+ // so that it can make sure that all the agents on the member cluster have joined/left
38+ // before updating the internal member cluster CR status
39+ workController * workcontrollers.ApplyWorkReconciler
40+
41+ recorder record.EventRecorder
3542}
3643
3744const (
38- eventReasonInternalMemberClusterHealthy = "InternalMemberClusterHealthy"
39- eventReasonInternalMemberClusterUnhealthy = "InternalMemberClusterUnhealthy"
40- eventReasonInternalMemberClusterJoined = "InternalMemberClusterJoined"
41- eventReasonInternalMemberClusterLeft = "InternalMemberClusterLeft"
45+ eventReasonInternalMemberClusterHealthy = "InternalMemberClusterHealthy"
46+ eventReasonInternalMemberClusterUnhealthy = "InternalMemberClusterUnhealthy"
47+ eventReasonInternalMemberClusterJoined = "InternalMemberClusterJoined"
48+ eventReasonInternalMemberClusterFailedToJoin = "InternalMemberClusterFailedToJoin"
49+ eventReasonInternalMemberClusterFailedToLeave = "InternalMemberClusterFailedToLeave"
50+ eventReasonInternalMemberClusterLeft = "InternalMemberClusterLeft"
4251)
4352
4453// NewReconciler creates a new reconciler for the internalMemberCluster CR
45- func NewReconciler (hubClient client.Client , memberClient client.Client ) * Reconciler {
54+ func NewReconciler (hubClient client.Client , memberClient client.Client , workController * workcontrollers. ApplyWorkReconciler ) * Reconciler {
4655 return & Reconciler {
47- hubClient : hubClient ,
48- memberClient : memberClient ,
56+ hubClient : hubClient ,
57+ memberClient : memberClient ,
58+ workController : workController ,
4959 }
5060}
5161
5262func (r * Reconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
53- klog .V (3 ).InfoS ("Reconcile" , "InternalMemberCluster" , req .NamespacedName )
63+ klog .V (2 ).InfoS ("Reconcile" , "InternalMemberCluster" , req .NamespacedName )
5464
5565 var imc fleetv1alpha1.InternalMemberCluster
5666 if err := r .hubClient .Get (ctx , req .NamespacedName , & imc ); err != nil {
@@ -60,6 +70,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
6070
6171 switch imc .Spec .State {
6272 case fleetv1alpha1 .ClusterStateJoin :
73+ if err := r .startAgents (ctx , & imc ); err != nil {
74+ return ctrl.Result {}, err
75+ }
6376 updateMemberAgentHeartBeat (& imc )
6477 updateHealthErr := r .updateHealth (ctx , & imc )
6578 r .markInternalMemberClusterJoined (& imc )
@@ -74,6 +87,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
7487 return ctrl.Result {RequeueAfter : time .Second * time .Duration (imc .Spec .HeartbeatPeriodSeconds )}, nil
7588
7689 case fleetv1alpha1 .ClusterStateLeave :
90+ if err := r .stopAgents (ctx , & imc ); err != nil {
91+ return ctrl.Result {}, err
92+ }
7793 r .markInternalMemberClusterLeft (& imc )
7894 if err := r .updateInternalMemberClusterWithRetry (ctx , & imc ); err != nil {
7995 klog .ErrorS (err , "failed to update status for %s" , klog .KObj (& imc ))
@@ -87,9 +103,33 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
87103 }
88104}
89105
106+ // startAgents start all the member agents running on the member cluster
107+ func (r * Reconciler ) startAgents (ctx context.Context , imc * fleetv1alpha1.InternalMemberCluster ) error {
108+ // TODO: handle all the controllers uniformly if we have more
109+ if err := r .workController .Join (ctx ); err != nil {
110+ r .markInternalMemberClusterJoinFailed (imc , err )
111+ // ignore the update error since we will return an error anyway
112+ _ = r .updateInternalMemberClusterWithRetry (ctx , imc )
113+ return err
114+ }
115+ return nil
116+ }
117+
118+ // stopAgents stops all the member agents running on the member cluster
119+ func (r * Reconciler ) stopAgents (ctx context.Context , imc * fleetv1alpha1.InternalMemberCluster ) error {
120+ // TODO: handle all the controllers uniformly if we have more
121+ if err := r .workController .Leave (ctx ); err != nil {
122+ r .markInternalMemberClusterLeaveFailed (imc , err )
123+ // ignore the update error since we will return an error anyway
124+ _ = r .updateInternalMemberClusterWithRetry (ctx , imc )
125+ return err
126+ }
127+ return nil
128+ }
129+
90130// updateHealth collects and updates member cluster resource stats and set ConditionTypeInternalMemberClusterHealth.
91131func (r * Reconciler ) updateHealth (ctx context.Context , imc * fleetv1alpha1.InternalMemberCluster ) error {
92- klog .V (3 ).InfoS ("updateHealth" , "InternalMemberCluster" , klog .KObj (imc ))
132+ klog .V (2 ).InfoS ("updateHealth" , "InternalMemberCluster" , klog .KObj (imc ))
93133
94134 if err := r .updateResourceStats (ctx , imc ); err != nil {
95135 r .markInternalMemberClusterUnhealthy (imc , errors .Wrapf (err , "failed to update resource stats %s" , klog .KObj (imc )))
@@ -102,7 +142,7 @@ func (r *Reconciler) updateHealth(ctx context.Context, imc *fleetv1alpha1.Intern
102142
103143// updateResourceStats collects and updates resource usage stats of the member cluster.
104144func (r * Reconciler ) updateResourceStats (ctx context.Context , imc * fleetv1alpha1.InternalMemberCluster ) error {
105- klog .V (5 ).InfoS ("updateResourceStats" , "InternalMemberCluster" , klog .KObj (imc ))
145+ klog .V (4 ).InfoS ("updateResourceStats" , "InternalMemberCluster" , klog .KObj (imc ))
106146 var nodes corev1.NodeList
107147 if err := r .memberClient .List (ctx , & nodes ); err != nil {
108148 return errors .Wrapf (err , "failed to list nodes for member cluster %s" , klog .KObj (imc ))
@@ -132,7 +172,7 @@ func (r *Reconciler) updateResourceStats(ctx context.Context, imc *fleetv1alpha1
132172
133173// updateInternalMemberClusterWithRetry updates InternalMemberCluster status.
134174func (r * Reconciler ) updateInternalMemberClusterWithRetry (ctx context.Context , imc * fleetv1alpha1.InternalMemberCluster ) error {
135- klog .V (5 ).InfoS ("updateInternalMemberClusterWithRetry" , "InternalMemberCluster" , klog .KObj (imc ))
175+ klog .V (4 ).InfoS ("updateInternalMemberClusterWithRetry" , "InternalMemberCluster" , klog .KObj (imc ))
136176 backOffPeriod := retry .DefaultBackoff
137177 backOffPeriod .Cap = time .Second * time .Duration (imc .Spec .HeartbeatPeriodSeconds )
138178
@@ -147,15 +187,15 @@ func (r *Reconciler) updateInternalMemberClusterWithRetry(ctx context.Context, i
147187
148188// updateMemberAgentHeartBeat is used to update member agent heart beat for Internal member cluster.
149189func updateMemberAgentHeartBeat (imc * fleetv1alpha1.InternalMemberCluster ) {
150- klog .V (5 ).InfoS ("update Internal member cluster heartbeat" , "InternalMemberCluster" , klog .KObj (imc ))
190+ klog .V (4 ).InfoS ("update Internal member cluster heartbeat" , "InternalMemberCluster" , klog .KObj (imc ))
151191 desiredAgentStatus := imc .GetAgentStatus (fleetv1alpha1 .MemberAgent )
152192 if desiredAgentStatus != nil {
153193 desiredAgentStatus .LastReceivedHeartbeat = metav1 .Now ()
154194 }
155195}
156196
157197func (r * Reconciler ) markInternalMemberClusterHealthy (imc apis.ConditionedAgentObj ) {
158- klog .V (5 ).InfoS ("markInternalMemberClusterHealthy" , "InternalMemberCluster" , klog .KObj (imc ))
198+ klog .V (4 ).InfoS ("markInternalMemberClusterHealthy" , "InternalMemberCluster" , klog .KObj (imc ))
159199 newCondition := metav1.Condition {
160200 Type : string (fleetv1alpha1 .AgentHealthy ),
161201 Status : metav1 .ConditionTrue ,
@@ -174,7 +214,7 @@ func (r *Reconciler) markInternalMemberClusterHealthy(imc apis.ConditionedAgentO
174214}
175215
176216func (r * Reconciler ) markInternalMemberClusterUnhealthy (imc apis.ConditionedAgentObj , err error ) {
177- klog .V (5 ).InfoS ("markInternalMemberClusterUnhealthy" , "InternalMemberCluster" , klog .KObj (imc ))
217+ klog .V (4 ).InfoS ("markInternalMemberClusterUnhealthy" , "InternalMemberCluster" , klog .KObj (imc ))
178218 newCondition := metav1.Condition {
179219 Type : string (fleetv1alpha1 .AgentHealthy ),
180220 Status : metav1 .ConditionFalse ,
@@ -194,7 +234,7 @@ func (r *Reconciler) markInternalMemberClusterUnhealthy(imc apis.ConditionedAgen
194234}
195235
196236func (r * Reconciler ) markInternalMemberClusterJoined (imc apis.ConditionedAgentObj ) {
197- klog .V (5 ).InfoS ("markInternalMemberClusterJoined" , "InternalMemberCluster" , klog .KObj (imc ))
237+ klog .V (4 ).InfoS ("markInternalMemberClusterJoined" , "InternalMemberCluster" , klog .KObj (imc ))
198238 newCondition := metav1.Condition {
199239 Type : string (fleetv1alpha1 .AgentJoined ),
200240 Status : metav1 .ConditionTrue ,
@@ -213,8 +253,28 @@ func (r *Reconciler) markInternalMemberClusterJoined(imc apis.ConditionedAgentOb
213253 imc .SetConditionsWithType (fleetv1alpha1 .MemberAgent , newCondition )
214254}
215255
256+ func (r * Reconciler ) markInternalMemberClusterJoinFailed (imc apis.ConditionedAgentObj , err error ) {
257+ klog .V (4 ).InfoS ("markInternalMemberCluster join failed" , "error" , err , "InternalMemberCluster" , klog .KObj (imc ))
258+ newCondition := metav1.Condition {
259+ Type : string (fleetv1alpha1 .AgentJoined ),
260+ Status : metav1 .ConditionUnknown ,
261+ Reason : eventReasonInternalMemberClusterFailedToJoin ,
262+ Message : err .Error (),
263+ ObservedGeneration : imc .GetGeneration (),
264+ }
265+
266+ // Joined status changed.
267+ existingCondition := imc .GetConditionWithType (fleetv1alpha1 .MemberAgent , newCondition .Type )
268+ if existingCondition == nil || existingCondition .ObservedGeneration != imc .GetGeneration () || existingCondition .Status != newCondition .Status {
269+ r .recorder .Event (imc , corev1 .EventTypeNormal , eventReasonInternalMemberClusterFailedToJoin , "internal member cluster failed to join" )
270+ klog .ErrorS (err , "agent join failed" , "InternalMemberCluster" , klog .KObj (imc ))
271+ }
272+
273+ imc .SetConditionsWithType (fleetv1alpha1 .MemberAgent , newCondition )
274+ }
275+
216276func (r * Reconciler ) markInternalMemberClusterLeft (imc apis.ConditionedAgentObj ) {
217- klog .V (5 ).InfoS ("markInternalMemberClusterLeft" , "InternalMemberCluster" , klog .KObj (imc ))
277+ klog .V (4 ).InfoS ("markInternalMemberClusterLeft" , "InternalMemberCluster" , klog .KObj (imc ))
218278 newCondition := metav1.Condition {
219279 Type : string (fleetv1alpha1 .AgentJoined ),
220280 Status : metav1 .ConditionFalse ,
@@ -233,6 +293,26 @@ func (r *Reconciler) markInternalMemberClusterLeft(imc apis.ConditionedAgentObj)
233293 imc .SetConditionsWithType (fleetv1alpha1 .MemberAgent , newCondition )
234294}
235295
296+ func (r * Reconciler ) markInternalMemberClusterLeaveFailed (imc apis.ConditionedAgentObj , err error ) {
297+ klog .V (4 ).InfoS ("markInternalMemberCluster leave failed" , "error" , err , "InternalMemberCluster" , klog .KObj (imc ))
298+ newCondition := metav1.Condition {
299+ Type : string (fleetv1alpha1 .AgentJoined ),
300+ Status : metav1 .ConditionUnknown ,
301+ Reason : eventReasonInternalMemberClusterFailedToLeave ,
302+ Message : err .Error (),
303+ ObservedGeneration : imc .GetGeneration (),
304+ }
305+
306+ // Joined status changed.
307+ existingCondition := imc .GetConditionWithType (fleetv1alpha1 .MemberAgent , newCondition .Type )
308+ if existingCondition == nil || existingCondition .ObservedGeneration != imc .GetGeneration () || existingCondition .Status != newCondition .Status {
309+ r .recorder .Event (imc , corev1 .EventTypeNormal , eventReasonInternalMemberClusterFailedToLeave , "internal member cluster failed to leave" )
310+ klog .ErrorS (err , "agent leave failed" , "InternalMemberCluster" , klog .KObj (imc ))
311+ }
312+
313+ imc .SetConditionsWithType (fleetv1alpha1 .MemberAgent , newCondition )
314+ }
315+
236316// SetupWithManager sets up the controller with the Manager.
237317func (r * Reconciler ) SetupWithManager (mgr ctrl.Manager ) error {
238318 r .recorder = mgr .GetEventRecorderFor ("InternalMemberClusterController" )
0 commit comments