
Commit f350825

Prefactor: extract resync operation

1 parent ef401c7

1 file changed: +74 -53 lines


pkg/controller/sharder/reconciler.go

Lines changed: 74 additions & 53 deletions
@@ -76,49 +76,45 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 		return reconcile.Result{}, fmt.Errorf("error retrieving object from store: %w", err)
 	}
 
-	log = log.WithValues("ring", client.ObjectKeyFromObject(controllerRing))
+	o, err := r.NewOperation(ctx, controllerRing)
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+
+	if err := o.ResyncControllerRing(ctx, log); err != nil {
+		return reconcile.Result{}, err
+	}
+
+	// requeue for periodic resync
+	return reconcile.Result{RequeueAfter: r.Config.Controller.Sharder.SyncPeriod.Duration}, nil
+}
 
+func (r *Reconciler) NewOperation(ctx context.Context, controllerRing *shardingv1alpha1.ControllerRing) (*Operation, error) {
 	// collect list of shards in the ring
 	leaseList := &coordinationv1.LeaseList{}
 	if err := r.Client.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: controllerRing.LeaseSelector()}); err != nil {
-		return reconcile.Result{}, fmt.Errorf("error listing Leases for ControllerRing: %w", err)
+		return nil, fmt.Errorf("error listing Leases for ControllerRing: %w", err)
 	}
 
 	// get ring and shards from cache
 	hashRing, shards := ring.FromLeases(controllerRing, leaseList, r.Clock.Now())
 
-	namespaces, err := r.getSelectedNamespaces(ctx, controllerRing)
+	namespaces, err := r.GetSelectedNamespaces(ctx, controllerRing)
 	if err != nil {
-		return reconcile.Result{}, err
-	}
-
-	allErrs := &multierror.Error{
-		ErrorFormat: utilerrors.FormatErrors,
-	}
-
-	// resync all ring resources
-	for _, ringResource := range controllerRing.Spec.Resources {
-		allErrs = multierror.Append(allErrs,
-			r.resyncResource(ctx, log, ringResource.GroupResource, controllerRing, namespaces, hashRing, shards, false),
-		)
-
-		for _, controlledResource := range ringResource.ControlledResources {
-			allErrs = multierror.Append(allErrs,
-				r.resyncResource(ctx, log, controlledResource, controllerRing, namespaces, hashRing, shards, true),
-			)
-		}
-	}
-
-	// collect all errors and return a combined error if any occurred
-	if err := allErrs.ErrorOrNil(); err != nil {
-		return reconcile.Result{}, err
+		return nil, err
 	}
 
-	// requeue for periodic resync
-	return reconcile.Result{RequeueAfter: r.Config.Controller.Sharder.SyncPeriod.Duration}, nil
+	return &Operation{
+		Client:         r.Client,
+		Reader:         r.Reader,
+		ControllerRing: controllerRing,
+		Namespaces:     namespaces,
+		HashRing:       hashRing,
+		Shards:         shards,
+	}, nil
 }
 
-func (r *Reconciler) getSelectedNamespaces(ctx context.Context, controllerRing *shardingv1alpha1.ControllerRing) (sets.Set[string], error) {
+func (r *Reconciler) GetSelectedNamespaces(ctx context.Context, controllerRing *shardingv1alpha1.ControllerRing) (sets.Set[string], error) {
 	namespaceSelector := r.Config.Webhook.Config.NamespaceSelector
 	if controllerRing.Spec.NamespaceSelector != nil {
 		namespaceSelector = controllerRing.Spec.NamespaceSelector
@@ -142,19 +138,47 @@ func (r *Reconciler) getSelectedNamespaces(ctx context.Context, controllerRing *
 	return namespaceSet, err
 }
 
-func (r *Reconciler) resyncResource(
+type Operation struct {
+	Client client.Client
+	Reader client.Reader
+
+	ControllerRing *shardingv1alpha1.ControllerRing
+	Namespaces     sets.Set[string]
+	HashRing       *consistenthash.Ring
+	Shards         leases.Shards
+}
+
+func (o *Operation) ResyncControllerRing(ctx context.Context, log logr.Logger) error {
+	allErrs := &multierror.Error{
+		ErrorFormat: utilerrors.FormatErrors,
+	}
+
+	// resync all ring resources
+	for _, ringResource := range o.ControllerRing.Spec.Resources {
+		allErrs = multierror.Append(allErrs,
+			o.resyncResource(ctx, log, ringResource.GroupResource, false),
+		)
+
+		for _, controlledResource := range ringResource.ControlledResources {
+			allErrs = multierror.Append(allErrs,
+				o.resyncResource(ctx, log, controlledResource, true),
+			)
+		}
+	}
+
+	// collect all errors and return a combined error if any occurred
+	return allErrs.ErrorOrNil()
+}
+
+func (o *Operation) resyncResource(
 	ctx context.Context,
 	log logr.Logger,
 	gr metav1.GroupResource,
-	ring *shardingv1alpha1.ControllerRing,
-	namespaces sets.Set[string],
-	hashRing *consistenthash.Ring,
-	shards leases.Shards,
 	controlled bool,
 ) error {
 	log = log.WithValues("resource", gr)
 
-	gvks, err := r.Client.RESTMapper().KindsFor(schema.GroupVersionResource{Group: gr.Group, Resource: gr.Resource})
+	gvks, err := o.Client.RESTMapper().KindsFor(schema.GroupVersionResource{Group: gr.Group, Resource: gr.Resource})
 	if err != nil {
 		return fmt.Errorf("error determining kinds for resource %q: %w", gr.String(), err)
 	}
@@ -166,13 +190,13 @@ func (r *Reconciler) resyncResource(
 
 	list := &metav1.PartialObjectMetadataList{}
 	list.SetGroupVersionKind(gvks[0])
-	err = pager.New(r.Reader).EachListItemWithAlloc(ctx, list,
+	err = pager.New(o.Reader).EachListItemWithAlloc(ctx, list,
 		func(obj client.Object) error {
-			if !namespaces.Has(obj.GetNamespace()) {
+			if !o.Namespaces.Has(obj.GetNamespace()) {
 				return nil
 			}
 
-			allErrs = multierror.Append(allErrs, r.resyncObject(ctx, log, gr, obj.(*metav1.PartialObjectMetadata), ring, hashRing, shards, controlled))
+			allErrs = multierror.Append(allErrs, o.resyncObject(ctx, log, gr, controlled, obj.(*metav1.PartialObjectMetadata)))
 			return nil
 		},
 		// List a recent version from the API server's watch cache by setting resourceVersion=0. This reduces the load on etcd
@@ -196,15 +220,12 @@ var (
 	KeyForController = key.ForController
 )
 
-func (r *Reconciler) resyncObject(
+func (o *Operation) resyncObject(
 	ctx context.Context,
 	log logr.Logger,
 	gr metav1.GroupResource,
-	obj *metav1.PartialObjectMetadata,
-	ring *shardingv1alpha1.ControllerRing,
-	hashRing *consistenthash.Ring,
-	shards leases.Shards,
 	controlled bool,
+	obj *metav1.PartialObjectMetadata,
 ) error {
 	log = log.WithValues("object", client.ObjectKeyFromObject(obj))
 
@@ -223,8 +244,8 @@ func (r *Reconciler) resyncObject(
 	}
 
 	var (
-		desiredShard = hashRing.Hash(hashKey)
-		currentShard = obj.Labels[ring.LabelShard()]
+		desiredShard = o.HashRing.Hash(hashKey)
+		currentShard = obj.Labels[o.ControllerRing.LabelShard()]
 	)
 
 	if desiredShard == "" {
@@ -237,20 +258,20 @@ func (r *Reconciler) resyncObject(
 		return nil
 	}
 
-	if currentShard != "" && currentShard != desiredShard && shards.ByID(currentShard).State.IsAvailable() && !controlled {
+	if currentShard != "" && o.Shards.ByID(currentShard).State.IsAvailable() && !controlled {
 		// If the object should be moved and the current shard is still available, we need to drain it.
 		// We only drain non-controlled objects, the controller's main object is used as a synchronization point for
 		// preventing concurrent reconciliations.
 		log.V(1).Info("Draining object from shard", "currentShard", currentShard)
 
 		patch := client.MergeFromWithOptions(obj.DeepCopy(), client.MergeFromWithOptimisticLock{})
-		metav1.SetMetaDataLabel(&obj.ObjectMeta, ring.LabelDrain(), "true")
-		if err := r.Client.Patch(ctx, obj, patch); err != nil {
+		metav1.SetMetaDataLabel(&obj.ObjectMeta, o.ControllerRing.LabelDrain(), "true")
+		if err := o.Client.Patch(ctx, obj, patch); err != nil {
 			return fmt.Errorf("error draining %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
 		}
 
 		shardingmetrics.DrainsTotal.WithLabelValues(
-			ring.Name, gr.Group, gr.Resource,
+			o.ControllerRing.Name, gr.Group, gr.Resource,
 		).Inc()
 
 		// object will go through the sharder webhook when shard removes the drain label, which will perform the assignment
@@ -264,14 +285,14 @@ func (r *Reconciler) resyncObject(
 	patch := client.MergeFromWithOptions(obj.DeepCopy(), client.MergeFromWithOptimisticLock{})
 	// remove drain label if it is still present, this might happen when trying to drain an object from a shard that
 	// just got unavailable
-	delete(obj.Labels, ring.LabelShard())
-	delete(obj.Labels, ring.LabelDrain())
-	if err := r.Client.Patch(ctx, obj, patch); err != nil {
+	delete(obj.Labels, o.ControllerRing.LabelShard())
+	delete(obj.Labels, o.ControllerRing.LabelDrain())
+	if err := o.Client.Patch(ctx, obj, patch); err != nil {
 		return fmt.Errorf("error triggering assignment for %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
 	}
 
 	shardingmetrics.MovementsTotal.WithLabelValues(
-		ring.Name, gr.Group, gr.Resource,
+		o.ControllerRing.Name, gr.Group, gr.Resource,
 	).Inc()
 
 	return nil
