Skip to content

Commit 763285d

Browse files
🌱 Reconcile topology only when necessary (#11605)
* Reconcile topology only when necessary * Address comments * Allow resync for the cluster object
1 parent c8c7a95 commit 763285d

File tree

1 file changed

+113
-4
lines changed

1 file changed

+113
-4
lines changed

internal/controllers/topology/cluster/cluster_controller.go

Lines changed: 113 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,28 @@ package cluster
1919
import (
2020
"context"
2121
"fmt"
22+
"reflect"
2223
"time"
2324

2425
"github.com/go-logr/logr"
2526
"github.com/pkg/errors"
2627
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
"k8s.io/apimachinery/pkg/runtime"
2729
"k8s.io/apimachinery/pkg/types"
2830
kerrors "k8s.io/apimachinery/pkg/util/errors"
2931
"k8s.io/client-go/tools/record"
32+
"k8s.io/klog/v2"
3033
"k8s.io/utils/ptr"
3134
ctrl "sigs.k8s.io/controller-runtime"
3235
"sigs.k8s.io/controller-runtime/pkg/builder"
3336
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
3437
"sigs.k8s.io/controller-runtime/pkg/client"
38+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3539
"sigs.k8s.io/controller-runtime/pkg/controller"
40+
"sigs.k8s.io/controller-runtime/pkg/event"
3641
"sigs.k8s.io/controller-runtime/pkg/handler"
3742
"sigs.k8s.io/controller-runtime/pkg/log"
43+
"sigs.k8s.io/controller-runtime/pkg/predicate"
3844
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3945

4046
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
@@ -103,8 +109,11 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
103109
predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "topology/cluster")
104110
c, err := ctrl.NewControllerManagedBy(mgr).
105111
For(&clusterv1.Cluster{}, builder.WithPredicates(
106-
// Only reconcile Cluster with topology.
107-
predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog),
112+
// Only reconcile Cluster with topology and with changes relevant for this controller.
113+
predicates.All(mgr.GetScheme(), predicateLog,
114+
predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog),
115+
clusterChangeIsRelevant(mgr.GetScheme(), predicateLog),
116+
),
108117
)).
109118
Named("topology/cluster").
110119
WatchesRawSource(r.ClusterCache.GetClusterSource("topology/cluster", func(_ context.Context, o client.Object) []ctrl.Request {
@@ -118,16 +127,17 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
118127
Watches(
119128
&clusterv1.MachineDeployment{},
120129
handler.EnqueueRequestsFromMapFunc(r.machineDeploymentToCluster),
121-
// Only trigger Cluster reconciliation if the MachineDeployment is topology owned.
130+
// Only trigger Cluster reconciliation if the MachineDeployment is topology owned, the resource is changed, and the change is relevant.
122131
builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog,
123132
predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog),
124133
predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog),
134+
machineDeploymentChangeIsRelevant(mgr.GetScheme(), predicateLog),
125135
)),
126136
).
127137
Watches(
128138
&expv1.MachinePool{},
129139
handler.EnqueueRequestsFromMapFunc(r.machinePoolToCluster),
130-
// Only trigger Cluster reconciliation if the MachinePool is topology owned.
140+
// Only trigger Cluster reconciliation if the MachinePool is topology owned, the resource is changed.
131141
builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog,
132142
predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog),
133143
predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog),
@@ -155,6 +165,105 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
155165
return nil
156166
}
157167

168+
func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs {
169+
dropNotRelevant := func(cluster *clusterv1.Cluster) *clusterv1.Cluster {
170+
c := cluster.DeepCopy()
171+
// Drop metadata fields which are impacted by not relevant changes.
172+
c.ObjectMeta.ManagedFields = nil
173+
c.ObjectMeta.ResourceVersion = ""
174+
175+
// Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this
176+
// selectively drop changes not relevant for this controller.
177+
c.Status.V1Beta2 = nil
178+
return c
179+
}
180+
181+
return predicate.Funcs{
182+
UpdateFunc: func(e event.UpdateEvent) bool {
183+
log := logger.WithValues("predicate", "ClusterChangeIsRelevant", "eventType", "update")
184+
if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil {
185+
log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld))
186+
}
187+
188+
if e.ObjectOld.GetResourceVersion() == e.ObjectNew.GetResourceVersion() {
189+
log.V(6).Info("Cluster resync event, allowing further processing")
190+
return true
191+
}
192+
193+
oldObj, ok := e.ObjectOld.(*clusterv1.Cluster)
194+
if !ok {
195+
log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
196+
return false
197+
}
198+
oldObj = dropNotRelevant(oldObj)
199+
200+
newObj := e.ObjectNew.(*clusterv1.Cluster)
201+
if !ok {
202+
log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew))
203+
return false
204+
}
205+
newObj = dropNotRelevant(newObj)
206+
207+
if reflect.DeepEqual(oldObj, newObj) {
208+
log.V(6).Info("Cluster does not have relevant changes, blocking further processing")
209+
return false
210+
}
211+
log.V(6).Info("Cluster has relevant changes, allowing further processing")
212+
return true
213+
},
214+
CreateFunc: func(event.CreateEvent) bool { return true },
215+
DeleteFunc: func(event.DeleteEvent) bool { return true },
216+
GenericFunc: func(event.GenericEvent) bool { return true },
217+
}
218+
}
219+
220+
func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs {
221+
dropNotRelevant := func(machineDeployment *clusterv1.MachineDeployment) *clusterv1.MachineDeployment {
222+
md := machineDeployment.DeepCopy()
223+
// Drop metadata fields which are impacted by not relevant changes.
224+
md.ObjectMeta.ManagedFields = nil
225+
md.ObjectMeta.ResourceVersion = ""
226+
227+
// Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this
228+
// selectively drop changes not relevant for this controller.
229+
md.Status.V1Beta2 = nil
230+
return md
231+
}
232+
233+
return predicate.Funcs{
234+
UpdateFunc: func(e event.UpdateEvent) bool {
235+
log := logger.WithValues("predicate", "MachineDeploymentChangeIsRelevant", "eventType", "update")
236+
if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil {
237+
log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld))
238+
}
239+
240+
oldObj, ok := e.ObjectOld.(*clusterv1.MachineDeployment)
241+
if !ok {
242+
log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectOld))
243+
return false
244+
}
245+
oldObj = dropNotRelevant(oldObj)
246+
247+
newObj := e.ObjectNew.(*clusterv1.MachineDeployment)
248+
if !ok {
249+
log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectNew))
250+
return false
251+
}
252+
newObj = dropNotRelevant(newObj)
253+
254+
if reflect.DeepEqual(oldObj, newObj) {
255+
log.V(6).Info("MachineDeployment does not have relevant changes, blocking further processing")
256+
return false
257+
}
258+
log.V(6).Info("MachineDeployment has relevant changes, allowing further processing")
259+
return true
260+
},
261+
CreateFunc: func(event.CreateEvent) bool { return true },
262+
DeleteFunc: func(event.DeleteEvent) bool { return true },
263+
GenericFunc: func(event.GenericEvent) bool { return true },
264+
}
265+
}
266+
158267
// SetupForDryRun prepares the Reconciler for a dry run execution.
159268
func (r *Reconciler) SetupForDryRun(recorder record.EventRecorder) {
160269
r.desiredStateGenerator = desiredstate.NewGenerator(r.Client, r.ClusterCache, r.RuntimeClient)

0 commit comments

Comments
 (0)