Skip to content
Merged
49 changes: 49 additions & 0 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package reconciler

import (
"context"
"time"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Result adds Requeue functionality back to reconcile results.
type Result struct {
RequeueAfter time.Duration
Requeue bool
}

// Reconciler defines the interface for standard reconcilers
type Reconciler interface {
Reconcile(ctx context.Context, req reconcile.Request) (Result, error)
}

// AsReconciler creates a reconciler with a default rate-limiter
func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
return AsReconcilerWithRateLimiter(
reconciler,
workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](),
)
}

// AsReconcilerWithRateLimiter creates a reconciler with a custom rate-limiter
func AsReconcilerWithRateLimiter(
reconciler Reconciler,
rateLimiter workqueue.TypedRateLimiter[reconcile.Request],
) reconcile.Reconciler {
return reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
result, err := reconciler.Reconcile(ctx, req)
if err != nil {
return reconcile.Result{}, err
}
if result.RequeueAfter > 0 {
return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil
}
if result.Requeue {
return reconcile.Result{RequeueAfter: rateLimiter.When(req)}, nil
}
rateLimiter.Forget(req)
return reconcile.Result{}, nil
})
}
260 changes: 260 additions & 0 deletions reconciler/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package reconciler_test

import (
"context"
"errors"
"testing"
"time"

"github.com/awslabs/operatorpkg/reconciler"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func Test(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Reconciler")
}

// MockRateLimiter is a mock implementation of workqueue.TypedRateLimiter for testing
type MockRateLimiter[K comparable] struct {
whenFunc func(K) time.Duration
numRequeues map[K]int
backoffDuration time.Duration
}

func (m *MockRateLimiter[K]) When(key K) time.Duration {
if m.whenFunc != nil {
return m.whenFunc(key)
}
// Default implementation
if m.numRequeues == nil {
m.numRequeues = make(map[K]int)
}
m.numRequeues[key] += 1
return m.backoffDuration
}

func (m *MockRateLimiter[K]) NumRequeues(key K) int {
return m.numRequeues[key]
}

func (m *MockRateLimiter[K]) Forget(key K) {
delete(m.numRequeues, key)
}

// MockReconciler is a mock implementation of Reconciler for testing
type MockReconciler struct {
reconcileFunc func(context.Context, reconcile.Request) (reconciler.Result, error)
result reconciler.Result
err error
}

func (m *MockReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconciler.Result, error) {
if m.reconcileFunc != nil {
return m.reconcileFunc(ctx, req)
}
return m.result, m.err
}

var _ = Describe("Reconciler", func() {
It("should return the result without backoff", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{},
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(0 * time.Second))
})
It("should return the result with backoff when Requeue is set", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
Requeue: true,
},
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(5 * time.Millisecond))
})
It("should return the result with backoff when both RequeueAfter and Requeue are set", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
RequeueAfter: 10 * time.Second,
Requeue: true,
},
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(10 * time.Second))
})
It("should return the result with backoff when RequeueAfter is set and Requeue is false", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
RequeueAfter: 10 * time.Second,
Requeue: false,
},
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(10 * time.Second))
})
It("should return the result with backoff when RequeueAfter is set to zero and Requeue is true", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
RequeueAfter: 0 * time.Second,
Requeue: true,
},
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(5 * time.Millisecond))
})
It("should return the result without backoff when RequeueAfter is set to zero and Requeue is false", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
RequeueAfter: 0 * time.Second,
Requeue: false,
},
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(0 * time.Millisecond))
})
It("should return the error without processing backoff", func() {
expectedErr := errors.New("test error")
mockReconciler := &MockReconciler{
result: reconciler.Result{Requeue: true},
err: expectedErr,
}

reconciler := reconciler.AsReconciler(mockReconciler)

result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})

Expect(err).To(HaveOccurred())
Expect(err).To(Equal(expectedErr))
Expect(result.RequeueAfter).To(BeZero())
})
It("should use custom rate limiter for backoff", func() {
mockRateLimiter := &MockRateLimiter[reconcile.Request]{
backoffDuration: 10 * time.Second,
}

mockReconciler := &MockReconciler{
result: reconciler.Result{
Requeue: true,
},
}

reconciler := reconciler.AsReconcilerWithRateLimiter(mockReconciler, mockRateLimiter)

req := reconcile.Request{}
result, err := reconciler.Reconcile(context.Background(), req)

Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(10 * time.Second))
Expect(mockRateLimiter.NumRequeues(req)).To(Equal(1))
})
It("should rate limit distinct items", func() {
mockRateLimiter := &MockRateLimiter[reconcile.Request]{
backoffDuration: 10 * time.Second,
}

mockReconciler := &MockReconciler{
result: reconciler.Result{
Requeue: true,
},
}

reconciler := reconciler.AsReconcilerWithRateLimiter(mockReconciler, mockRateLimiter)

req1 := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "req1",
Namespace: "",
},
}
result1, err1 := reconciler.Reconcile(context.Background(), req1)
req2 := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "req2",
Namespace: "",
},
}
result2, err2 := reconciler.Reconcile(context.Background(), req2)

Expect(err1).NotTo(HaveOccurred())
Expect(result1.RequeueAfter).To(Equal(10 * time.Second))
Expect(err2).NotTo(HaveOccurred())
Expect(result2.RequeueAfter).To(Equal(10 * time.Second))
Expect(mockRateLimiter.NumRequeues(req1)).To(Equal(1))
Expect(mockRateLimiter.NumRequeues(req2)).To(Equal(1))
})
It("should implement exponential backoff on repeated calls", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
Requeue: true,
},
}
// Multiple calls to the same controller should show increasing delays
delays := make([]time.Duration, 5)
reconciler := reconciler.AsReconciler(mockReconciler)

for i := range 5 {
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
Expect(err).NotTo(HaveOccurred())
delays[i] = result.RequeueAfter
}

initialDelay := 5 * time.Millisecond
Expect(delays[0]).To(BeNumerically("==", initialDelay))
for i := 1; i < len(delays); i++ {
initialDelay *= 2
Expect(delays[i]).To(BeNumerically("==", initialDelay))
Expect(delays[i]).To(BeNumerically(">", delays[i-1]),
"Delay at index %d (%v) should be >= delay at index %d (%v)",
i, delays[i], i-1, delays[i-1])
}
})
It("should forget an item when reconcile succeeds", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{
Requeue: false,
},
}
// Multiple calls to the same controller should show zero requeues
reconciler := reconciler.AsReconciler(mockReconciler)

for i := 0; i < 5; i++ {
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(BeZero())
}
})
})
24 changes: 19 additions & 5 deletions singleton/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/awslabs/operatorpkg/reconciler"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -17,16 +18,29 @@ const (
RequeueImmediately = 1 * time.Nanosecond
)

// Reconciler defines the interface for singleton reconcilers
type Reconciler interface {
Reconcile(ctx context.Context) (reconcile.Result, error)
Reconcile(ctx context.Context) (reconciler.Result, error)
}
type reconcilerAdapter struct {
Reconciler
}

func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
return reconciler.Reconcile(ctx)
})
func (r *reconcilerAdapter) Reconcile(ctx context.Context, _ reconcile.Request) (reconciler.Result, error) {
return r.Reconciler.Reconcile(ctx)
}

// In response to Requeue: True being deprecated via: https://github.com/kubernetes-sigs/controller-runtime/pull/3107/files
// This uses a bucket and per item delay but the item will be the same because the key is the controller name.
// This implements the same behavior as Requeue: True.

// AsReconciler creates a controller-runtime reconciler from a singleton reconciler
func AsReconciler(rec Reconciler) reconcile.Reconciler {
adapter := &reconcilerAdapter{Reconciler: rec}
return reconciler.AsReconciler(adapter)
}

// Source creates a source for singleton controllers
func Source() source.Source {
eventSource := make(chan event.GenericEvent, 1)
eventSource <- event.GenericEvent{}
Expand Down
Loading