Skip to content

Commit 45862e4

Browse files
authored
feat: add AsReconciler wrapper to rate limit and replace controller-runtime's requeue: true behavior (#160)
* feat: add default singleton rate limiter to replace requeue * feat: Add requeue adapter for rate-limiting * clean up * fix presubmit * pr responses * reduce diff * further reduce diff * remove describe * formatting * pr responses * pr responses * fix comments
1 parent ac6a928 commit 45862e4

File tree

3 files changed

+355
-5
lines changed

3 files changed

+355
-5
lines changed

reconciler/reconciler.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package reconciler
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"k8s.io/client-go/util/workqueue"
8+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
9+
)
10+
11+
// Result adds Requeue functionality back to reconcile results.
12+
type Result struct {
13+
RequeueAfter time.Duration
14+
Requeue bool
15+
}
16+
17+
// Reconciler defines the interface for standard reconcilers
18+
type Reconciler interface {
19+
Reconcile(ctx context.Context, req reconcile.Request) (Result, error)
20+
}
21+
22+
// AsReconciler creates a reconciler with a default rate-limiter
23+
func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
24+
return AsReconcilerWithRateLimiter(
25+
reconciler,
26+
workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](),
27+
)
28+
}
29+
30+
// AsReconcilerWithRateLimiter creates a reconciler with a custom rate-limiter
31+
func AsReconcilerWithRateLimiter(
32+
reconciler Reconciler,
33+
rateLimiter workqueue.TypedRateLimiter[reconcile.Request],
34+
) reconcile.Reconciler {
35+
return reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
36+
result, err := reconciler.Reconcile(ctx, req)
37+
if err != nil {
38+
return reconcile.Result{}, err
39+
}
40+
if result.RequeueAfter > 0 {
41+
rateLimiter.Forget(req)
42+
return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil
43+
}
44+
if result.Requeue {
45+
return reconcile.Result{RequeueAfter: rateLimiter.When(req)}, nil
46+
}
47+
rateLimiter.Forget(req)
48+
return reconcile.Result{}, nil
49+
})
50+
}

reconciler/suite_test.go

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
package reconciler_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/awslabs/operatorpkg/reconciler"
10+
. "github.com/onsi/ginkgo/v2"
11+
. "github.com/onsi/gomega"
12+
"k8s.io/apimachinery/pkg/types"
13+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
14+
)
15+
16+
func Test(t *testing.T) {
17+
RegisterFailHandler(Fail)
18+
RunSpecs(t, "Reconciler")
19+
}
20+
21+
// MockRateLimiter is a mock implementation of workqueue.TypedRateLimiter for testing
22+
type MockRateLimiter[K comparable] struct {
23+
whenFunc func(K) time.Duration
24+
numRequeues map[K]int
25+
backoffDuration time.Duration
26+
}
27+
28+
func (m *MockRateLimiter[K]) When(key K) time.Duration {
29+
if m.whenFunc != nil {
30+
return m.whenFunc(key)
31+
}
32+
// Default implementation
33+
if m.numRequeues == nil {
34+
m.numRequeues = make(map[K]int)
35+
}
36+
m.numRequeues[key] += 1
37+
return m.backoffDuration
38+
}
39+
40+
func (m *MockRateLimiter[K]) NumRequeues(key K) int {
41+
return m.numRequeues[key]
42+
}
43+
44+
func (m *MockRateLimiter[K]) Forget(key K) {
45+
delete(m.numRequeues, key)
46+
}
47+
48+
// MockReconciler is a mock implementation of Reconciler for testing
49+
type MockReconciler struct {
50+
reconcileFunc func(context.Context, reconcile.Request) (reconciler.Result, error)
51+
result reconciler.Result
52+
err error
53+
}
54+
55+
func (m *MockReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconciler.Result, error) {
56+
if m.reconcileFunc != nil {
57+
return m.reconcileFunc(ctx, req)
58+
}
59+
return m.result, m.err
60+
}
61+
62+
var _ = Describe("Reconciler", func() {
63+
It("should return the result without backoff", func() {
64+
mockReconciler := &MockReconciler{
65+
result: reconciler.Result{},
66+
}
67+
68+
reconciler := reconciler.AsReconciler(mockReconciler)
69+
70+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
71+
72+
Expect(err).NotTo(HaveOccurred())
73+
Expect(result.RequeueAfter).To(Equal(0 * time.Second))
74+
})
75+
It("should return the result with backoff when Requeue is set", func() {
76+
mockReconciler := &MockReconciler{
77+
result: reconciler.Result{
78+
Requeue: true,
79+
},
80+
}
81+
82+
reconciler := reconciler.AsReconciler(mockReconciler)
83+
84+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
85+
86+
Expect(err).NotTo(HaveOccurred())
87+
Expect(result.RequeueAfter).To(Equal(5 * time.Millisecond))
88+
})
89+
It("should return the result with backoff when both RequeueAfter and Requeue are set", func() {
90+
mockReconciler := &MockReconciler{
91+
result: reconciler.Result{
92+
RequeueAfter: 10 * time.Second,
93+
Requeue: true,
94+
},
95+
}
96+
97+
reconciler := reconciler.AsReconciler(mockReconciler)
98+
99+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
100+
101+
Expect(err).NotTo(HaveOccurred())
102+
Expect(result.RequeueAfter).To(Equal(10 * time.Second))
103+
})
104+
It("should return the result with backoff when RequeueAfter is set and Requeue is false", func() {
105+
mockReconciler := &MockReconciler{
106+
result: reconciler.Result{
107+
RequeueAfter: 10 * time.Second,
108+
Requeue: false,
109+
},
110+
}
111+
112+
reconciler := reconciler.AsReconciler(mockReconciler)
113+
114+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
115+
116+
Expect(err).NotTo(HaveOccurred())
117+
Expect(result.RequeueAfter).To(Equal(10 * time.Second))
118+
})
119+
It("should return the result with backoff when RequeueAfter is set to zero and Requeue is true", func() {
120+
mockReconciler := &MockReconciler{
121+
result: reconciler.Result{
122+
RequeueAfter: 0 * time.Second,
123+
Requeue: true,
124+
},
125+
}
126+
127+
reconciler := reconciler.AsReconciler(mockReconciler)
128+
129+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
130+
131+
Expect(err).NotTo(HaveOccurred())
132+
Expect(result.RequeueAfter).To(Equal(5 * time.Millisecond))
133+
})
134+
It("should return the result without backoff when RequeueAfter is set to zero and Requeue is false", func() {
135+
mockReconciler := &MockReconciler{
136+
result: reconciler.Result{
137+
RequeueAfter: 0 * time.Second,
138+
Requeue: false,
139+
},
140+
}
141+
142+
reconciler := reconciler.AsReconciler(mockReconciler)
143+
144+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
145+
146+
Expect(err).NotTo(HaveOccurred())
147+
Expect(result.RequeueAfter).To(Equal(0 * time.Millisecond))
148+
})
149+
It("should return the error without processing backoff", func() {
150+
expectedErr := errors.New("test error")
151+
mockReconciler := &MockReconciler{
152+
result: reconciler.Result{Requeue: true},
153+
err: expectedErr,
154+
}
155+
156+
reconciler := reconciler.AsReconciler(mockReconciler)
157+
158+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
159+
160+
Expect(err).To(HaveOccurred())
161+
Expect(err).To(Equal(expectedErr))
162+
Expect(result.RequeueAfter).To(BeZero())
163+
})
164+
It("should use custom rate limiter for backoff", func() {
165+
mockRateLimiter := &MockRateLimiter[reconcile.Request]{
166+
backoffDuration: 10 * time.Second,
167+
}
168+
169+
mockReconciler := &MockReconciler{
170+
result: reconciler.Result{
171+
Requeue: true,
172+
},
173+
}
174+
175+
reconciler := reconciler.AsReconcilerWithRateLimiter(mockReconciler, mockRateLimiter)
176+
177+
req := reconcile.Request{}
178+
result, err := reconciler.Reconcile(context.Background(), req)
179+
180+
Expect(err).NotTo(HaveOccurred())
181+
Expect(result.RequeueAfter).To(Equal(10 * time.Second))
182+
Expect(mockRateLimiter.NumRequeues(req)).To(Equal(1))
183+
})
184+
It("should rate limit distinct items", func() {
185+
mockRateLimiter := &MockRateLimiter[reconcile.Request]{
186+
backoffDuration: 10 * time.Second,
187+
}
188+
189+
mockReconciler := &MockReconciler{
190+
result: reconciler.Result{
191+
Requeue: true,
192+
},
193+
}
194+
195+
reconciler := reconciler.AsReconcilerWithRateLimiter(mockReconciler, mockRateLimiter)
196+
197+
req1 := reconcile.Request{
198+
NamespacedName: types.NamespacedName{
199+
Name: "req1",
200+
Namespace: "",
201+
},
202+
}
203+
result1, err1 := reconciler.Reconcile(context.Background(), req1)
204+
req2 := reconcile.Request{
205+
NamespacedName: types.NamespacedName{
206+
Name: "req2",
207+
Namespace: "",
208+
},
209+
}
210+
result2, err2 := reconciler.Reconcile(context.Background(), req2)
211+
212+
Expect(err1).NotTo(HaveOccurred())
213+
Expect(result1.RequeueAfter).To(Equal(10 * time.Second))
214+
Expect(err2).NotTo(HaveOccurred())
215+
Expect(result2.RequeueAfter).To(Equal(10 * time.Second))
216+
Expect(mockRateLimiter.NumRequeues(req1)).To(Equal(1))
217+
Expect(mockRateLimiter.NumRequeues(req2)).To(Equal(1))
218+
})
219+
It("should implement exponential backoff on repeated calls", func() {
220+
mockReconciler := &MockReconciler{
221+
result: reconciler.Result{
222+
Requeue: true,
223+
},
224+
}
225+
// Multiple calls to the same controller should show increasing delays
226+
delays := make([]time.Duration, 5)
227+
reconciler := reconciler.AsReconciler(mockReconciler)
228+
229+
for i := range 5 {
230+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
231+
Expect(err).NotTo(HaveOccurred())
232+
delays[i] = result.RequeueAfter
233+
}
234+
235+
initialDelay := 5 * time.Millisecond
236+
Expect(delays[0]).To(BeNumerically("==", initialDelay))
237+
for i := 1; i < len(delays); i++ {
238+
initialDelay *= 2
239+
Expect(delays[i]).To(BeNumerically("==", initialDelay))
240+
Expect(delays[i]).To(BeNumerically(">", delays[i-1]),
241+
"Delay at index %d (%v) should be >= delay at index %d (%v)",
242+
i, delays[i], i-1, delays[i-1])
243+
}
244+
})
245+
It("should forget an item when reconcile succeeds", func() {
246+
mockReconciler := &MockReconciler{
247+
result: reconciler.Result{
248+
Requeue: false,
249+
},
250+
}
251+
// Multiple calls to the same controller should show zero requeues
252+
reconciler := reconciler.AsReconciler(mockReconciler)
253+
254+
for i := 0; i < 5; i++ {
255+
result, err := reconciler.Reconcile(context.Background(), reconcile.Request{})
256+
Expect(err).NotTo(HaveOccurred())
257+
Expect(result.RequeueAfter).To(BeZero())
258+
}
259+
})
260+
It("should return a result with RequeueAfter that is scoped to a controller", func() {
261+
// Test with different controllers to ensure they're handled independently
262+
controller1 := &MockReconciler{
263+
result: reconciler.Result{Requeue: true},
264+
}
265+
controller2 := &MockReconciler{
266+
result: reconciler.Result{Requeue: true},
267+
}
268+
269+
reconciler1 := reconciler.AsReconciler(controller1)
270+
reconciler2 := reconciler.AsReconciler(controller2)
271+
272+
// Each controller should get its own rate limiting
273+
result1, err1 := reconciler1.Reconcile(context.Background(), reconcile.Request{})
274+
result2, err2 := reconciler2.Reconcile(context.Background(), reconcile.Request{})
275+
276+
Expect(err1).NotTo(HaveOccurred())
277+
Expect(err2).NotTo(HaveOccurred())
278+
Expect(result1.RequeueAfter).To(BeNumerically(">=", 0))
279+
Expect(result2.RequeueAfter).To(BeNumerically(">=", 0))
280+
Expect(result1.RequeueAfter).To(Equal(result2.RequeueAfter))
281+
282+
result2, err2 = reconciler2.Reconcile(context.Background(), reconcile.Request{})
283+
Expect(err2).NotTo(HaveOccurred())
284+
Expect(result1.RequeueAfter).NotTo(Equal(result2.RequeueAfter))
285+
})
286+
})

singleton/controller.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"time"
66

7+
"github.com/awslabs/operatorpkg/reconciler"
78
"k8s.io/client-go/util/workqueue"
89
"sigs.k8s.io/controller-runtime/pkg/event"
910
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -17,16 +18,29 @@ const (
1718
RequeueImmediately = 1 * time.Nanosecond
1819
)
1920

21+
// Reconciler defines the interface for singleton reconcilers
2022
type Reconciler interface {
21-
Reconcile(ctx context.Context) (reconcile.Result, error)
23+
Reconcile(ctx context.Context) (reconciler.Result, error)
24+
}
25+
type reconcilerAdapter struct {
26+
Reconciler
2227
}
2328

24-
func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
25-
return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
26-
return reconciler.Reconcile(ctx)
27-
})
29+
func (r *reconcilerAdapter) Reconcile(ctx context.Context, _ reconcile.Request) (reconciler.Result, error) {
30+
return r.Reconciler.Reconcile(ctx)
31+
}
32+
33+
// In response to Requeue: True being deprecated via: https://github.com/kubernetes-sigs/controller-runtime/pull/3107/files
34+
// This uses a bucket and per item delay but the item will be the same because the key is the controller name.
35+
// This implements the same behavior as Requeue: True.
36+
37+
// AsReconciler creates a controller-runtime reconciler from a singleton reconciler
38+
func AsReconciler(rec Reconciler) reconcile.Reconciler {
39+
adapter := &reconcilerAdapter{Reconciler: rec}
40+
return reconciler.AsReconciler(adapter)
2841
}
2942

43+
// Source creates a source for singleton controllers
3044
func Source() source.Source {
3145
eventSource := make(chan event.GenericEvent, 1)
3246
eventSource <- event.GenericEvent{}

0 commit comments

Comments
 (0)