Skip to content

Commit eabb82f

Browse files
committed
fix: Handle rate limiting in validation controller (aws#7892)
1 parent 4ed03a6 commit eabb82f

File tree

4 files changed

+41
-6
lines changed

4 files changed

+41
-6
lines changed

pkg/cache/cache.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const (
4545
// DiscoveredCapacityCacheTTL is the time to drop discovered resource capacity data per-instance type
4646
// if it is not updated by a node creation event or refreshed during controller reconciliation
4747
DiscoveredCapacityCacheTTL = 60 * 24 * time.Hour
48+
// ValidationTTL is time to check authorization errors with validation controller
49+
ValidationTTL = 10 * time.Minute
4850
)
4951

5052
const (

pkg/controllers/nodeclass/validation.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"sigs.k8s.io/karpenter/pkg/cloudprovider"
2828
"sigs.k8s.io/karpenter/pkg/scheduling"
2929

30+
"github.com/aws/aws-sdk-go-v2/aws"
3031
"github.com/aws/aws-sdk-go-v2/service/ec2"
3132
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
3233
corev1 "k8s.io/api/core/v1"
@@ -159,7 +160,13 @@ func (v *Validation) validateCreateFleetAuthorization(
159160
) (reason string, requeue bool, err error) {
160161
createFleetInput := instance.GetCreateFleetInput(nodeClass, karpv1.CapacityTypeOnDemand, tags, mockLaunchTemplateConfig())
161162
createFleetInput.DryRun = lo.ToPtr(true)
162-
if _, err := v.ec2api.CreateFleet(ctx, createFleetInput); awserrors.IgnoreDryRunError(err) != nil {
163+
// Adding NopRetryer to avoid aggressive retry when rate limited
164+
if _, err := v.ec2api.CreateFleet(ctx, createFleetInput, func(o *ec2.Options) {
165+
o.Retryer = aws.NopRetryer{}
166+
}); awserrors.IgnoreDryRunError(err) != nil {
167+
if awserrors.IsRateLimitedError(err) {
168+
return "", true, nil
169+
}
163170
if awserrors.IgnoreUnauthorizedOperationError(err) != nil {
164171
// Dry run should only ever return UnauthorizedOperation or DryRunOperation so if we receive any other error
165172
// it would be an unexpected state
@@ -182,7 +189,13 @@ func (v *Validation) validateCreateLaunchTemplateAuthorization(
182189
}
183190
createLaunchTemplateInput := launchtemplate.GetCreateLaunchTemplateInput(ctx, opts[0], corev1.IPv4Protocol, "")
184191
createLaunchTemplateInput.DryRun = lo.ToPtr(true)
185-
if _, err := v.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput); awserrors.IgnoreDryRunError(err) != nil {
192+
// Adding NopRetryer to avoid aggressive retry when rate limited
193+
if _, err := v.ec2api.CreateLaunchTemplate(ctx, createLaunchTemplateInput, func(o *ec2.Options) {
194+
o.Retryer = aws.NopRetryer{}
195+
}); awserrors.IgnoreDryRunError(err) != nil {
196+
if awserrors.IsRateLimitedError(err) {
197+
return "", true, nil
198+
}
186199
if awserrors.IgnoreUnauthorizedOperationError(err) != nil {
187200
// Dry run should only ever return UnauthorizedOperation or DryRunOperation so if we receive any other error
188201
// it would be an unexpected state
@@ -230,11 +243,13 @@ func (v *Validation) validateRunInstancesAuthorization(
230243
Tags: runInstancesInput.TagSpecifications[0].Tags,
231244
},
232245
)
233-
234-
if _, err = v.ec2api.RunInstances(ctx, runInstancesInput); awserrors.IgnoreDryRunError(err) != nil {
246+
// Adding NopRetryer to avoid aggressive retry when rate limited
247+
if _, err = v.ec2api.RunInstances(ctx, runInstancesInput, func(o *ec2.Options) {
248+
o.Retryer = aws.NopRetryer{}
249+
}); awserrors.IgnoreDryRunError(err) != nil {
235250
// If we get InstanceProfile NotFound, but we have a resolved instance profile in the status,
236251
// this means there is most likely an eventual consistency issue and we just need to requeue
237-
if awserrors.IsInstanceProfileNotFound(err) {
252+
if awserrors.IsInstanceProfileNotFound(err) || awserrors.IsRateLimitedError(err) {
238253
return "", true, nil
239254
}
240255
if awserrors.IgnoreUnauthorizedOperationError(err) != nil {

pkg/errors/errors.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
RunInstancesInvalidParameterValueCode = "InvalidParameterValue"
2929
DryRunOperationErrorCode = "DryRunOperation"
3030
UnauthorizedOperationErrorCode = "UnauthorizedOperation"
31+
RateLimitingErrorCode = "RequestLimitExceeded"
3132
)
3233

3334
var (
@@ -129,6 +130,23 @@ func IgnoreUnauthorizedOperationError(err error) error {
129130
return err
130131
}
131132

133+
func IsRateLimitedError(err error) bool {
134+
if err == nil {
135+
return false
136+
}
137+
if apiErr, ok := lo.ErrorsAs[smithy.APIError](err); ok {
138+
return apiErr.ErrorCode() == RateLimitingErrorCode
139+
}
140+
return false
141+
}
142+
143+
func IgnoreRateLimitedError(err error) error {
144+
if IsRateLimitedError(err) {
145+
return nil
146+
}
147+
return err
148+
}
149+
132150
// IsUnfulfillableCapacity returns true if the Fleet err means capacity is temporarily unavailable for launching. This
133151
// could be due to account limits, insufficient ec2 capacity, etc.
134152
func IsUnfulfillableCapacity(err ec2types.CreateFleetError) bool {

pkg/operator/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
144144
}
145145
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
146146
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)
147-
validationCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
147+
validationCache := cache.New(awscache.ValidationTTL, awscache.DefaultCleanupInterval)
148148

149149
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
150150
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))

0 commit comments

Comments
 (0)