Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/controller/controllerring/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,7 @@ func (r *Reconciler) reconcileWebhooks(ctx context.Context, controllerRing *shar
}

// add ring-specific path to webhook client config
webhookPath, err := sharder.WebhookPathFor(controllerRing)
if err != nil {
return err
}
webhookPath := sharder.WebhookPathForControllerRing(controllerRing)

if service := webhook.ClientConfig.Service; service != nil {
service.Path = ptr.To(path.Join(ptr.Deref(service.Path, ""), webhookPath))
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/sharder/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *Reconciler) resyncResource(
ctx context.Context,
log logr.Logger,
gr metav1.GroupResource,
ring sharding.Ring,
ring *shardingv1alpha1.ControllerRing,
namespaces sets.Set[string],
hashRing *consistenthash.Ring,
shards leases.Shards,
Expand Down Expand Up @@ -194,7 +194,7 @@ func (r *Reconciler) resyncObject(
log logr.Logger,
gr metav1.GroupResource,
obj *metav1.PartialObjectMetadata,
ring sharding.Ring,
ring *shardingv1alpha1.ControllerRing,
hashRing *consistenthash.Ring,
shards leases.Shards,
controlled bool,
Expand Down Expand Up @@ -243,7 +243,7 @@ func (r *Reconciler) resyncObject(
}

shardingmetrics.DrainsTotal.WithLabelValues(
ring.GetName(), gr.Group, gr.Resource,
ring.Name, gr.Group, gr.Resource,
).Inc()

// object will go through the sharder webhook when shard removes the drain label, which will perform the assignment
Expand All @@ -264,7 +264,7 @@ func (r *Reconciler) resyncObject(
}

shardingmetrics.MovementsTotal.WithLabelValues(
ring.GetName(), gr.Group, gr.Resource,
ring.Name, gr.Group, gr.Resource,
).Inc()

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/sharding/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
)

// KeyFuncForResource returns the key function that maps the given resource or its controller dependening on whether
// the resource is listed as a resource or controlled resource in the given ring.
func KeyFuncForResource(gr metav1.GroupResource, ring Ring) (KeyFunc, error) {
func KeyFuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.ControllerRing) (KeyFunc, error) {
ringResources := sets.New[metav1.GroupResource]()
controlledResources := sets.New[metav1.GroupResource]()

Expand Down
33 changes: 0 additions & 33 deletions pkg/sharding/ring.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/sharding/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

coordinationv1 "k8s.io/api/coordination/v1"

"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
Expand All @@ -32,13 +32,13 @@ import (
// This is a central function in the sharding implementation bringing together the leases package with the
// consistenthash package.
// In short, it determines the subset of available shards and constructs a new consistenthash.Ring with it.
func FromLeases(ringObj sharding.Ring, leaseList *coordinationv1.LeaseList, now time.Time) (*consistenthash.Ring, leases.Shards) {
func FromLeases(controllerRing *shardingv1alpha1.ControllerRing, leaseList *coordinationv1.LeaseList, now time.Time) (*consistenthash.Ring, leases.Shards) {
// determine ready shards and calculate hash ring
shards := leases.ToShards(leaseList.Items, now)
availableShards := shards.AvailableShards().IDs()
ring := consistenthash.New(nil, 0, availableShards...)

shardingmetrics.RingCalculationsTotal.WithLabelValues(ringObj.GetName()).Inc()
shardingmetrics.RingCalculationsTotal.WithLabelValues(controllerRing.Name).Inc()

return ring, shards
}
37 changes: 10 additions & 27 deletions pkg/webhook/sharder/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
)

const (
// HandlerName is the name of the webhook handler.
HandlerName = "sharder"
// WebhookPathPrefix is the path prefix at which the handler should be registered.
WebhookPathPrefix = "/webhooks/sharder/"
)
Expand All @@ -58,41 +54,28 @@ func (h *Handler) AddToManager(mgr manager.Manager) error {

const pathControllerRing = "controllerring"

// WebhookPathFor returns the webhook handler path that should be used for implementing the given ring object.
// It is the reverse of RingForWebhookPath.
func WebhookPathFor(obj client.Object) (string, error) {
switch obj.(type) {
case *shardingv1alpha1.ControllerRing:
return path.Join(WebhookPathPrefix, pathControllerRing, obj.GetName()), nil
default:
return "", fmt.Errorf("unexpected kind %T", obj)
}
// WebhookPathForControllerRing returns the webhook handler path that should be used for implementing the given
// ControllerRing. It is the reverse of ControllerRingForWebhookPath.
func WebhookPathForControllerRing(ring *shardingv1alpha1.ControllerRing) string {
return path.Join(WebhookPathPrefix, pathControllerRing, ring.Name)
}

// RingForWebhookPath returns the ring object that is associated with the given webhook handler path.
// It is the reverse of WebhookPathFor.
func RingForWebhookPath(requestPath string) (sharding.Ring, error) {
// ControllerRingForWebhookPath returns the ControllerRing that is associated with the given webhook handler path.
// It is the reverse of WebhookPathForControllerRing.
func ControllerRingForWebhookPath(requestPath string) (*shardingv1alpha1.ControllerRing, error) {
if !strings.HasPrefix(requestPath, WebhookPathPrefix) {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}

parts := strings.SplitN(strings.TrimPrefix(requestPath, WebhookPathPrefix), "/", 3)
if len(parts) < 2 {
if len(parts) != 2 {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}

var ring sharding.Ring
switch parts[0] {
case pathControllerRing:
if len(parts) != 2 {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}
ring = &shardingv1alpha1.ControllerRing{ObjectMeta: metav1.ObjectMeta{Name: parts[1]}}
default:
if parts[0] != pathControllerRing {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}

return ring, nil
return &shardingv1alpha1.ControllerRing{ObjectMeta: metav1.ObjectMeta{Name: parts[1]}}, nil
}

type ctxKey int
Expand Down
29 changes: 15 additions & 14 deletions pkg/webhook/sharder/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
Expand All @@ -46,9 +47,9 @@ type Handler struct {
func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.Response {
log := logf.FromContext(ctx)

ringObj, err := RingForRequest(ctx, h.Reader)
controllerRing, err := ControllerRingForRequest(ctx, h.Reader)
if err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error determining ring for request: %w", err))
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error determining ControllerRing for request: %w", err))
}

// Unfortunately, admission.Decoder / runtime.Decoder can't handle decoding into PartialObjectMetadata.
Expand All @@ -58,7 +59,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error decoding object: %w", err))
}

labelShard := ringObj.LabelShard()
labelShard := controllerRing.LabelShard()

// Don't touch labels that the object already has, we can't simply reassign it because the active shard might still
// be working on it.
Expand All @@ -69,7 +70,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
keyFunc, err := sharding.KeyFuncForResource(metav1.GroupResource{
Group: req.Resource.Group,
Resource: req.Resource.Resource,
}, ringObj)
}, controllerRing)
if err != nil {
return admission.Errored(http.StatusBadRequest, fmt.Errorf("error deteriming hash key func for object: %w", err))
}
Expand All @@ -84,15 +85,15 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R

// collect list of shards in the ring
leaseList := &coordinationv1.LeaseList{}
if err := h.Reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: ringObj.LeaseSelector()}); err != nil {
if err := h.Reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: controllerRing.LeaseSelector()}); err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error listing Leases for ControllerRing: %w", err))
}

// get ring from cache and hash the object onto the ring
hashRing, _ := ring.FromLeases(ringObj, leaseList, h.Clock.Now())
hashRing, _ := ring.FromLeases(controllerRing, leaseList, h.Clock.Now())
shard := hashRing.Hash(key)

log.V(1).Info("Assigning object for ring", "ring", client.ObjectKeyFromObject(ringObj), "shard", shard)
log.V(1).Info("Assigning object for ControllerRing", "controllerRing", client.ObjectKeyFromObject(controllerRing), "shard", shard)

patches := make([]jsonpatch.JsonPatchOperation, 0, 2)
if obj.Labels == nil {
Expand All @@ -103,30 +104,30 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R

if !ptr.Deref(req.DryRun, false) {
shardingmetrics.AssignmentsTotal.WithLabelValues(
ringObj.GetName(), req.Resource.Group, req.Resource.Resource,
controllerRing.Name, req.Resource.Group, req.Resource.Resource,
).Inc()
}

return admission.Patched("assigning object", patches...)
}

// RingForRequest returns the Ring object matching the requests' path.
func RingForRequest(ctx context.Context, c client.Reader) (sharding.Ring, error) {
// ControllerRingForRequest returns the Ring object matching the requests' path.
func ControllerRingForRequest(ctx context.Context, c client.Reader) (*shardingv1alpha1.ControllerRing, error) {
requestPath, err := RequestPathFromContext(ctx)
if err != nil {
return nil, err
}

ring, err := RingForWebhookPath(requestPath)
controllerRing, err := ControllerRingForWebhookPath(requestPath)
if err != nil {
return nil, err
}

if err := c.Get(ctx, client.ObjectKeyFromObject(ring), ring); err != nil {
return nil, fmt.Errorf("error getting ring: %w", err)
if err := c.Get(ctx, client.ObjectKeyFromObject(controllerRing), controllerRing); err != nil {
return nil, fmt.Errorf("error getting ControllerRing: %w", err)
}

return ring, nil
return controllerRing, nil
}

// rfc6901Encoder can escape / characters in label keys for inclusion in JSON patch paths.
Expand Down
Loading