Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions pkg/shard/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"

coordinationv1 "k8s.io/api/coordination/v1"
Expand Down Expand Up @@ -60,6 +61,10 @@ func NewResourceLock(config *rest.Config, eventRecorder resourcelock.EventRecord
return nil, err
}

if options.ControllerRingName == "" {
return nil, errors.New("ControllerRingName is required")
}

// default shard name to hostname if not set
if options.ShardName == "" {
options.ShardName, err = os.Hostname()
Expand All @@ -76,7 +81,7 @@ func NewResourceLock(config *rest.Config, eventRecorder resourcelock.EventRecord
}
}

ll := &LeaseLock{
return &LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: options.LeaseNamespace,
Name: options.ShardName,
Expand All @@ -90,9 +95,7 @@ func NewResourceLock(config *rest.Config, eventRecorder resourcelock.EventRecord
Labels: map[string]string{
shardingv1alpha1.LabelControllerRing: options.ControllerRingName,
},
}

return ll, nil
}, nil
}

// LeaseLock implements resourcelock.Interface but is able to add labels to the Lease.
Expand Down Expand Up @@ -140,7 +143,7 @@ func (ll *LeaseLock) Create(ctx context.Context, ler resourcelock.LeaderElection
// Update will update an existing Lease spec.
func (ll *LeaseLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
if ll.lease == nil {
return errors.New("lease not initialized, call get or create first")
return errors.New("lease not initialized, call Get or Create first")
}
// don't set labels map on ll.lease directly, otherwise we might overwrite labels managed by the sharder
mergeLabels(&ll.lease.ObjectMeta, ll.Labels)
Expand Down Expand Up @@ -185,19 +188,25 @@ func mergeLabels(obj *metav1.ObjectMeta, labels map[string]string) {
}
}

const inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
// allow overwriting file system for testing purposes
var fsys = os.DirFS("/")

const inClusterNamespacePath = "var/run/secrets/kubernetes.io/serviceaccount/namespace"

// getInClusterNamespace determines the namespace that this binary is running in, if running in a cluster.
// For this, it consults the ServiceAccount mount. If the binary is not running in a cluster or the pod doesn't mount a
// ServiceAccount, it returns an error.
func getInClusterNamespace() (string, error) {
// Check whether the namespace file exists.
// If not, we are not running in cluster so can't guess the namespace.
if _, err := os.Stat(inClusterNamespacePath); os.IsNotExist(err) {
return "", fmt.Errorf("not running in-cluster, please specify LeaseNamespace")
if _, err := fs.Stat(fsys, inClusterNamespacePath); os.IsNotExist(err) {
return "", fmt.Errorf("not running in cluster, please specify LeaseNamespace")
} else if err != nil {
return "", fmt.Errorf("error checking namespace file: %w", err)
}

// Load the namespace file and return its content
namespace, err := os.ReadFile(inClusterNamespacePath)
namespace, err := fs.ReadFile(fsys, inClusterNamespacePath)
if err != nil {
return "", fmt.Errorf("error reading namespace file: %w", err)
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/shard/lease/lease_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2025 Tim Ebert.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lease_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestLease(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Shard Library Lease Suite")
}
288 changes: 288 additions & 0 deletions pkg/shard/lease/lease_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
Copyright 2025 Tim Ebert.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lease

import (
"context"
"os"
"testing/fstest"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/timebertt/kubernetes-controller-sharding/pkg/utils/test/matchers"
)

var _ = Describe("LeaseLock", func() {
const (
namespace = "default"
controllerRingName = "operator"
shardName = "operator-a"
)

var ctx context.Context

BeforeEach(func() {
ctx = context.Background()

fsys = fstest.MapFS{
"var/run/secrets/kubernetes.io/serviceaccount/namespace": &fstest.MapFile{
Data: []byte(namespace),
},
}
})

Describe("#NewResourceLock", func() {
var (
restConfig *rest.Config

options Options
)

BeforeEach(func() {
restConfig = &rest.Config{}

options = Options{
ControllerRingName: controllerRingName,
LeaseNamespace: "operator-system",
ShardName: shardName,
}
})

It("should fail if ControllerRingName is empty", func() {
options.ControllerRingName = ""

Expect(NewResourceLock(restConfig, nil, options)).Error().To(MatchError("ControllerRingName is required"))
})

It("should use the configured namespace and name", func() {
resourceLock, err := NewResourceLock(restConfig, nil, options)
Expect(err).NotTo(HaveOccurred())

leaseLock := resourceLock.(*LeaseLock)
Expect(leaseLock.LeaseMeta.Namespace).To(Equal(options.LeaseNamespace))
Expect(leaseLock.LeaseMeta.Name).To(Equal(options.ShardName))
Expect(leaseLock.Identity()).To(Equal(leaseLock.LeaseMeta.Name), "identity should equal the shard name")
})

It("should default the name to the hostname", func() {
options.ShardName = ""
hostname, err := os.Hostname()
Expect(err).NotTo(HaveOccurred())

resourceLock, err := NewResourceLock(restConfig, nil, options)
Expect(err).NotTo(HaveOccurred())

leaseLock := resourceLock.(*LeaseLock)
Expect(leaseLock.LeaseMeta.Name).To(Equal(hostname))
Expect(leaseLock.Identity()).To(Equal(leaseLock.LeaseMeta.Name), "identity should equal the shard name")
})

It("should default the namespace to the in-cluster namespace", func() {
options.LeaseNamespace = ""

resourceLock, err := NewResourceLock(restConfig, nil, options)
Expect(err).NotTo(HaveOccurred())

leaseLock := resourceLock.(*LeaseLock)
Expect(leaseLock.LeaseMeta.Namespace).To(Equal(namespace))
})

It("should fail if the in-cluster namespace cannot be determined", func() {
options.LeaseNamespace = ""
fsys = fstest.MapFS{}

Expect(NewResourceLock(restConfig, nil, options)).Error().To(MatchError(And(
ContainSubstring("not running in cluster"),
ContainSubstring("please specify LeaseNamespace"),
)))
})
})

Describe("#LeaseLock", func() {
var (
lock resourcelock.Interface
fakeClient coordinationv1client.LeasesGetter

lease *coordinationv1.Lease
)

BeforeEach(func() {
var err error
lock, err = NewResourceLock(&rest.Config{}, nil, Options{
ControllerRingName: controllerRingName,
LeaseNamespace: namespace,
ShardName: shardName,
})
Expect(err).NotTo(HaveOccurred())

fakeClient = fake.NewClientset().CoordinationV1()
lock.(*LeaseLock).Client = fakeClient

lease = &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: shardName,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To(shardName),
},
}
Expect(fakeClient.Leases(lease.Namespace).Create(ctx, lease, metav1.CreateOptions{})).Error().To(Succeed())
})

Describe("#Get", func() {
It("should return NotFound if the lease does not exist", func() {
Expect(fakeClient.Leases(lease.Namespace).Delete(ctx, lease.Name, metav1.DeleteOptions{})).To(Succeed())
Expect(lock.Get(ctx)).Error().To(BeNotFoundError())
})

It("should return the existing lease", func() {
record, _, err := lock.Get(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(record).NotTo(BeNil())
Expect(record.HolderIdentity).To(Equal(*lease.Spec.HolderIdentity))
})
})

Describe("#Create", func() {
It("should create the lease if it does not exist", func() {
Expect(fakeClient.Leases(lease.Namespace).Delete(ctx, lease.Name, metav1.DeleteOptions{})).To(Succeed())

Expect(lock.Create(ctx, resourcelock.LeaderElectionRecord{
HolderIdentity: "foo",
})).To(Succeed())

Expect(fakeClient.Leases(lease.Namespace).Get(ctx, lease.Name, metav1.GetOptions{})).To(And(
HaveField("ObjectMeta", And(
HaveField("Namespace", Equal(namespace)),
HaveField("Name", Equal(shardName)),
HaveField("Labels", Equal(map[string]string{
"alpha.sharding.timebertt.dev/controllerring": controllerRingName,
})),
)),
HaveField("Spec.HolderIdentity", Equal(ptr.To("foo"))),
))
})
})

Describe("#Update", func() {
It("should fail if lock is not initialized yet", func() {
Expect(lock.Update(ctx, resourcelock.LeaderElectionRecord{
HolderIdentity: "foo",
})).To(MatchError(ContainSubstring("not initialized")))
})

It("should update the lease", func() {
Expect(lock.Get(ctx)).Error().To(Succeed())

Expect(lock.Update(ctx, resourcelock.LeaderElectionRecord{
HolderIdentity: "foo",
})).To(Succeed())

Expect(fakeClient.Leases(lease.Namespace).Get(ctx, lease.Name, metav1.GetOptions{})).To(And(
HaveField("ObjectMeta", And(
HaveField("Namespace", Equal(namespace)),
HaveField("Name", Equal(shardName)),
HaveField("Labels", Equal(map[string]string{
"alpha.sharding.timebertt.dev/controllerring": controllerRingName,
})),
)),
HaveField("Spec.HolderIdentity", Equal(ptr.To("foo"))),
))
})

It("should keep externally managed labels", func() {
metav1.SetMetaDataLabel(&lease.ObjectMeta, "foo", "bar")
Expect(fakeClient.Leases(lease.Namespace).Update(ctx, lease, metav1.UpdateOptions{})).Error().To(Succeed())

Expect(lock.Get(ctx)).Error().To(Succeed())

Expect(lock.Update(ctx, resourcelock.LeaderElectionRecord{
HolderIdentity: "foo",
})).To(Succeed())

Expect(fakeClient.Leases(lease.Namespace).Get(ctx, lease.Name, metav1.GetOptions{})).To(
HaveField("ObjectMeta.Labels", Equal(map[string]string{
"foo": "bar",
"alpha.sharding.timebertt.dev/controllerring": controllerRingName,
})),
)
})
})

Describe("#RecordEvent", func() {
Context("no EventRecorder configured", func() {
It("should do nothing", func() {
lock.RecordEvent("foo")
})
})

Context("EventRecorder configured", func() {
var recorder *record.FakeRecorder

BeforeEach(func() {
recorder = record.NewFakeRecorder(1)
lock.(*LeaseLock).LockConfig.EventRecorder = recorder
})

It("should send the event", func() {
Expect(lock.Get(ctx)).Error().To(Succeed())

lock.RecordEvent("foo")

Eventually(recorder.Events).Should(Receive(
Equal("Normal LeaderElection " + shardName + " foo"),
))
})
})
})

Describe("#Describe", func() {
It("should return the lease key", func() {
Expect(lock.Describe()).To(Equal(client.ObjectKeyFromObject(lease).String()))
})
})

Describe("#Identity()", func() {
It("should return the lease name", func() {
Expect(lock.Identity()).To(Equal(lease.Name))
})
})
})

Describe("#getInClusterNamespace", func() {
It("should fail because namespace file does not exist", func() {
fsys = fstest.MapFS{}

Expect(getInClusterNamespace()).Error().To(MatchError(ContainSubstring("not running in cluster")))
})

It("should return content of namespace file", func() {
Expect(getInClusterNamespace()).To(Equal(namespace))
})
})
})
Loading