Skip to content

Commit b6b38b0

Browse files
committed
add coalescing reconciler to enable debouncing of reconciles
1 parent b2efc45 commit b6b38b0

File tree

5 files changed

+392
-0
lines changed

5 files changed

+392
-0
lines changed

pkg/coalescing/mocks/coalescing_mock.go

Lines changed: 78 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/coalescing/mocks/doc.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Run go generate to regenerate this mock.
18+
//go:generate ../../../hack/tools/bin/mockgen -destination coalescing_mock.go -package mock_coalescing -source ../reconciler.go ReconcileCacher
19+
//go:generate ../../../hack/tools/bin/mockgen -destination reconciler_mock.go -package mock_coalescing sigs.k8s.io/controller-runtime/pkg/reconcile Reconciler
20+
//go:generate /usr/bin/env bash -c "cat ../../../hack/boilerplate/boilerplate.generatego.txt coalescing_mock.go > _coalescing_mock.go && mv _coalescing_mock.go coalescing_mock.go"
21+
//go:generate /usr/bin/env bash -c "cat ../../../hack/boilerplate/boilerplate.generatego.txt reconciler_mock.go > _reconciler_mock.go && mv _reconciler_mock.go reconciler_mock.go"
22+
package mock_coalescing //nolint

pkg/coalescing/mocks/reconciler_mock.go

Lines changed: 67 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/coalescing/reconciler.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package coalescing
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"github.com/go-logr/logr"
24+
"github.com/pkg/errors"
25+
"go.opentelemetry.io/otel/api/trace"
26+
"go.opentelemetry.io/otel/label"
27+
"sigs.k8s.io/cluster-api-provider-azure/util/cache/ttllru"
28+
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
29+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
30+
)
31+
32+
type (
33+
// ReconcileCache uses and underlying time to live last recently used cache to track high frequency requests.
34+
// A reconciler should call ShouldProcess to determine if the key has expired. If the key has expired, a zero value
35+
// time.Time and true is returned. If the key has not expired, the expiration and false is returned. Upon successful
36+
// reconciliation a reconciler should call Reconciled to update the cache expiry.
37+
ReconcileCache struct {
38+
lastSuccessfulReconciliationCache ttllru.PeekingCacher
39+
}
40+
41+
// ReconcileCacher describes an interface for determining if a request should be reconciled through a call to
42+
// ShouldProcess and if ok, reset the cool down through a call to Reconciled
43+
ReconcileCacher interface {
44+
ShouldProcess(key string) (expiration time.Time, ok bool)
45+
Reconciled(key string)
46+
}
47+
48+
// reconciler is the caching reconciler middleware that uses the cache or
49+
reconciler struct {
50+
upstream reconcile.Reconciler
51+
cache ReconcileCacher
52+
log logr.Logger
53+
}
54+
)
55+
56+
// NewRequestCache creates a new instance of a ReconcileCache given a specified window of expiration
57+
func NewRequestCache(window time.Duration) (*ReconcileCache, error) {
58+
cache, err := ttllru.New(1024, window)
59+
if err != nil {
60+
return nil, errors.Wrap(err, "failed to build ttllru cache")
61+
}
62+
63+
return &ReconcileCache{
64+
lastSuccessfulReconciliationCache: cache,
65+
}, nil
66+
}
67+
68+
// ShouldProcess determines if the key has expired. If the key has expired, a zero value
69+
// time.Time and true is returned. If the key has not expired, the expiration and false is returned.
70+
func (cache *ReconcileCache) ShouldProcess(key string) (time.Time, bool) {
71+
_, expiration, ok := cache.lastSuccessfulReconciliationCache.Peek(key)
72+
return expiration, !ok
73+
}
74+
75+
// Reconciled updates the cache expiry for a given key
76+
func (cache *ReconcileCache) Reconciled(key string) {
77+
cache.lastSuccessfulReconciliationCache.Add(key, nil)
78+
}
79+
80+
// NewReconciler returns a reconcile wrapper that will delay new reconcile.Requests
81+
// after the cache expiry of the request string key.
82+
// A successful reconciliation is defined as as one where no error is returned
83+
func NewReconciler(upstream reconcile.Reconciler, cache ReconcileCacher, log logr.Logger) reconcile.Reconciler {
84+
return &reconciler{
85+
upstream: upstream,
86+
cache: cache,
87+
log: log.WithName("CoalescingReconciler"),
88+
}
89+
}
90+
91+
// Reconcile sends a request to the upstream reconciler if the request is outside of the debounce window
92+
func (rc *reconciler) Reconcile(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
93+
ctx, span := tele.Tracer().Start(ctx, "controllers.reconciler.Reconcile",
94+
trace.WithAttributes(
95+
label.String("namespace", r.Namespace),
96+
label.String("name", r.Name),
97+
))
98+
defer span.End()
99+
100+
log := rc.log.WithValues("request", r.String())
101+
102+
if expiration, ok := rc.cache.ShouldProcess(r.String()); !ok {
103+
log.V(4).Info("not processing", "expiration", expiration, "timeUntil", time.Until(expiration))
104+
var requeueAfter = time.Until(expiration)
105+
if requeueAfter < 1*time.Second {
106+
requeueAfter = 1 * time.Second
107+
}
108+
return reconcile.Result{RequeueAfter: requeueAfter}, nil
109+
}
110+
111+
log.V(4).Info("processing")
112+
result, err := rc.upstream.Reconcile(ctx, r)
113+
if err != nil {
114+
log.V(4).Info("not successful")
115+
return result, err
116+
}
117+
118+
log.V(4).Info("successful")
119+
rc.cache.Reconciled(r.String())
120+
return result, nil
121+
}

pkg/coalescing/reconciler_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package coalescing
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
"github.com/golang/mock/gomock"
25+
. "github.com/onsi/gomega"
26+
gtypes "github.com/onsi/gomega/types"
27+
"github.com/pkg/errors"
28+
"k8s.io/apimachinery/pkg/types"
29+
mock_coalescing "sigs.k8s.io/cluster-api-provider-azure/pkg/coalescing/mocks"
30+
"sigs.k8s.io/controller-runtime/pkg/log"
31+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
32+
)
33+
34+
func TestCoalescingReconciler_Reconcile(t *testing.T) {
35+
var (
36+
defaultRequest = reconcile.Request{
37+
NamespacedName: types.NamespacedName{
38+
Name: "aName",
39+
Namespace: "aNamespace",
40+
},
41+
}
42+
43+
defaultRequestKey = "aNamespace/aName"
44+
)
45+
46+
cases := []struct {
47+
Name string
48+
Reconciler func(g *WithT, cacherMock *mock_coalescing.MockReconcileCacher, mockReconciler *mock_coalescing.MockReconciler) reconcile.Reconciler
49+
Request reconcile.Request
50+
MatchThis gtypes.GomegaMatcher
51+
Error string
52+
}{
53+
{
54+
Name: "should call upstream reconciler if key does not exist in cache",
55+
Reconciler: func(g *WithT, cacherMock *mock_coalescing.MockReconcileCacher, mockReconciler *mock_coalescing.MockReconciler) reconcile.Reconciler {
56+
cacherMock.EXPECT().ShouldProcess(defaultRequestKey).Return(time.Now(), true)
57+
cacherMock.EXPECT().Reconciled(defaultRequestKey)
58+
mockReconciler.EXPECT().Reconcile(gomock.Any(), defaultRequest)
59+
return NewReconciler(mockReconciler, cacherMock, log.NullLogger{})
60+
},
61+
Request: defaultRequest,
62+
MatchThis: Equal(0 * time.Second),
63+
},
64+
{
65+
Name: "should not call upstream reconciler if key does exists in cache and is not expired",
66+
Reconciler: func(g *WithT, cacherMock *mock_coalescing.MockReconcileCacher, mockReconciler *mock_coalescing.MockReconciler) reconcile.Reconciler {
67+
cacherMock.EXPECT().ShouldProcess(defaultRequestKey).Return(time.Now().Add(30*time.Second), false)
68+
return NewReconciler(mockReconciler, cacherMock, log.NullLogger{})
69+
},
70+
Request: defaultRequest,
71+
MatchThis: And(BeNumerically("<=", 30*time.Second), BeNumerically(">", 29*time.Second)),
72+
},
73+
{
74+
Name: "should call upstream reconciler if key does not exist in cache and return error",
75+
Reconciler: func(g *WithT, cacherMock *mock_coalescing.MockReconcileCacher, mockReconciler *mock_coalescing.MockReconciler) reconcile.Reconciler {
76+
cacherMock.EXPECT().ShouldProcess(defaultRequestKey).Return(time.Now(), true)
77+
mockReconciler.EXPECT().Reconcile(gomock.Any(), defaultRequest).Return(reconcile.Result{}, errors.New("boom"))
78+
return NewReconciler(mockReconciler, cacherMock, log.NullLogger{})
79+
},
80+
Request: defaultRequest,
81+
MatchThis: Equal(0 * time.Second),
82+
Error: "boom",
83+
},
84+
}
85+
86+
for _, c := range cases {
87+
c := c
88+
t.Run(c.Name, func(t *testing.T) {
89+
g := NewWithT(t)
90+
mockCtrl := gomock.NewController(t)
91+
defer mockCtrl.Finish()
92+
cacherMock := mock_coalescing.NewMockReconcileCacher(mockCtrl)
93+
reconcilerMock := mock_coalescing.NewMockReconciler(mockCtrl)
94+
subject := c.Reconciler(g, cacherMock, reconcilerMock)
95+
result, err := subject.Reconcile(context.Background(), c.Request)
96+
if c.Error != "" || err != nil {
97+
g.Expect(err).To(And(HaveOccurred(), MatchError(c.Error)))
98+
return
99+
}
100+
101+
g.Expect(result.RequeueAfter).To(c.MatchThis)
102+
})
103+
}
104+
}

0 commit comments

Comments
 (0)