Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,30 +253,27 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool {
// other workers.
defer c.queue.Done(key)

if requeue, err := c.process(ctx, key); err != nil {
if err := c.process(ctx, key); err != nil {
utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err))
c.queue.AddRateLimited(key)
return true
} else if requeue {
c.queue.Add(key)
return true
}
c.queue.Forget(key)
return true
}

func (c *controller) process(ctx context.Context, key string) (bool, error) {
func (c *controller) process(ctx context.Context, key string) error {
clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
utilruntime.HandleError(err)
return false, nil
return nil
}
obj, err := c.getCachedResourceEndpointSlice(clusterName.Path(), name)
if err != nil {
if errors.IsNotFound(err) {
return false, nil // object deleted before we handled it
return nil // object deleted before we handled it
}
return false, err
return err
}

old := obj
Expand All @@ -286,7 +283,7 @@ func (c *controller) process(ctx context.Context, key string) (bool, error) {
ctx = klog.NewContext(ctx, logger)

var errs []error
requeue, err := c.reconcile(ctx, obj)
err = c.reconcile(ctx, obj)
if err != nil {
errs = append(errs, err)
}
Expand All @@ -302,7 +299,7 @@ func (c *controller) process(ctx context.Context, key string) (bool, error) {
errs = append(errs, err)
}

return requeue, utilerrors.NewAggregate(errs)
return utilerrors.NewAggregate(errs)
}

// InstallIndexers adds the additional indexers that this controller requires to the informers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,10 @@ import (
cachev1alpha1 "github.com/kcp-dev/sdk/apis/cache/v1alpha1"
conditionsv1alpha1 "github.com/kcp-dev/sdk/apis/third_party/conditions/apis/conditions/v1alpha1"
"github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions"
topologyv1alpha1 "github.com/kcp-dev/sdk/apis/topology/v1alpha1"
)

func (c *controller) reconcile(ctx context.Context, endpoints *cachev1alpha1.CachedResourceEndpointSlice) (bool, error) {
r := &endpointsReconciler{
getCachedResource: c.getCachedResource,
getPartition: c.getPartition,
}

return r.reconcile(ctx, endpoints)
}

type endpointsReconciler struct {
getCachedResource func(path logicalcluster.Path, name string) (*cachev1alpha1.CachedResource, error)
getPartition func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error)
}

func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1alpha1.CachedResourceEndpointSlice) (bool, error) {
_, err := r.getCachedResource(logicalcluster.From(endpoints).Path(), endpoints.Spec.CachedResource.Name)
func (c *controller) reconcile(ctx context.Context, endpoints *cachev1alpha1.CachedResourceEndpointSlice) error {
_, err := c.getCachedResource(logicalcluster.From(endpoints).Path(), endpoints.Spec.CachedResource.Name)
if err != nil {
if apierrors.IsNotFound(err) {
// Don't keep the endpoints if the CachedResource has been deleted.
Expand All @@ -60,7 +45,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1a
endpoints.Spec.CachedResource.Name,
)
// No need to try again.
return false, nil
return nil
} else {
conditions.MarkFalse(
endpoints,
Expand All @@ -71,15 +56,15 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1a
logicalcluster.From(endpoints),
endpoints.Spec.CachedResource.Name,
)
return true, err
return err
}
}
conditions.MarkTrue(endpoints, cachev1alpha1.CachedResourceValid)

// Check the partition selector.
var selector labels.Selector
if endpoints.Spec.Partition != "" {
partition, err := r.getPartition(logicalcluster.From(endpoints), endpoints.Spec.Partition)
partition, err := c.getPartition(logicalcluster.From(endpoints), endpoints.Spec.Partition)
if err != nil {
if apierrors.IsNotFound(err) {
// Don't keep the endpoints if the Partition has been deleted and is still referenced.
Expand All @@ -93,7 +78,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1a
err,
)
// No need to try again.
return false, nil
return nil
} else {
conditions.MarkFalse(
endpoints,
Expand All @@ -103,7 +88,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1a
"%v",
err,
)
return true, err
return err
}
}
selector, err = metav1.LabelSelectorAsSelector(partition.Spec.Selector)
Expand All @@ -116,7 +101,7 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1a
"%v",
err,
)
return true, err
return err
}
}
if selector == nil {
Expand All @@ -127,5 +112,5 @@ func (r *endpointsReconciler) reconcile(ctx context.Context, endpoints *cachev1a

endpoints.Status.ShardSelector = selector.String()

return true, err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here was the bug ^^ the reconciler would requeue the item indefinitely.

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestReconcile(t *testing.T) {
Partition: "my-partition",
},
}
_, err := c.reconcile(context.Background(), cachedResourceEndpointSlice)
err := c.reconcile(context.Background(), cachedResourceEndpointSlice)
if tc.wantError {
require.Error(t, err, "expected an error")
} else {
Expand Down