Skip to content

Commit f5d958a

Browse files
authored
Merge pull request kubernetes#76749 from mcrute/ec2-rate-limit-fix
Avoid using tag filters for EC2 API where possible
2 parents 2a7eaa3 + c8edfa2 commit f5d958a

File tree

4 files changed: +101 -36 lines changed

staging/src/k8s.io/legacy-cloud-providers/aws/aws.go

Lines changed: 72 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -904,12 +904,28 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e
904904

905905
// Implements EC2.DescribeSecurityGroups
906906
func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
907-
// Security groups are not paged
908-
response, err := s.ec2.DescribeSecurityGroups(request)
909-
if err != nil {
910-
return nil, fmt.Errorf("error listing AWS security groups: %q", err)
907+
// Security groups are paged
908+
results := []*ec2.SecurityGroup{}
909+
var nextToken *string
910+
requestTime := time.Now()
911+
for {
912+
response, err := s.ec2.DescribeSecurityGroups(request)
913+
if err != nil {
914+
recordAWSMetric("describe_security_groups", 0, err)
915+
return nil, fmt.Errorf("error listing AWS security groups: %q", err)
916+
}
917+
918+
results = append(results, response.SecurityGroups...)
919+
920+
nextToken = response.NextToken
921+
if aws.StringValue(nextToken) == "" {
922+
break
923+
}
924+
request.NextToken = nextToken
911925
}
912-
return response.SecurityGroups, nil
926+
timeTaken := time.Since(requestTime).Seconds()
927+
recordAWSMetric("describe_security_groups", timeTaken, nil)
928+
return results, nil
913929
}
914930

915931
func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) {
@@ -1034,12 +1050,27 @@ func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOut
10341050
}
10351051

10361052
func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) {
1037-
// Not paged
1038-
response, err := s.ec2.DescribeRouteTables(request)
1039-
if err != nil {
1040-
return nil, fmt.Errorf("error listing AWS route tables: %q", err)
1053+
results := []*ec2.RouteTable{}
1054+
var nextToken *string
1055+
requestTime := time.Now()
1056+
for {
1057+
response, err := s.ec2.DescribeRouteTables(request)
1058+
if err != nil {
1059+
recordAWSMetric("describe_route_tables", 0, err)
1060+
return nil, fmt.Errorf("error listing AWS route tables: %q", err)
1061+
}
1062+
1063+
results = append(results, response.RouteTables...)
1064+
1065+
nextToken = response.NextToken
1066+
if aws.StringValue(nextToken) == "" {
1067+
break
1068+
}
1069+
request.NextToken = nextToken
10411070
}
1042-
return response.RouteTables, nil
1071+
timeTaken := time.Since(requestTime).Seconds()
1072+
recordAWSMetric("describe_route_tables", timeTaken, nil)
1073+
return results, nil
10431074
}
10441075

10451076
func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) {
@@ -1573,13 +1604,32 @@ func (c *Cloud) GetCandidateZonesForDynamicVolume() (sets.String, error) {
15731604
// TODO: Caching / expose v1.Nodes to the cloud provider?
15741605
// TODO: We could also query for subnets, I think
15751606

1576-
filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
1607+
// Note: It is more efficient to call the EC2 API twice with different tag
1608+
// filters than to call it once with a tag filter that results in a logical
1609+
// OR. For really large clusters the logical OR will result in EC2 API rate
1610+
// limiting.
1611+
instances := []*ec2.Instance{}
15771612

1578-
instances, err := c.describeInstances(filters)
1613+
baseFilters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
1614+
1615+
filters := c.tagging.addFilters(baseFilters)
1616+
di, err := c.describeInstances(filters)
15791617
if err != nil {
15801618
return nil, err
15811619
}
15821620

1621+
instances = append(instances, di...)
1622+
1623+
if c.tagging.usesLegacyTags {
1624+
filters = c.tagging.addLegacyFilters(baseFilters)
1625+
di, err = c.describeInstances(filters)
1626+
if err != nil {
1627+
return nil, err
1628+
}
1629+
1630+
instances = append(instances, di...)
1631+
}
1632+
15831633
if len(instances) == 0 {
15841634
return nil, fmt.Errorf("no instances returned")
15851635
}
@@ -3022,17 +3072,16 @@ func (c *Cloud) ensureSecurityGroup(name string, description string, additionalT
30223072
for {
30233073
attempt++
30243074

3025-
request := &ec2.DescribeSecurityGroupsInput{}
3026-
filters := []*ec2.Filter{
3027-
newEc2Filter("group-name", name),
3028-
newEc2Filter("vpc-id", c.vpcID),
3029-
}
30303075
// Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key.
30313076
// However, we do check that it matches our tags.
30323077
// If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before.
30333078
// If it has a different cluster's tags, that is an error.
30343079
// This shouldn't happen because name is expected to be globally unique (UUID derived)
3035-
request.Filters = filters
3080+
request := &ec2.DescribeSecurityGroupsInput{}
3081+
request.Filters = []*ec2.Filter{
3082+
newEc2Filter("group-name", name),
3083+
newEc2Filter("vpc-id", c.vpcID),
3084+
}
30363085

30373086
securityGroups, err := c.ec2.DescribeSecurityGroups(request)
30383087
if err != nil {
@@ -3108,8 +3157,7 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) {
31083157
// However, in future this will likely be treated as an error.
31093158
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
31103159
request := &ec2.DescribeSubnetsInput{}
3111-
filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
3112-
request.Filters = c.tagging.addFilters(filters)
3160+
request.Filters = []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
31133161

31143162
subnets, err := c.ec2.DescribeSubnets(request)
31153163
if err != nil {
@@ -3131,8 +3179,7 @@ func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
31313179
klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.")
31323180

31333181
request = &ec2.DescribeSubnetsInput{}
3134-
filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
3135-
request.Filters = filters
3182+
request.Filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
31363183

31373184
subnets, err = c.ec2.DescribeSubnets(request)
31383185
if err != nil {
@@ -3888,7 +3935,6 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m
38883935
// Return all the security groups that are tagged as being part of our cluster
38893936
func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
38903937
request := &ec2.DescribeSecurityGroupsInput{}
3891-
request.Filters = c.tagging.addFilters(nil)
38923938
groups, err := c.ec2.DescribeSecurityGroups(request)
38933939
if err != nil {
38943940
return nil, fmt.Errorf("error querying security groups: %q", err)
@@ -3937,10 +3983,9 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
39373983
var actualGroups []*ec2.SecurityGroup
39383984
{
39393985
describeRequest := &ec2.DescribeSecurityGroupsInput{}
3940-
filters := []*ec2.Filter{
3986+
describeRequest.Filters = []*ec2.Filter{
39413987
newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
39423988
}
3943-
describeRequest.Filters = c.tagging.addFilters(filters)
39443989
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
39453990
if err != nil {
39463991
return fmt.Errorf("error querying security groups for ELB: %q", err)
@@ -4098,10 +4143,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
40984143
{
40994144
// Server side filter
41004145
describeRequest := &ec2.DescribeSecurityGroupsInput{}
4101-
filters := []*ec2.Filter{
4146+
describeRequest.Filters = []*ec2.Filter{
41024147
newEc2Filter("ip-permission.protocol", "tcp"),
41034148
}
4104-
describeRequest.Filters = c.tagging.addFilters(filters)
41054149
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
41064150
if err != nil {
41074151
return fmt.Errorf("Error querying security groups for NLB: %q", err)
@@ -4229,10 +4273,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
42294273
var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups)
42304274

42314275
describeRequest := &ec2.DescribeSecurityGroupsInput{}
4232-
filters := []*ec2.Filter{
4276+
describeRequest.Filters = []*ec2.Filter{
42334277
newEc2Filter("group-id", loadBalancerSGs...),
42344278
}
4235-
describeRequest.Filters = c.tagging.addFilters(filters)
42364279
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
42374280
if err != nil {
42384281
return fmt.Errorf("error querying security groups for ELB: %q", err)
@@ -4444,7 +4487,6 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([
44444487

44454488
// TODO: Move to instanceCache
44464489
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
4447-
filters = c.tagging.addFilters(filters)
44484490
request := &ec2.DescribeInstancesInput{
44494491
Filters: filters,
44504492
}

staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -935,10 +935,10 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, in
935935
{
936936
// Server side filter
937937
describeRequest := &ec2.DescribeSecurityGroupsInput{}
938-
filters := []*ec2.Filter{
938+
describeRequest.Filters = []*ec2.Filter{
939939
newEc2Filter("ip-permission.protocol", "tcp"),
940+
newEc2Filter("vpc-id", c.vpcID),
940941
}
941-
describeRequest.Filters = c.tagging.addFilters(filters)
942942
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
943943
if err != nil {
944944
return fmt.Errorf("Error querying security groups for NLB: %q", err)

staging/src/k8s.io/legacy-cloud-providers/aws/aws_routes.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
4242

4343
tables = response
4444
} else {
45-
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
45+
request := &ec2.DescribeRouteTablesInput{}
4646
response, err := c.ec2.DescribeRouteTables(request)
4747
if err != nil {
4848
return nil, err

staging/src/k8s.io/legacy-cloud-providers/aws/tags.go

Lines changed: 26 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -247,9 +247,32 @@ func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter {
247247
}
248248
return filters
249249
}
250-
// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade
251-
// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter
252-
f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey())
250+
251+
f := newEc2Filter("tag-key", t.clusterTagKey())
252+
253+
// We can't pass a zero-length Filters to AWS (it's an error)
254+
// So if we end up with no filters; we need to return nil
255+
filters = append(filters, f)
256+
return filters
257+
}
258+
259+
// Add additional filters, to match on our tags. This uses the tag for legacy
260+
// 1.5 -> 1.6 clusters and exists for backwards compatibility
261+
//
262+
// This lets us run multiple k8s clusters in a single EC2 AZ
263+
func (t *awsTagging) addLegacyFilters(filters []*ec2.Filter) []*ec2.Filter {
264+
// if there are no clusterID configured - no filtering by special tag names
265+
// should be applied to revert to legacy behaviour.
266+
if len(t.ClusterID) == 0 {
267+
if len(filters) == 0 {
268+
// We can't pass a zero-length Filters to AWS (it's an error)
269+
// So if we end up with no filters; just return nil
270+
return nil
271+
}
272+
return filters
273+
}
274+
275+
f := newEc2Filter(fmt.Sprintf("tag:%s", TagNameKubernetesClusterLegacy), t.ClusterID)
253276

254277
// We can't pass a zero-length Filters to AWS (it's an error)
255278
// So if we end up with no filters; we need to return nil

0 commit comments

Comments (0)