Skip to content

Commit ffa284d

Browse files
punkwalkerdamdo
andauthored
🌱 Migrate ServiceLimiters to AWS SDK V2 (#5574)
* Migrate ServiceLimiters to AWS SDK V2 Signed-off-by: Pankaj Walke <[email protected]> * fix lint errors Signed-off-by: Pankaj Walke <[email protected]> * makefile: bump release-binaries's GOMAXPROCS=2 it was hanging otherwise --------- Signed-off-by: Pankaj Walke <[email protected]> Co-authored-by: Damiano Donati <[email protected]>
1 parent b3a6721 commit ffa284d

File tree

2 files changed

+73
-1
lines changed

2 files changed

+73
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ promote-images: $(KPROMO) $(YQ)
606606

607607
.PHONY: release-binaries
608608
release-binaries: $(GORELEASER) ## Builds only the binaries, not a release.
609-
$(GORELEASER) build --config $(GORELEASER_CONFIG) --snapshot --clean
609+
GOMAXPROCS=2 $(GORELEASER) build --config $(GORELEASER_CONFIG) --snapshot --clean
610610

611611
.PHONY: release-staging
612612
release-staging: ## Builds and push container images and manifests to the staging bucket.

pkg/cloud/throttle/throttle.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ limitations under the License.
1818
package throttle
1919

2020
import (
21+
"context"
2122
"regexp"
2223
"strings"
2324

25+
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
2426
"github.com/aws/aws-sdk-go/aws/request"
27+
"github.com/aws/smithy-go/middleware"
2528

2629
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/awserrors"
2730
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/internal/rate"
@@ -52,6 +55,11 @@ func (o *OperationLimiter) Wait(r *request.Request) error {
5255
return o.getLimiter().Wait(r.Context())
5356
}
5457

58+
// WaitV2 will wait on a request for AWS SDK V2.
59+
func (o *OperationLimiter) WaitV2(ctx context.Context) error {
60+
return o.getLimiter().Wait(ctx)
61+
}
62+
5563
// Match will match a request.
5664
func (o *OperationLimiter) Match(r *request.Request) (bool, error) {
5765
if o.regexp == nil {
@@ -64,13 +72,33 @@ func (o *OperationLimiter) Match(r *request.Request) (bool, error) {
6472
return o.regexp.MatchString(r.Operation.Name), nil
6573
}
6674

75+
// MatchV2 will match a request for AWS SDK V2.
76+
func (o *OperationLimiter) MatchV2(ctx context.Context) (bool, error) {
77+
if o.regexp == nil {
78+
var err error
79+
o.regexp, err = regexp.Compile("^" + o.Operation)
80+
if err != nil {
81+
return false, err
82+
}
83+
}
84+
opName := awsmiddleware.GetOperationName(ctx)
85+
return o.regexp.MatchString(opName), nil
86+
}
87+
6788
// LimitRequest will limit a request.
6889
func (s ServiceLimiter) LimitRequest(r *request.Request) {
6990
if ol, ok := s.matchRequest(r); ok {
7091
_ = ol.Wait(r)
7192
}
7293
}
7394

95+
// LimitRequestV2 will limit a request for AWS SDK V2.
96+
func (s ServiceLimiter) LimitRequestV2(ctx context.Context) {
97+
if ol, ok := s.matchRequestV2(ctx); ok {
98+
_ = ol.WaitV2(ctx)
99+
}
100+
}
101+
74102
func (o *OperationLimiter) getLimiter() *rate.Limiter {
75103
if o.limiter == nil {
76104
o.limiter = rate.NewLimiter(o.RefillRate, o.Burst)
@@ -92,6 +120,16 @@ func (s ServiceLimiter) ReviewResponse(r *request.Request) {
92120
}
93121
}
94122

123+
// ReviewResponseV2 will review the limits of a Request's response for AWS SDK V2.
124+
func (s ServiceLimiter) ReviewResponseV2(ctx context.Context, errorCode string) {
125+
switch errorCode {
126+
case "Throttling", "RequestLimitExceeded":
127+
if ol, ok := s.matchRequestV2(ctx); ok {
128+
ol.limiter.ResetTokens()
129+
}
130+
}
131+
}
132+
95133
func (s ServiceLimiter) matchRequest(r *request.Request) (*OperationLimiter, bool) {
96134
for _, ol := range s {
97135
match, err := ol.Match(r)
@@ -104,3 +142,37 @@ func (s ServiceLimiter) matchRequest(r *request.Request) (*OperationLimiter, boo
104142
}
105143
return nil, false
106144
}
145+
146+
// matchRequestV2 is used for matching request for AWS SDK V2.
147+
func (s ServiceLimiter) matchRequestV2(ctx context.Context) (*OperationLimiter, bool) {
148+
for _, ol := range s {
149+
match, err := ol.MatchV2(ctx)
150+
if err != nil {
151+
return nil, false
152+
}
153+
if match {
154+
return ol, true
155+
}
156+
}
157+
return nil, false
158+
}
159+
160+
// WithServiceLimiterMiddleware returns ServiceLimiter middleware stack for specified service name.
161+
func WithServiceLimiterMiddleware(limiter *ServiceLimiter) func(stack *middleware.Stack) error {
162+
return func(stack *middleware.Stack) error {
163+
// Inserts service Limiter middleware after RequestContext initialization.
164+
return stack.Finalize.Insert(getServiceLimiterMiddleware(limiter), "capa/RequestMetricContextMiddleware", middleware.After)
165+
}
166+
}
167+
168+
// getServiceLimiterMiddleware implements serviceLimiter middleware.
169+
func getServiceLimiterMiddleware(limiter *ServiceLimiter) middleware.FinalizeMiddleware {
170+
return middleware.FinalizeMiddlewareFunc("capa/ServiceLimiterMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
171+
limiter.LimitRequestV2(ctx)
172+
173+
out, metadata, err := handler.HandleFinalize(ctx, input)
174+
smithyErr := awserrors.ParseSmithyError(err)
175+
limiter.ReviewResponseV2(ctx, smithyErr.ErrorCode())
176+
return out, metadata, err
177+
})
178+
}

0 commit comments

Comments
 (0)