diff --git a/pkg/apis/sharding/v1alpha1/types_controllerring.go b/pkg/apis/sharding/v1alpha1/types_controllerring.go index 439ad4d1..24d36981 100644 --- a/pkg/apis/sharding/v1alpha1/types_controllerring.go +++ b/pkg/apis/sharding/v1alpha1/types_controllerring.go @@ -130,8 +130,3 @@ func (c *ControllerRing) LabelShard() string { func (c *ControllerRing) LabelDrain() string { return LabelDrain(c.Name) } - -// RingResources returns the list of resources that are distributed across shards in this ControllerRing. -func (c *ControllerRing) RingResources() []RingResource { - return c.Spec.Resources -} diff --git a/pkg/apis/sharding/v1alpha1/types_controllerring_test.go b/pkg/apis/sharding/v1alpha1/types_controllerring_test.go index c95411a4..3f9fc8cf 100644 --- a/pkg/apis/sharding/v1alpha1/types_controllerring_test.go +++ b/pkg/apis/sharding/v1alpha1/types_controllerring_test.go @@ -64,10 +64,4 @@ var _ = Describe("ControllerRing", func() { Expect(ring.LabelDrain()).To(Equal("drain.alpha.sharding.timebertt.dev/operator")) }) }) - - Describe("#RingResources", func() { - It("should return the specified resources", func() { - Expect(ring.RingResources()).To(Equal(ring.Spec.Resources)) - }) - }) }) diff --git a/pkg/controller/sharder/reconciler.go b/pkg/controller/sharder/reconciler.go index 512b9e76..1e049d8d 100644 --- a/pkg/controller/sharder/reconciler.go +++ b/pkg/controller/sharder/reconciler.go @@ -35,8 +35,8 @@ import ( configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1" shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" - "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding" "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/key" "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases" shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics" "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring" @@ -201,22 +201,22 @@ func (r *Reconciler) resyncObject( ) error { log = log.WithValues("object", client.ObjectKeyFromObject(obj)) - keyFunc := sharding.KeyForObject + keyFunc := key.ForObject if controlled { - keyFunc = sharding.KeyForController + keyFunc = key.ForController } - key, err := keyFunc(obj) + hashKey, err := keyFunc(obj) if err != nil { return err } - if key == "" { + if hashKey == "" { // object should not be assigned return nil } var ( - desiredShard = hashRing.Hash(key) + desiredShard = hashRing.Hash(hashKey) currentShard = obj.Labels[ring.LabelShard()] ) diff --git a/pkg/sharding/key.go b/pkg/sharding/key/key.go similarity index 72% rename from pkg/sharding/key.go rename to pkg/sharding/key/key.go index 6d1ba061..5c2212c2 100644 --- a/pkg/sharding/key.go +++ b/pkg/sharding/key/key.go @@ -14,25 +14,26 @@ See the License for the specific language governing permissions and limitations under the License. */ -package sharding +package key import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" ) -// KeyFuncForResource returns the key function that maps the given resource or its controller depending on whether +// FuncForResource returns the key function that maps the given resource or its controller depending on whether // the resource is listed as a resource or controlled resource in the given ring. -func KeyFuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.ControllerRing) (KeyFunc, error) { +func FuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.ControllerRing) (Func, error) { ringResources := sets.New[metav1.GroupResource]() controlledResources := sets.New[metav1.GroupResource]() - for _, ringResource := range ring.RingResources() { + for _, ringResource := range ring.Spec.Resources { ringResources.Insert(ringResource.GroupResource) for _, controlledResource := range ringResource.ControlledResources { @@ -42,24 +43,24 @@ func KeyFuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.Controll switch { case ringResources.Has(gr): - return KeyForObject, nil + return ForObject, nil case controlledResources.Has(gr): - return KeyForController, nil + return ForController, nil } - return nil, fmt.Errorf("object's resource %q was not found in Ring", gr.String()) + return nil, fmt.Errorf("object's resource %q was not found in ControllerRing", gr.String()) } -// KeyFunc maps objects to hash keys. -// It returns an error if the prequisities for sharding the given object are not fulfilled. +// Func maps objects to hash keys. +// It returns an error if the prerequisites for sharding the given object are not fulfilled. // If the returned key is empty, the object should not be assigned. -type KeyFunc func(client.Object) (string, error) +type Func func(client.Object) (string, error) -// KeyForObject returns a ring key for the given object itself. +// ForObject returns a ring key for the given object itself. // It needs the TypeMeta (GVK) to be set, which is not set on objects after decoding by default. -func KeyForObject(obj client.Object) (string, error) { +func ForObject(obj client.Object) (string, error) { // We can't use the object's UID, as it is unset during admission for CREATE requests. - // Instead, we need to calculate a unique ID ourselves. The ID has this pattern (see keyForMetadata): + // Instead, we need to calculate a unique ID ourselves. The ID has this pattern (see forMetadata): // group/version/kind/namespace/name // With this, different object instances with the same name will use the same hash key, which sounds acceptable. // We can only use fields that are also present in owner references as we need to assign owners and ownees to the same @@ -77,7 +78,7 @@ func KeyForObject(obj client.Object) (string, error) { // object ID that we can also reconstruct later on for owned objects just by looking at the object itself. // We could use a cache lookup though, but this would restrict scalability of the sharding solution again. // Generally, this tradeoff seems acceptable, as generateName is mostly used on owned objects, but rarely the - // owner itself. In such case, KeyForController will be used instead, which doesn't care about the object's own + // owner itself. In such case, ForController will be used instead, which doesn't care about the object's own // name but only that of the owner. // If generateName is used nevertheless, respond with a proper error. // We could assign the object after creation, however we can't use a watch cache because of the mentioned @@ -90,12 +91,12 @@ func KeyForObject(obj client.Object) (string, error) { // Namespace can be empty for cluster-scoped resources. Only check the name field as an optimistic check for // preventing wrong usage of the function. - return keyForMetadata(gvk.GroupVersion().String(), gvk.Kind, obj.GetNamespace(), obj.GetName()), nil + return forMetadata(gvk.Group, gvk.Kind, obj.GetNamespace(), obj.GetName()), nil } -// KeyForController returns a ring key for the controller of the given object. +// ForController returns a ring key for the controller of the given object. // It returns an empty key if the object doesn't have an ownerReference with controller=true". -func KeyForController(obj client.Object) (string, error) { +func ForController(obj client.Object) (string, error) { ref := metav1.GetControllerOf(obj) if ref == nil { return "", nil @@ -111,11 +112,16 @@ func KeyForController(obj client.Object) (string, error) { return "", fmt.Errorf("name of controller reference must not be empty") } + gv, err := schema.ParseGroupVersion(ref.APIVersion) + if err != nil { + return "", fmt.Errorf("invalid apiVersion of controller reference: %w", err) + } + // Namespace can be empty for cluster-scoped resources. Only check the other fields as an optimistic check for // preventing wrong usage of the function. - return keyForMetadata(ref.APIVersion, ref.Kind, obj.GetNamespace(), ref.Name), nil + return forMetadata(gv.Group, ref.Kind, obj.GetNamespace(), ref.Name), nil } -func keyForMetadata(apiVersion, kind, namespace, name string) string { - return apiVersion + "/" + kind + "/" + namespace + "/" + name +func forMetadata(group, kind, namespace, name string) string { + return group + "/" + kind + "/" + namespace + "/" + name } diff --git a/pkg/sharding/key/key_suite_test.go b/pkg/sharding/key/key_suite_test.go new file mode 100644 index 00000000..d0f3817c --- /dev/null +++ b/pkg/sharding/key/key_suite_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2025 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package key_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestKey(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Sharding Key Suite") +} diff --git a/pkg/sharding/key/key_test.go b/pkg/sharding/key/key_test.go new file mode 100644 index 00000000..8cc8d89e --- /dev/null +++ b/pkg/sharding/key/key_test.go @@ -0,0 +1,185 @@ +/* +Copyright 2025 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package key_test + +import ( + "reflect" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" + gomegatypes "github.com/onsi/gomega/types" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" + . "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/key" +) + +var _ = Describe("#FuncForResource", func() { + var controllerRing *shardingv1alpha1.ControllerRing + + BeforeEach(func() { + controllerRing = &shardingv1alpha1.ControllerRing{ + Spec: shardingv1alpha1.ControllerRingSpec{ + Resources: []shardingv1alpha1.RingResource{ + { + GroupResource: metav1.GroupResource{ + Group: "operator", + Resource: "foo", + }, + ControlledResources: []metav1.GroupResource{ + { + Group: "operator", + Resource: "controlled", + }, + { + Resource: "foo", + }, + }, + }, + { + GroupResource: metav1.GroupResource{ + Resource: "foo", + }, + }, + }, + }, + } + }) + + It("should return an error if the resource is not part of the ring", func() { + Expect(FuncForResource(metav1.GroupResource{ + Resource: "bar", + }, controllerRing)).Error().To( + MatchError(ContainSubstring("not found")), + ) + }) + + It("should return ForObject if the resource is a main resource of the ring", func() { + Expect(FuncForResource(metav1.GroupResource{ + Group: "operator", + Resource: "foo", + }, controllerRing)).To( + beFunc(ForObject), + ) + }) + + It("should return ForController if the resource is a controlled resource of the ring", func() { + Expect(FuncForResource(metav1.GroupResource{ + Group: "operator", + Resource: "controlled", + }, controllerRing)).To( + beFunc(ForController), + ) + }) + + It("should return ForObject if the resource is a main and controlled resource of the ring", func() { + Expect(FuncForResource(metav1.GroupResource{ + Resource: "foo", + }, controllerRing)).To( + beFunc(ForObject), + ) + }) +}) + +var _ = Describe("#ForObject", func() { + var obj *appsv1.Deployment + + BeforeEach(func() { + obj = &appsv1.Deployment{} + obj.GetObjectKind().SetGroupVersionKind(appsv1.SchemeGroupVersion.WithKind("Deployment")) + obj.Name = "foo" + obj.Namespace = "bar" + }) + + It("should return an error if the object has no TypeMeta", func() { + Expect(ForObject(&appsv1.Deployment{})).Error().To(MatchError("apiVersion and kind must not be empty")) + }) + + It("should return an error if the object has no name", func() { + obj.Name = "" + Expect(ForObject(obj)).Error().To(MatchError("name must not be empty")) + + obj.GenerateName = "foo-" + Expect(ForObject(obj)).Error().To(MatchError(ContainSubstring("generateName is not supported"))) + }) + + It("should return the object's hash key", func() { + Expect(ForObject(obj)).To(Equal("apps/Deployment/bar/foo")) + }) +}) + +var _ = Describe("#ForController", func() { + var obj *appsv1.Deployment + + BeforeEach(func() { + obj = &appsv1.Deployment{} + obj.SetOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "other/v1", + Kind: "Bar", + Name: "owner-but-not-controller", + }, + { + APIVersion: "operator/v1", + Kind: "Foo", + Name: "foo", + Controller: ptr.To(true), + }, + }) + obj.Namespace = "bar" + }) + + It("should return an empty key if the object has no controller ref", func() { + Expect(ForController(&appsv1.Deployment{})).To(BeEmpty()) + + obj.OwnerReferences[1].Controller = nil + Expect(ForController(obj)).To(BeEmpty()) + }) + + It("should return an error if the controller ref has no apiVersion", func() { + obj.OwnerReferences[1].APIVersion = "" + Expect(ForController(obj)).Error().To(MatchError("apiVersion of controller reference must not be empty")) + }) + + It("should return an error if the controller ref has no kind", func() { + obj.OwnerReferences[1].Kind = "" + Expect(ForController(obj)).Error().To(MatchError("kind of controller reference must not be empty")) + }) + + It("should return an error if the controller ref has no name", func() { + obj.OwnerReferences[1].Name = "" + Expect(ForController(obj)).Error().To(MatchError("name of controller reference must not be empty")) + }) + + It("should return an error if the controller ref has an invalid apiVersion", func() { + obj.OwnerReferences[1].APIVersion = "foo/bar/v1" + Expect(ForController(obj)).Error().To(MatchError(ContainSubstring("invalid apiVersion of controller reference"))) + }) + + It("should return the controller's hash key", func() { + Expect(ForController(obj)).To(Equal("operator/Foo/bar/foo")) + }) +}) + +func beFunc(expected Func) gomegatypes.GomegaMatcher { + return gcustom.MakeMatcher(func(actual Func) (bool, error) { + return reflect.ValueOf(expected).Pointer() == reflect.ValueOf(actual).Pointer(), nil + }) +} diff --git a/pkg/sharding/ring/ring_suite_test.go b/pkg/sharding/ring/ring_suite_test.go new file mode 100644 index 00000000..2f5bea67 --- /dev/null +++ b/pkg/sharding/ring/ring_suite_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2025 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ring_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestRing(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Sharding Ring Suite") +} diff --git a/pkg/sharding/ring/ring_test.go b/pkg/sharding/ring/ring_test.go new file mode 100644 index 00000000..2bd1ac06 --- /dev/null +++ b/pkg/sharding/ring/ring_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2025 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ring_test + +import ( + "strconv" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + coordinationv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/ptr" + + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash" + . "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring" +) + +var _ = Describe("FromLeases", func() { + var ( + now time.Time + controllerRing *shardingv1alpha1.ControllerRing + leaseList *coordinationv1.LeaseList + ) + + BeforeEach(func() { + now = time.Now() + controllerRing = &shardingv1alpha1.ControllerRing{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + + leaseList = &coordinationv1.LeaseList{} + + leaseTemplate := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-0", + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: ptr.To("foo-0"), + LeaseDurationSeconds: ptr.To[int32](10), + AcquireTime: ptr.To(metav1.NewMicroTime(now.Add(-5 * time.Minute))), + RenewTime: ptr.To(metav1.NewMicroTime(now.Add(-2 * time.Second))), + }, + } + + lease := leaseTemplate.DeepCopy() + leaseList.Items = append(leaseList.Items, *lease) + + lease = leaseTemplate.DeepCopy() + lease.Name = "foo-1" + lease.Spec.HolderIdentity = ptr.To("foo-1") + leaseList.Items = append(leaseList.Items, *lease) + + lease = leaseTemplate.DeepCopy() + lease.Name = "foo-2" + lease.Spec.HolderIdentity = nil + leaseList.Items = append(leaseList.Items, *lease) + + lease = leaseTemplate.DeepCopy() + lease.Name = "foo-3" + lease.Spec.RenewTime = ptr.To(metav1.NewMicroTime(now.Add(-time.Minute))) + leaseList.Items = append(leaseList.Items, *lease) + }) + + It("should create a ring from the available shards", func() { + ring, shards := FromLeases(controllerRing, leaseList, now) + Expect(ring).NotTo(BeNil()) + Expect(shards).To(HaveLen(4)) + Expect(shards.AvailableShards().IDs()).To(ConsistOf("foo-0", "foo-1")) + Expect(probeRingNodes(ring)).To(ConsistOf("foo-0", "foo-1")) + }) +}) + +func probeRingNodes(ring *consistenthash.Ring) []string { + nodes := sets.New[string]() + + for i := 0; i < 1000; i++ { + nodes.Insert(ring.Hash(strconv.Itoa(i))) + } + + return nodes.UnsortedList() +} diff --git a/pkg/webhook/sharder/handler.go b/pkg/webhook/sharder/handler.go index f99f33f7..f4b3843a 100644 --- a/pkg/webhook/sharder/handler.go +++ b/pkg/webhook/sharder/handler.go @@ -33,7 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" - "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/key" shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics" "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring" ) @@ -67,7 +67,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R return admission.Allowed("object is already assigned") } - keyFunc, err := sharding.KeyFuncForResource(metav1.GroupResource{ + keyFunc, err := key.FuncForResource(metav1.GroupResource{ Group: req.Resource.Group, Resource: req.Resource.Resource, }, controllerRing) @@ -75,11 +75,11 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R return admission.Errored(http.StatusBadRequest, fmt.Errorf("error deteriming hash key func for object: %w", err)) } - key, err := keyFunc(obj) + hashKey, err := keyFunc(obj) if err != nil { return admission.Errored(http.StatusBadRequest, fmt.Errorf("error calculating hash key for object: %w", err)) } - if key == "" { + if hashKey == "" { return admission.Allowed("object should not be assigned") } @@ -91,7 +91,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R // get ring from cache and hash the object onto the ring hashRing, _ := ring.FromLeases(controllerRing, leaseList, h.Clock.Now()) - shard := hashRing.Hash(key) + shard := hashRing.Hash(hashKey) log.V(1).Info("Assigning object for ControllerRing", "controllerRing", client.ObjectKeyFromObject(controllerRing), "shard", shard)