Skip to content

Commit 9a02fad

Browse files
committed
Migrate ServiceLimiters to AWS SDK V2
Signed-off-by: Pankaj Walke <[email protected]>
1 parent 4b2f770 commit 9a02fad

File tree

1 file changed

+73
-0
lines changed

1 file changed

+73
-0
lines changed

pkg/cloud/throttle/throttle.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ limitations under the License.
1818
package throttle
1919

2020
import (
21+
"context"
2122
"regexp"
2223
"strings"
2324

25+
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
2426
"github.com/aws/aws-sdk-go/aws/request"
27+
"github.com/aws/smithy-go/middleware"
2528

2629
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/cloud/awserrors"
2730
"sigs.k8s.io/cluster-api-provider-aws/v2/pkg/internal/rate"
@@ -52,6 +55,11 @@ func (o *OperationLimiter) Wait(r *request.Request) error {
5255
return o.getLimiter().Wait(r.Context())
5356
}
5457

58+
// WaitV2 will wait on a request for AWS SDK V2.
59+
func (o *OperationLimiter) WaitV2(ctx context.Context) error {
60+
return o.getLimiter().Wait(ctx)
61+
}
62+
5563
// Match will match a request.
5664
func (o *OperationLimiter) Match(r *request.Request) (bool, error) {
5765
if o.regexp == nil {
@@ -64,13 +72,33 @@ func (o *OperationLimiter) Match(r *request.Request) (bool, error) {
6472
return o.regexp.MatchString(r.Operation.Name), nil
6573
}
6674

75+
// MatchV2 will match a request for AWS SDK V2.
76+
func (o *OperationLimiter) MatchV2(ctx context.Context) (bool, error) {
77+
if o.regexp == nil {
78+
var err error
79+
o.regexp, err = regexp.Compile("^" + o.Operation)
80+
if err != nil {
81+
return false, err
82+
}
83+
}
84+
opName := awsmiddleware.GetOperationName(ctx)
85+
return o.regexp.MatchString(opName), nil
86+
}
87+
6788
// LimitRequest will limit a request.
6889
func (s ServiceLimiter) LimitRequest(r *request.Request) {
6990
if ol, ok := s.matchRequest(r); ok {
7091
_ = ol.Wait(r)
7192
}
7293
}
7394

95+
// LimitRequestV2 will limit a request for AWS SDK V2.
96+
func (s ServiceLimiter) LimitRequestV2(ctx context.Context) {
97+
if ol, ok := s.matchRequestV2(ctx); ok {
98+
_ = ol.WaitV2(ctx)
99+
}
100+
}
101+
74102
func (o *OperationLimiter) getLimiter() *rate.Limiter {
75103
if o.limiter == nil {
76104
o.limiter = rate.NewLimiter(o.RefillRate, o.Burst)
@@ -92,6 +120,17 @@ func (s ServiceLimiter) ReviewResponse(r *request.Request) {
92120
}
93121
}
94122

123+
// ReviewResponseV2 will review the limits of a Request's response for AWS SDK V2.
124+
func (s ServiceLimiter) ReviewResponseV2(ctx context.Context, errorCode string) {
125+
126+
switch errorCode {
127+
case "Throttling", "RequestLimitExceeded":
128+
if ol, ok := s.matchRequestV2(ctx); ok {
129+
ol.limiter.ResetTokens()
130+
}
131+
}
132+
}
133+
95134
func (s ServiceLimiter) matchRequest(r *request.Request) (*OperationLimiter, bool) {
96135
for _, ol := range s {
97136
match, err := ol.Match(r)
@@ -104,3 +143,37 @@ func (s ServiceLimiter) matchRequest(r *request.Request) (*OperationLimiter, boo
104143
}
105144
return nil, false
106145
}
146+
147+
// matchRequestV2 is used for matching request for AWS SDK V2.
148+
func (s ServiceLimiter) matchRequestV2(ctx context.Context) (*OperationLimiter, bool) {
149+
for _, ol := range s {
150+
match, err := ol.MatchV2(ctx)
151+
if err != nil {
152+
return nil, false
153+
}
154+
if match {
155+
return ol, true
156+
}
157+
}
158+
return nil, false
159+
}
160+
161+
// WithServiceLimiterMiddleware returns ServiceLimiter middleware stack for specified service name.
162+
func WithServiceLimiterMiddleware(limiter *ServiceLimiter) func(stack *middleware.Stack) error {
163+
return func(stack *middleware.Stack) error {
164+
// Inserts service Limiter middleware after RequestContext initialization.
165+
return stack.Finalize.Insert(getServiceLimiterMiddleware(limiter), "capa/RequestMetricContextMiddleware", middleware.After)
166+
}
167+
}
168+
169+
// getServiceLimiterMiddleware implements serviceLimiter middleware.
170+
func getServiceLimiterMiddleware(limiter *ServiceLimiter) middleware.FinalizeMiddleware {
171+
return middleware.FinalizeMiddlewareFunc("capa/ServiceLimiterMiddleware", func(ctx context.Context, input middleware.FinalizeInput, handler middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) {
172+
limiter.LimitRequestV2(ctx)
173+
174+
out, metadata, err := handler.HandleFinalize(ctx, input)
175+
smithyErr := awserrors.ParseSmithyError(err)
176+
limiter.ReviewResponseV2(ctx, smithyErr.ErrorCode())
177+
return out, metadata, err
178+
})
179+
}

0 commit comments

Comments
 (0)