Skip to content

Commit 0827dce

Browse files
authored
Add rate limiter to the Cloudmap API calls. Increasing the base delay of the service export reconciler's rate limiter. (#243)
1 parent 9efb435 commit 0827dce

File tree

8 files changed

+57
-16
lines changed

8 files changed

+57
-16
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
7373
KUBEBUILDER_ASSETS?="$(shell $(ENVTEST) use -i $(ENVTEST_KUBERNETES_VERSION) --bin-dir=$(ENVTEST_ASSETS_DIR) -p path)"
7474

7575
.PHONY: test
76-
test: manifests generate generate-mocks fmt vet test-setup ## Run tests
76+
test: manifests generate generate-mocks fmt vet test-setup goimports lint ## Run tests
7777
KUBEBUILDER_ASSETS=$(KUBEBUILDER_ASSETS) go test ./... -coverprofile cover.out -covermode=atomic
7878

7979
test-setup: setup-envtest ## Ensure test environment has been downloaded
@@ -117,7 +117,7 @@ eks-test:
117117
##@ Build
118118

119119
.DEFAULT: build
120-
build: test goimports lint ## Build manager binary.
120+
build: test ## Build manager binary.
121121
go build -ldflags="-s -w -X ${PKG}.GitVersion=${GIT_TAG} -X ${PKG}.GitCommit=${GIT_COMMIT}" -o bin/manager main.go
122122

123123
run: test ## Run a controller from your host.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/onsi/gomega v1.20.2
1414
github.com/pkg/errors v0.9.1
1515
github.com/stretchr/testify v1.8.1
16+
golang.org/x/time v0.1.0
1617
k8s.io/api v0.24.3
1718
k8s.io/apimachinery v0.24.3
1819
k8s.io/client-go v0.24.2
@@ -80,7 +81,6 @@ require (
8081
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
8182
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
8283
golang.org/x/text v0.3.7 // indirect
83-
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
8484
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
8585
google.golang.org/appengine v1.6.7 // indirect
8686
google.golang.org/protobuf v1.28.0 // indirect

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,8 +774,9 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
774774
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
775775
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
776776
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
777-
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
778777
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
778+
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
779+
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
779780
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
780781
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
781782
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,24 +92,24 @@ func main() {
9292

9393
if err = (&multiclustercontrollers.ServiceExportReconciler{
9494
Client: mgr.GetClient(),
95-
Log: common.NewLogger("controllers", "ServiceExport"),
95+
Log: common.NewLogger("controllers", "ServiceExportReconciler"),
9696
Scheme: mgr.GetScheme(),
9797
CloudMap: serviceDiscoveryClient,
9898
ClusterUtils: clusterUtils,
9999
}).SetupWithManager(mgr); err != nil {
100-
log.Error(err, "unable to create controller", "controller", "ServiceExport")
100+
log.Error(err, "unable to create controller", "controller", "ServiceExportReconciler")
101101
os.Exit(1)
102102
}
103103

104104
cloudMapReconciler := &multiclustercontrollers.CloudMapReconciler{
105105
Client: mgr.GetClient(),
106106
Cloudmap: serviceDiscoveryClient,
107-
Log: common.NewLogger("controllers", "Cloudmap"),
107+
Log: common.NewLogger("controllers", "CloudmapReconciler"),
108108
ClusterUtils: clusterUtils,
109109
}
110110

111111
if err = mgr.Add(cloudMapReconciler); err != nil {
112-
log.Error(err, "unable to create controller", "controller", "CloudMap")
112+
log.Error(err, "unable to create controller", "controller", "CloudmapReconciler")
113113
os.Exit(1)
114114
}
115115

pkg/apis/multicluster/v1alpha1/zz_generated.deepcopy.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cloudmap/api.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
8+
9+
"golang.org/x/time/rate"
710

811
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
912
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
@@ -52,19 +55,30 @@ type ServiceDiscoveryApi interface {
5255
}
5356

5457
type serviceDiscoveryApi struct {
55-
log common.Logger
56-
awsFacade AwsFacade
58+
log common.Logger
59+
awsFacade AwsFacade
60+
nsRateLimiter *rate.Limiter
61+
svcRateLimiter *rate.Limiter
62+
opRateLimiter *rate.Limiter
5763
}
5864

5965
// NewServiceDiscoveryApiFromConfig creates a new AWS Cloud Map API connection manager from an AWS client config.
6066
func NewServiceDiscoveryApiFromConfig(cfg *aws.Config) ServiceDiscoveryApi {
6167
return &serviceDiscoveryApi{
62-
log: common.NewLogger("cloudmap"),
63-
awsFacade: NewAwsFacadeFromConfig(cfg),
68+
log: common.NewLogger("cloudmap", "api"),
69+
awsFacade: NewAwsFacadeFromConfig(cfg),
70+
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 5), // 1 per second
71+
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 10), // 2 per second
72+
opRateLimiter: rate.NewLimiter(rate.Every(100*time.Second), 200), // 100 per second
6473
}
6574
}
6675

6776
func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[string]*model.Namespace, error) {
77+
err := sdApi.nsRateLimiter.Wait(ctx)
78+
if err != nil {
79+
return nil, err
80+
}
81+
6882
namespaceMap := make(map[string]*model.Namespace)
6983

7084
pages := sd.NewListNamespacesPaginator(sdApi.awsFacade, &sd.ListNamespacesInput{})
@@ -91,6 +105,11 @@ func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[stri
91105
}
92106

93107
func (sdApi *serviceDiscoveryApi) GetServiceIdMap(ctx context.Context, nsId string) (map[string]string, error) {
108+
err := sdApi.svcRateLimiter.Wait(ctx)
109+
if err != nil {
110+
return nil, err
111+
}
112+
94113
serviceIdMap := make(map[string]string)
95114

96115
filter := types.ServiceFilter{
@@ -155,6 +174,11 @@ func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters
155174
}
156175

157176
func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) {
177+
err = sdApi.opRateLimiter.Wait(ctx)
178+
if err != nil {
179+
return nil, err
180+
}
181+
158182
opResp, err := sdApi.awsFacade.GetOperation(ctx, &sd.GetOperationInput{OperationId: &opId})
159183

160184
if err != nil {

pkg/cloudmap/api_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"errors"
66
"fmt"
77
"testing"
8+
"time"
9+
10+
"golang.org/x/time/rate"
811

912
aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1"
1013

@@ -336,7 +339,10 @@ func getServiceDiscoveryApi(t *testing.T, awsFacade *cloudmapMock.MockAwsFacade)
336339
scheme := runtime.NewScheme()
337340
scheme.AddKnownTypes(aboutv1alpha1.GroupVersion, &aboutv1alpha1.ClusterProperty{})
338341
return &serviceDiscoveryApi{
339-
log: common.NewLoggerWithLogr(testr.New(t)),
340-
awsFacade: awsFacade,
342+
log: common.NewLoggerWithLogr(testr.New(t)),
343+
awsFacade: awsFacade,
344+
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 2), // 1 per second
345+
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 4), // 2 per second
346+
opRateLimiter: rate.NewLimiter(rate.Every(10*time.Second), 100), // 10 per second
341347
}
342348
}

pkg/controllers/multicluster/serviceexport_controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package controllers
33
import (
44
"context"
55
"fmt"
6+
"time"
7+
8+
"k8s.io/apimachinery/pkg/api/errors"
69

710
aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1"
811
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/cloudmap"
@@ -12,9 +15,11 @@ import (
1215
"github.com/aws/aws-sdk-go-v2/aws"
1316
v1 "k8s.io/api/core/v1"
1417
discovery "k8s.io/api/discovery/v1"
15-
"k8s.io/apimachinery/pkg/api/errors"
18+
1619
"k8s.io/apimachinery/pkg/types"
20+
"k8s.io/client-go/util/workqueue"
1721
"sigs.k8s.io/controller-runtime/pkg/builder"
22+
"sigs.k8s.io/controller-runtime/pkg/controller"
1823
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1924
"sigs.k8s.io/controller-runtime/pkg/event"
2025
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -295,6 +300,11 @@ func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
295300
&source.Kind{Type: &aboutv1alpha1.ClusterProperty{}},
296301
handler.EnqueueRequestsFromMapFunc(r.clusterPropertyMappingFunction()),
297302
).
303+
WithOptions(controller.Options{
304+
// rate-limiting is applied to reconcile responses with an error
305+
// We are increasing the base delay to 500ms, defaults baseDelay: 5ms, maxDelay: 1000s
306+
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
307+
}).
298308
Complete(r)
299309
}
300310

0 commit comments

Comments
 (0)