Skip to content

Commit c8edfa2

Browse files
committed
Avoid using tag filters for EC2 API where possible
For very large clusters, these tag filters are inefficient within the EC2 API and result in rate limiting. Most of these queries already have filters that are narrow enough that eliminating the tag filter will not return significantly more data, while the query itself is executed more efficiently by the EC2 API. Additionally, some API wrappers did not support pagination even though the underlying API calls are paginated. This change adds pagination to those wrappers so that the returned results are no longer truncated.
1 parent 11611ee commit c8edfa2

File tree

4 files changed

+101
-36
lines changed

4 files changed

+101
-36
lines changed

staging/src/k8s.io/legacy-cloud-providers/aws/aws.go

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -904,12 +904,28 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e
904904

905905
// Implements EC2.DescribeSecurityGroups
906906
func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
907-
// Security groups are not paged
908-
response, err := s.ec2.DescribeSecurityGroups(request)
909-
if err != nil {
910-
return nil, fmt.Errorf("error listing AWS security groups: %q", err)
907+
// Security groups are paged
908+
results := []*ec2.SecurityGroup{}
909+
var nextToken *string
910+
requestTime := time.Now()
911+
for {
912+
response, err := s.ec2.DescribeSecurityGroups(request)
913+
if err != nil {
914+
recordAWSMetric("describe_security_groups", 0, err)
915+
return nil, fmt.Errorf("error listing AWS security groups: %q", err)
916+
}
917+
918+
results = append(results, response.SecurityGroups...)
919+
920+
nextToken = response.NextToken
921+
if aws.StringValue(nextToken) == "" {
922+
break
923+
}
924+
request.NextToken = nextToken
911925
}
912-
return response.SecurityGroups, nil
926+
timeTaken := time.Since(requestTime).Seconds()
927+
recordAWSMetric("describe_security_groups", timeTaken, nil)
928+
return results, nil
913929
}
914930

915931
func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) {
@@ -1034,12 +1050,27 @@ func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOut
10341050
}
10351051

10361052
func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) {
1037-
// Not paged
1038-
response, err := s.ec2.DescribeRouteTables(request)
1039-
if err != nil {
1040-
return nil, fmt.Errorf("error listing AWS route tables: %q", err)
1053+
results := []*ec2.RouteTable{}
1054+
var nextToken *string
1055+
requestTime := time.Now()
1056+
for {
1057+
response, err := s.ec2.DescribeRouteTables(request)
1058+
if err != nil {
1059+
recordAWSMetric("describe_route_tables", 0, err)
1060+
return nil, fmt.Errorf("error listing AWS route tables: %q", err)
1061+
}
1062+
1063+
results = append(results, response.RouteTables...)
1064+
1065+
nextToken = response.NextToken
1066+
if aws.StringValue(nextToken) == "" {
1067+
break
1068+
}
1069+
request.NextToken = nextToken
10411070
}
1042-
return response.RouteTables, nil
1071+
timeTaken := time.Since(requestTime).Seconds()
1072+
recordAWSMetric("describe_route_tables", timeTaken, nil)
1073+
return results, nil
10431074
}
10441075

10451076
func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) {
@@ -1573,13 +1604,32 @@ func (c *Cloud) GetCandidateZonesForDynamicVolume() (sets.String, error) {
15731604
// TODO: Caching / expose v1.Nodes to the cloud provider?
15741605
// TODO: We could also query for subnets, I think
15751606

1576-
filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
1607+
// Note: It is more efficient to call the EC2 API twice with different tag
1608+
// filters than to call it once with a tag filter that results in a logical
1609+
// OR. For really large clusters the logical OR will result in EC2 API rate
1610+
// limiting.
1611+
instances := []*ec2.Instance{}
15771612

1578-
instances, err := c.describeInstances(filters)
1613+
baseFilters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
1614+
1615+
filters := c.tagging.addFilters(baseFilters)
1616+
di, err := c.describeInstances(filters)
15791617
if err != nil {
15801618
return nil, err
15811619
}
15821620

1621+
instances = append(instances, di...)
1622+
1623+
if c.tagging.usesLegacyTags {
1624+
filters = c.tagging.addLegacyFilters(baseFilters)
1625+
di, err = c.describeInstances(filters)
1626+
if err != nil {
1627+
return nil, err
1628+
}
1629+
1630+
instances = append(instances, di...)
1631+
}
1632+
15831633
if len(instances) == 0 {
15841634
return nil, fmt.Errorf("no instances returned")
15851635
}
@@ -3022,17 +3072,16 @@ func (c *Cloud) ensureSecurityGroup(name string, description string, additionalT
30223072
for {
30233073
attempt++
30243074

3025-
request := &ec2.DescribeSecurityGroupsInput{}
3026-
filters := []*ec2.Filter{
3027-
newEc2Filter("group-name", name),
3028-
newEc2Filter("vpc-id", c.vpcID),
3029-
}
30303075
// Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key.
30313076
// However, we do check that it matches our tags.
30323077
// If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before.
30333078
// If it has a different cluster's tags, that is an error.
30343079
// This shouldn't happen because name is expected to be globally unique (UUID derived)
3035-
request.Filters = filters
3080+
request := &ec2.DescribeSecurityGroupsInput{}
3081+
request.Filters = []*ec2.Filter{
3082+
newEc2Filter("group-name", name),
3083+
newEc2Filter("vpc-id", c.vpcID),
3084+
}
30363085

30373086
securityGroups, err := c.ec2.DescribeSecurityGroups(request)
30383087
if err != nil {
@@ -3108,8 +3157,7 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) {
31083157
// However, in future this will likely be treated as an error.
31093158
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
31103159
request := &ec2.DescribeSubnetsInput{}
3111-
filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
3112-
request.Filters = c.tagging.addFilters(filters)
3160+
request.Filters = []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)}
31133161

31143162
subnets, err := c.ec2.DescribeSubnets(request)
31153163
if err != nil {
@@ -3131,8 +3179,7 @@ func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) {
31313179
klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.")
31323180

31333181
request = &ec2.DescribeSubnetsInput{}
3134-
filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
3135-
request.Filters = filters
3182+
request.Filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)}
31363183

31373184
subnets, err = c.ec2.DescribeSubnets(request)
31383185
if err != nil {
@@ -3888,7 +3935,6 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m
38883935
// Return all the security groups that are tagged as being part of our cluster
38893936
func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) {
38903937
request := &ec2.DescribeSecurityGroupsInput{}
3891-
request.Filters = c.tagging.addFilters(nil)
38923938
groups, err := c.ec2.DescribeSecurityGroups(request)
38933939
if err != nil {
38943940
return nil, fmt.Errorf("error querying security groups: %q", err)
@@ -3937,10 +3983,9 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
39373983
var actualGroups []*ec2.SecurityGroup
39383984
{
39393985
describeRequest := &ec2.DescribeSecurityGroupsInput{}
3940-
filters := []*ec2.Filter{
3986+
describeRequest.Filters = []*ec2.Filter{
39413987
newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID),
39423988
}
3943-
describeRequest.Filters = c.tagging.addFilters(filters)
39443989
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
39453990
if err != nil {
39463991
return fmt.Errorf("error querying security groups for ELB: %q", err)
@@ -4098,10 +4143,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
40984143
{
40994144
// Server side filter
41004145
describeRequest := &ec2.DescribeSecurityGroupsInput{}
4101-
filters := []*ec2.Filter{
4146+
describeRequest.Filters = []*ec2.Filter{
41024147
newEc2Filter("ip-permission.protocol", "tcp"),
41034148
}
4104-
describeRequest.Filters = c.tagging.addFilters(filters)
41054149
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
41064150
if err != nil {
41074151
return fmt.Errorf("Error querying security groups for NLB: %q", err)
@@ -4229,10 +4273,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin
42294273
var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups)
42304274

42314275
describeRequest := &ec2.DescribeSecurityGroupsInput{}
4232-
filters := []*ec2.Filter{
4276+
describeRequest.Filters = []*ec2.Filter{
42334277
newEc2Filter("group-id", loadBalancerSGs...),
42344278
}
4235-
describeRequest.Filters = c.tagging.addFilters(filters)
42364279
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
42374280
if err != nil {
42384281
return fmt.Errorf("error querying security groups for ELB: %q", err)
@@ -4444,7 +4487,6 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([
44444487

44454488
// TODO: Move to instanceCache
44464489
func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) {
4447-
filters = c.tagging.addFilters(filters)
44484490
request := &ec2.DescribeInstancesInput{
44494491
Filters: filters,
44504492
}

staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -935,10 +935,10 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, in
935935
{
936936
// Server side filter
937937
describeRequest := &ec2.DescribeSecurityGroupsInput{}
938-
filters := []*ec2.Filter{
938+
describeRequest.Filters = []*ec2.Filter{
939939
newEc2Filter("ip-permission.protocol", "tcp"),
940+
newEc2Filter("vpc-id", c.vpcID),
940941
}
941-
describeRequest.Filters = c.tagging.addFilters(filters)
942942
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
943943
if err != nil {
944944
return fmt.Errorf("Error querying security groups for NLB: %q", err)

staging/src/k8s.io/legacy-cloud-providers/aws/aws_routes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
4242

4343
tables = response
4444
} else {
45-
request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)}
45+
request := &ec2.DescribeRouteTablesInput{}
4646
response, err := c.ec2.DescribeRouteTables(request)
4747
if err != nil {
4848
return nil, err

staging/src/k8s.io/legacy-cloud-providers/aws/tags.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,32 @@ func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter {
247247
}
248248
return filters
249249
}
250-
// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade
251-
// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter
252-
f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey())
250+
251+
f := newEc2Filter("tag-key", t.clusterTagKey())
252+
253+
// We can't pass a zero-length Filters to AWS (it's an error)
254+
// So if we end up with no filters; we need to return nil
255+
filters = append(filters, f)
256+
return filters
257+
}
258+
259+
// Add additional filters, to match on our tags. This uses the tag for legacy
260+
// 1.5 -> 1.6 clusters and exists for backwards compatibility
261+
//
262+
// This lets us run multiple k8s clusters in a single EC2 AZ
263+
func (t *awsTagging) addLegacyFilters(filters []*ec2.Filter) []*ec2.Filter {
264+
// if no clusterID is configured - no filtering by special tag names
265+
// should be applied to revert to legacy behaviour.
266+
if len(t.ClusterID) == 0 {
267+
if len(filters) == 0 {
268+
// We can't pass a zero-length Filters to AWS (it's an error)
269+
// So if we end up with no filters; just return nil
270+
return nil
271+
}
272+
return filters
273+
}
274+
275+
f := newEc2Filter(fmt.Sprintf("tag:%s", TagNameKubernetesClusterLegacy), t.ClusterID)
253276

254277
// We can't pass a zero-length Filters to AWS (it's an error)
255278
// So if we end up with no filters; we need to return nil

0 commit comments

Comments
 (0)