Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 74 additions & 53 deletions pkg/controller/sharder/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,49 +76,45 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, fmt.Errorf("error retrieving object from store: %w", err)
}

log = log.WithValues("ring", client.ObjectKeyFromObject(controllerRing))
o, err := r.NewOperation(ctx, controllerRing)
if err != nil {
return reconcile.Result{}, err
}

if err := o.ResyncControllerRing(ctx, log); err != nil {
return reconcile.Result{}, err
}

// requeue for periodic resync
return reconcile.Result{RequeueAfter: r.Config.Controller.Sharder.SyncPeriod.Duration}, nil
}

func (r *Reconciler) NewOperation(ctx context.Context, controllerRing *shardingv1alpha1.ControllerRing) (*Operation, error) {
// collect list of shards in the ring
leaseList := &coordinationv1.LeaseList{}
if err := r.Client.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: controllerRing.LeaseSelector()}); err != nil {
return reconcile.Result{}, fmt.Errorf("error listing Leases for ControllerRing: %w", err)
return nil, fmt.Errorf("error listing Leases for ControllerRing: %w", err)
}

// get ring and shards from cache
hashRing, shards := ring.FromLeases(controllerRing, leaseList, r.Clock.Now())

namespaces, err := r.getSelectedNamespaces(ctx, controllerRing)
namespaces, err := r.GetSelectedNamespaces(ctx, controllerRing)
if err != nil {
return reconcile.Result{}, err
}

allErrs := &multierror.Error{
ErrorFormat: utilerrors.FormatErrors,
}

// resync all ring resources
for _, ringResource := range controllerRing.Spec.Resources {
allErrs = multierror.Append(allErrs,
r.resyncResource(ctx, log, ringResource.GroupResource, controllerRing, namespaces, hashRing, shards, false),
)

for _, controlledResource := range ringResource.ControlledResources {
allErrs = multierror.Append(allErrs,
r.resyncResource(ctx, log, controlledResource, controllerRing, namespaces, hashRing, shards, true),
)
}
}

// collect all errors and return a combined error if any occurred
if err := allErrs.ErrorOrNil(); err != nil {
return reconcile.Result{}, err
return nil, err
}

// requeue for periodic resync
return reconcile.Result{RequeueAfter: r.Config.Controller.Sharder.SyncPeriod.Duration}, nil
return &Operation{
Client: r.Client,
Reader: r.Reader,
ControllerRing: controllerRing,
Namespaces: namespaces,
HashRing: hashRing,
Shards: shards,
}, nil
}

func (r *Reconciler) getSelectedNamespaces(ctx context.Context, controllerRing *shardingv1alpha1.ControllerRing) (sets.Set[string], error) {
func (r *Reconciler) GetSelectedNamespaces(ctx context.Context, controllerRing *shardingv1alpha1.ControllerRing) (sets.Set[string], error) {
namespaceSelector := r.Config.Webhook.Config.NamespaceSelector
if controllerRing.Spec.NamespaceSelector != nil {
namespaceSelector = controllerRing.Spec.NamespaceSelector
Expand All @@ -142,19 +138,47 @@ func (r *Reconciler) getSelectedNamespaces(ctx context.Context, controllerRing *
return namespaceSet, err
}

func (r *Reconciler) resyncResource(
// Operation bundles the state needed for a single resync pass over one
// ControllerRing. Within this file it is populated by Reconciler.NewOperation
// and consumed by ResyncControllerRing and its helpers.
type Operation struct {
// Client is used for RESTMapper lookups and for patching objects
// (adding the drain label, removing the shard/drain labels).
Client client.Client
// Reader is the reader handed to the pager when listing the objects of a
// ring resource.
Reader client.Reader

// ControllerRing is the ring being resynced; its Spec.Resources, label
// names and Name (for metrics) are read during the resync.
ControllerRing *shardingv1alpha1.ControllerRing
// Namespaces is the set of selected namespaces; listed objects whose
// namespace is not in this set are skipped.
Namespaces sets.Set[string]
// HashRing is used to compute the desired shard for an object's hash key.
HashRing *consistenthash.Ring
// Shards is used to look up the object's current shard by ID and check
// whether it is still available before draining.
Shards leases.Shards
}

// ResyncControllerRing walks every entry of o.ControllerRing.Spec.Resources and
// resyncs the entry's main resource (controlled=false) followed by each of its
// controlled resources (controlled=true).
//
// A failure for one resource does not stop the pass: all results are
// accumulated and a single combined error is returned if any occurred.
func (o *Operation) ResyncControllerRing(ctx context.Context, log logr.Logger) error {
	combined := &multierror.Error{ErrorFormat: utilerrors.FormatErrors}

	for _, res := range o.ControllerRing.Spec.Resources {
		// the ring's main resource comes first ...
		mainErr := o.resyncResource(ctx, log, res.GroupResource, false)
		combined = multierror.Append(combined, mainErr)

		// ... then every resource controlled by it
		for _, controlled := range res.ControlledResources {
			controlledErr := o.resyncResource(ctx, log, controlled, true)
			combined = multierror.Append(combined, controlledErr)
		}
	}

	// hand back the accumulated errors as one combined error, if any occurred
	return combined.ErrorOrNil()
}

func (o *Operation) resyncResource(
ctx context.Context,
log logr.Logger,
gr metav1.GroupResource,
ring *shardingv1alpha1.ControllerRing,
namespaces sets.Set[string],
hashRing *consistenthash.Ring,
shards leases.Shards,
controlled bool,
) error {
log = log.WithValues("resource", gr)

gvks, err := r.Client.RESTMapper().KindsFor(schema.GroupVersionResource{Group: gr.Group, Resource: gr.Resource})
gvks, err := o.Client.RESTMapper().KindsFor(schema.GroupVersionResource{Group: gr.Group, Resource: gr.Resource})
if err != nil {
return fmt.Errorf("error determining kinds for resource %q: %w", gr.String(), err)
}
Expand All @@ -166,13 +190,13 @@ func (r *Reconciler) resyncResource(

list := &metav1.PartialObjectMetadataList{}
list.SetGroupVersionKind(gvks[0])
err = pager.New(r.Reader).EachListItemWithAlloc(ctx, list,
err = pager.New(o.Reader).EachListItemWithAlloc(ctx, list,
func(obj client.Object) error {
if !namespaces.Has(obj.GetNamespace()) {
if !o.Namespaces.Has(obj.GetNamespace()) {
return nil
}

allErrs = multierror.Append(allErrs, r.resyncObject(ctx, log, gr, obj.(*metav1.PartialObjectMetadata), ring, hashRing, shards, controlled))
allErrs = multierror.Append(allErrs, o.resyncObject(ctx, log, gr, controlled, obj.(*metav1.PartialObjectMetadata)))
return nil
},
// List a recent version from the API server's watch cache by setting resourceVersion=0. This reduces the load on etcd
Expand All @@ -196,15 +220,12 @@ var (
KeyForController = key.ForController
)

func (r *Reconciler) resyncObject(
func (o *Operation) resyncObject(
ctx context.Context,
log logr.Logger,
gr metav1.GroupResource,
obj *metav1.PartialObjectMetadata,
ring *shardingv1alpha1.ControllerRing,
hashRing *consistenthash.Ring,
shards leases.Shards,
controlled bool,
obj *metav1.PartialObjectMetadata,
) error {
log = log.WithValues("object", client.ObjectKeyFromObject(obj))

Expand All @@ -223,8 +244,8 @@ func (r *Reconciler) resyncObject(
}

var (
desiredShard = hashRing.Hash(hashKey)
currentShard = obj.Labels[ring.LabelShard()]
desiredShard = o.HashRing.Hash(hashKey)
currentShard = obj.Labels[o.ControllerRing.LabelShard()]
)

if desiredShard == "" {
Expand All @@ -237,20 +258,20 @@ func (r *Reconciler) resyncObject(
return nil
}

if currentShard != "" && currentShard != desiredShard && shards.ByID(currentShard).State.IsAvailable() && !controlled {
if currentShard != "" && o.Shards.ByID(currentShard).State.IsAvailable() && !controlled {
// If the object should be moved and the current shard is still available, we need to drain it.
// We only drain non-controlled objects, the controller's main object is used as a synchronization point for
// preventing concurrent reconciliations.
log.V(1).Info("Draining object from shard", "currentShard", currentShard)

patch := client.MergeFromWithOptions(obj.DeepCopy(), client.MergeFromWithOptimisticLock{})
metav1.SetMetaDataLabel(&obj.ObjectMeta, ring.LabelDrain(), "true")
if err := r.Client.Patch(ctx, obj, patch); err != nil {
metav1.SetMetaDataLabel(&obj.ObjectMeta, o.ControllerRing.LabelDrain(), "true")
if err := o.Client.Patch(ctx, obj, patch); err != nil {
return fmt.Errorf("error draining %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
}

shardingmetrics.DrainsTotal.WithLabelValues(
ring.Name, gr.Group, gr.Resource,
o.ControllerRing.Name, gr.Group, gr.Resource,
).Inc()

// object will go through the sharder webhook when shard removes the drain label, which will perform the assignment
Expand All @@ -264,14 +285,14 @@ func (r *Reconciler) resyncObject(
patch := client.MergeFromWithOptions(obj.DeepCopy(), client.MergeFromWithOptimisticLock{})
// remove drain label if it is still present, this might happen when trying to drain an object from a shard that
// just got unavailable
delete(obj.Labels, ring.LabelShard())
delete(obj.Labels, ring.LabelDrain())
if err := r.Client.Patch(ctx, obj, patch); err != nil {
delete(obj.Labels, o.ControllerRing.LabelShard())
delete(obj.Labels, o.ControllerRing.LabelDrain())
if err := o.Client.Patch(ctx, obj, patch); err != nil {
return fmt.Errorf("error triggering assignment for %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
}

shardingmetrics.MovementsTotal.WithLabelValues(
ring.Name, gr.Group, gr.Resource,
o.ControllerRing.Name, gr.Group, gr.Resource,
).Inc()

return nil
Expand Down
Loading
Loading