Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/apis/sharding/v1alpha1/types_controllerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,3 @@ func (c *ControllerRing) LabelShard() string {
func (c *ControllerRing) LabelDrain() string {
return LabelDrain(c.Name)
}

// RingResources returns the list of resources that are distributed across shards in this ControllerRing.
func (c *ControllerRing) RingResources() []RingResource {
return c.Spec.Resources
}
6 changes: 0 additions & 6 deletions pkg/apis/sharding/v1alpha1/types_controllerring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,4 @@ var _ = Describe("ControllerRing", func() {
Expect(ring.LabelDrain()).To(Equal("drain.alpha.sharding.timebertt.dev/operator"))
})
})

Describe("#RingResources", func() {
It("should return the specified resources", func() {
Expect(ring.RingResources()).To(Equal(ring.Spec.Resources))
})
})
})
12 changes: 6 additions & 6 deletions pkg/controller/sharder/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (

configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1"
shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/key"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
Expand Down Expand Up @@ -201,22 +201,22 @@ func (r *Reconciler) resyncObject(
) error {
log = log.WithValues("object", client.ObjectKeyFromObject(obj))

keyFunc := sharding.KeyForObject
keyFunc := key.ForObject
if controlled {
keyFunc = sharding.KeyForController
keyFunc = key.ForController
}

key, err := keyFunc(obj)
hashKey, err := keyFunc(obj)
if err != nil {
return err
}
if key == "" {
if hashKey == "" {
// object should not be assigned
return nil
}

var (
desiredShard = hashRing.Hash(key)
desiredShard = hashRing.Hash(hashKey)
currentShard = obj.Labels[ring.LabelShard()]
)

Expand Down
46 changes: 26 additions & 20 deletions pkg/sharding/key.go → pkg/sharding/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package sharding
package key

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
)

// KeyFuncForResource returns the key function that maps the given resource or its controller depending on whether
// FuncForResource returns the key function that maps the given resource or its controller depending on whether
// the resource is listed as a resource or controlled resource in the given ring.
func KeyFuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.ControllerRing) (KeyFunc, error) {
func FuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.ControllerRing) (Func, error) {
ringResources := sets.New[metav1.GroupResource]()
controlledResources := sets.New[metav1.GroupResource]()

for _, ringResource := range ring.RingResources() {
for _, ringResource := range ring.Spec.Resources {
ringResources.Insert(ringResource.GroupResource)

for _, controlledResource := range ringResource.ControlledResources {
Expand All @@ -42,24 +43,24 @@ func KeyFuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.Controll

switch {
case ringResources.Has(gr):
return KeyForObject, nil
return ForObject, nil
case controlledResources.Has(gr):
return KeyForController, nil
return ForController, nil
}

return nil, fmt.Errorf("object's resource %q was not found in Ring", gr.String())
return nil, fmt.Errorf("object's resource %q was not found in ControllerRing", gr.String())
}

// KeyFunc maps objects to hash keys.
// It returns an error if the prequisities for sharding the given object are not fulfilled.
// Func maps objects to hash keys.
// It returns an error if the prerequisites for sharding the given object are not fulfilled.
// If the returned key is empty, the object should not be assigned.
type KeyFunc func(client.Object) (string, error)
type Func func(client.Object) (string, error)

// KeyForObject returns a ring key for the given object itself.
// ForObject returns a ring key for the given object itself.
// It needs the TypeMeta (GVK) to be set, which is not set on objects after decoding by default.
func KeyForObject(obj client.Object) (string, error) {
func ForObject(obj client.Object) (string, error) {
// We can't use the object's UID, as it is unset during admission for CREATE requests.
// Instead, we need to calculate a unique ID ourselves. The ID has this pattern (see keyForMetadata):
// Instead, we need to calculate a unique ID ourselves. The ID has this pattern (see forMetadata):
// group/version/kind/namespace/name
// With this, different object instances with the same name will use the same hash key, which sounds acceptable.
// We can only use fields that are also present in owner references as we need to assign owners and ownees to the same
Expand All @@ -77,7 +78,7 @@ func KeyForObject(obj client.Object) (string, error) {
// object ID that we can also reconstruct later on for owned objects just by looking at the object itself.
// We could use a cache lookup though, but this would restrict scalability of the sharding solution again.
// Generally, this tradeoff seems acceptable, as generateName is mostly used on owned objects, but rarely the
// owner itself. In such case, KeyForController will be used instead, which doesn't care about the object's own
// owner itself. In such case, ForController will be used instead, which doesn't care about the object's own
// name but only that of the owner.
// If generateName is used nevertheless, respond with a proper error.
// We could assign the object after creation, however we can't use a watch cache because of the mentioned
Expand All @@ -90,12 +91,12 @@ func KeyForObject(obj client.Object) (string, error) {

// Namespace can be empty for cluster-scoped resources. Only check the name field as an optimistic check for
// preventing wrong usage of the function.
return keyForMetadata(gvk.GroupVersion().String(), gvk.Kind, obj.GetNamespace(), obj.GetName()), nil
return forMetadata(gvk.Group, gvk.Kind, obj.GetNamespace(), obj.GetName()), nil
}

// KeyForController returns a ring key for the controller of the given object.
// ForController returns a ring key for the controller of the given object.
// It returns an empty key if the object doesn't have an ownerReference with controller=true".
func KeyForController(obj client.Object) (string, error) {
func ForController(obj client.Object) (string, error) {
ref := metav1.GetControllerOf(obj)
if ref == nil {
return "", nil
Expand All @@ -111,11 +112,16 @@ func KeyForController(obj client.Object) (string, error) {
return "", fmt.Errorf("name of controller reference must not be empty")
}

gv, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
return "", fmt.Errorf("invalid apiVersion of controller reference: %w", err)
}

// Namespace can be empty for cluster-scoped resources. Only check the other fields as an optimistic check for
// preventing wrong usage of the function.
return keyForMetadata(ref.APIVersion, ref.Kind, obj.GetNamespace(), ref.Name), nil
return forMetadata(gv.Group, ref.Kind, obj.GetNamespace(), ref.Name), nil
}

func keyForMetadata(apiVersion, kind, namespace, name string) string {
return apiVersion + "/" + kind + "/" + namespace + "/" + name
func forMetadata(group, kind, namespace, name string) string {
return group + "/" + kind + "/" + namespace + "/" + name
}
29 changes: 29 additions & 0 deletions pkg/sharding/key/key_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2025 Tim Ebert.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package key_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestKey(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Sharding Key Suite")
}
185 changes: 185 additions & 0 deletions pkg/sharding/key/key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
Copyright 2025 Tim Ebert.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package key_test

import (
"reflect"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gcustom"
gomegatypes "github.com/onsi/gomega/types"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
. "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/key"
)

var _ = Describe("#FuncForResource", func() {
var controllerRing *shardingv1alpha1.ControllerRing

BeforeEach(func() {
controllerRing = &shardingv1alpha1.ControllerRing{
Spec: shardingv1alpha1.ControllerRingSpec{
Resources: []shardingv1alpha1.RingResource{
{
GroupResource: metav1.GroupResource{
Group: "operator",
Resource: "foo",
},
ControlledResources: []metav1.GroupResource{
{
Group: "operator",
Resource: "controlled",
},
{
Resource: "foo",
},
},
},
{
GroupResource: metav1.GroupResource{
Resource: "foo",
},
},
},
},
}
})

It("should return an error if the resource is not part of the ring", func() {
Expect(FuncForResource(metav1.GroupResource{
Resource: "bar",
}, controllerRing)).Error().To(
MatchError(ContainSubstring("not found")),
)
})

It("should return ForObject if the resource is a main resource of the ring", func() {
Expect(FuncForResource(metav1.GroupResource{
Group: "operator",
Resource: "foo",
}, controllerRing)).To(
beFunc(ForObject),
)
})

It("should return ForController if the resource is a controlled resource of the ring", func() {
Expect(FuncForResource(metav1.GroupResource{
Group: "operator",
Resource: "controlled",
}, controllerRing)).To(
beFunc(ForController),
)
})

It("should return ForObject if the resource is a main and controlled resource of the ring", func() {
Expect(FuncForResource(metav1.GroupResource{
Resource: "foo",
}, controllerRing)).To(
beFunc(ForObject),
)
})
})

var _ = Describe("#ForObject", func() {
var obj *appsv1.Deployment

BeforeEach(func() {
obj = &appsv1.Deployment{}
obj.GetObjectKind().SetGroupVersionKind(appsv1.SchemeGroupVersion.WithKind("Deployment"))
obj.Name = "foo"
obj.Namespace = "bar"
})

It("should return an error if the object has no TypeMeta", func() {
Expect(ForObject(&appsv1.Deployment{})).Error().To(MatchError("apiVersion and kind must not be empty"))
})

It("should return an error if the object has no name", func() {
obj.Name = ""
Expect(ForObject(obj)).Error().To(MatchError("name must not be empty"))

obj.GenerateName = "foo-"
Expect(ForObject(obj)).Error().To(MatchError(ContainSubstring("generateName is not supported")))
})

It("should return the object's hash key", func() {
Expect(ForObject(obj)).To(Equal("apps/Deployment/bar/foo"))
})
})

var _ = Describe("#ForController", func() {
var obj *appsv1.Deployment

BeforeEach(func() {
obj = &appsv1.Deployment{}
obj.SetOwnerReferences([]metav1.OwnerReference{
{
APIVersion: "other/v1",
Kind: "Bar",
Name: "owner-but-not-controller",
},
{
APIVersion: "operator/v1",
Kind: "Foo",
Name: "foo",
Controller: ptr.To(true),
},
})
obj.Namespace = "bar"
})

It("should return an empty key if the object has no controller ref", func() {
Expect(ForController(&appsv1.Deployment{})).To(BeEmpty())

obj.OwnerReferences[1].Controller = nil
Expect(ForController(obj)).To(BeEmpty())
})

It("should return an error if the controller ref has no apiVersion", func() {
obj.OwnerReferences[1].APIVersion = ""
Expect(ForController(obj)).Error().To(MatchError("apiVersion of controller reference must not be empty"))
})

It("should return an error if the controller ref has no kind", func() {
obj.OwnerReferences[1].Kind = ""
Expect(ForController(obj)).Error().To(MatchError("kind of controller reference must not be empty"))
})

It("should return an error if the controller ref has no name", func() {
obj.OwnerReferences[1].Name = ""
Expect(ForController(obj)).Error().To(MatchError("name of controller reference must not be empty"))
})

It("should return an error if the controller ref has an invalid apiVersion", func() {
obj.OwnerReferences[1].APIVersion = "foo/bar/v1"
Expect(ForController(obj)).Error().To(MatchError(ContainSubstring("invalid apiVersion of controller reference")))
})

It("should return the controller's hash key", func() {
Expect(ForController(obj)).To(Equal("operator/Foo/bar/foo"))
})
})

func beFunc(expected Func) gomegatypes.GomegaMatcher {
return gcustom.MakeMatcher(func(actual Func) (bool, error) {
return reflect.ValueOf(expected).Pointer() == reflect.ValueOf(actual).Pointer(), nil
})
}
Loading
Loading