Skip to content

Commit 7af6ee1

Browse files
authored
Merge pull request kubernetes#2840 from DataDog/JulienBalestra/batch-cache-ttl
cluster-autoscaler/aws: batch launch config query and ttl cache
2 parents b19ef3d + 716836a commit 7af6ee1

File tree

6 files changed

+156
-19
lines changed

6 files changed

+156
-19
lines changed

cluster-autoscaler/cloudprovider/aws/auto_scaling.go

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,23 @@ package aws
1818

1919
import (
2020
"fmt"
21+
"sync"
22+
"time"
2123

2224
"github.com/aws/aws-sdk-go/aws"
2325
"github.com/aws/aws-sdk-go/service/autoscaling"
26+
"k8s.io/apimachinery/pkg/util/clock"
27+
"k8s.io/apimachinery/pkg/util/rand"
28+
"k8s.io/client-go/tools/cache"
2429
"k8s.io/klog"
2530
)
2631

32+
const (
33+
// launchConfigurationCachedTTL is the TTL given to the expiration policy of the launch configuration cache.
launchConfigurationCachedTTL = time.Minute * 20
34+
// cacheMinTTL and cacheMaxTTL bound, in seconds, the random offset that jitterClock.Since
// adds to the measured age of a cache entry while jitter is enabled.
cacheMinTTL = 120
35+
cacheMaxTTL = 600
36+
)
37+
2738
// autoScaling is the interface that represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA
2839
type autoScaling interface {
2940
DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
@@ -36,30 +47,94 @@ type autoScaling interface {
3647
// autoScalingWrapper provides several utility methods over the auto-scaling service provided by AWS SDK
3748
type autoScalingWrapper struct {
3849
autoScaling
39-
launchConfigurationInstanceTypeCache map[string]string
50+
// launchConfigurationInstanceTypeCache holds instanceTypeCachedObject entries
// (launch configuration name and its instance type) in an expiring store.
launchConfigurationInstanceTypeCache *expirationStore
4051
}
4152

42-
func (m autoScalingWrapper) getInstanceTypeByLCName(name string) (string, error) {
43-
if instanceType, found := m.launchConfigurationInstanceTypeCache[name]; found {
44-
return instanceType, nil
53+
// expirationStore caches launch configurations together with their instance type.
54+
// The store expires its keys based on a TTL, and a jitter can be applied to that TTL.
55+
// The jitter is meant to spread the AWS queries more evenly over time.
56+
type expirationStore struct {
57+
// Store is the underlying cache that holds the entries.
cache.Store
58+
// jitterClock is the clock handed to the store's TTL policy; callers toggle its
// jitter flag (under its lock) to enable or disable the random offset.
jitterClock *jitterClock
59+
}
60+
61+
// instanceTypeCachedObject is the entry stored in the launch configuration cache.
type instanceTypeCachedObject struct {
62+
// name is the launch configuration name; the cache key function returns it as the entry key.
name string
63+
// instanceType is the instance type reported for that launch configuration.
instanceType string
64+
}
65+
66+
// jitterClock is a clock.Clock whose Since method can add a random offset to the
// elapsed time it reports.
type jitterClock struct {
67+
clock.Clock
68+
69+
// jitter enables the random offset in Since; it is read and written under the embedded RWMutex.
jitter bool
70+
sync.RWMutex
71+
}
72+
73+
func newLaunchConfigurationInstanceTypeCache() *expirationStore {
74+
jc := &jitterClock{}
75+
return &expirationStore{
76+
cache.NewExpirationStore(func(obj interface{}) (s string, e error) {
77+
return obj.(instanceTypeCachedObject).name, nil
78+
}, &cache.TTLPolicy{
79+
TTL: launchConfigurationCachedTTL,
80+
Clock: jc,
81+
}),
82+
jc,
83+
}
84+
}
85+
86+
func (c *jitterClock) Since(ts time.Time) time.Duration {
87+
since := time.Since(ts)
88+
c.RLock()
89+
defer c.RUnlock()
90+
if c.jitter {
91+
return since + (time.Second * time.Duration(rand.IntnRange(cacheMinTTL, cacheMaxTTL)))
92+
}
93+
return since
94+
}
95+
96+
func (m autoScalingWrapper) getInstanceTypeByLCNames(launchConfigToQuery []*string) ([]*autoscaling.LaunchConfiguration, error) {
97+
var launchConfigurations []*autoscaling.LaunchConfiguration
98+
99+
for i := 0; i < len(launchConfigToQuery); i += 50 {
100+
end := i + 50
101+
102+
if end > len(launchConfigToQuery) {
103+
end = len(launchConfigToQuery)
104+
}
105+
params := &autoscaling.DescribeLaunchConfigurationsInput{
106+
LaunchConfigurationNames: launchConfigToQuery[i:end],
107+
MaxRecords: aws.Int64(50),
108+
}
109+
r, err := m.DescribeLaunchConfigurations(params)
110+
if err != nil {
111+
return nil, err
112+
}
113+
launchConfigurations = append(launchConfigurations, r.LaunchConfigurations...)
114+
for _, lc := range r.LaunchConfigurations {
115+
_ = m.launchConfigurationInstanceTypeCache.Add(instanceTypeCachedObject{
116+
name: *lc.LaunchConfigurationName,
117+
instanceType: *lc.InstanceType,
118+
})
119+
}
45120
}
121+
return launchConfigurations, nil
122+
}
46123

47-
params := &autoscaling.DescribeLaunchConfigurationsInput{
48-
LaunchConfigurationNames: []*string{aws.String(name)},
49-
MaxRecords: aws.Int64(1),
124+
// getInstanceTypeByLCName returns the instance type of the launch configuration
// called name. The launch configuration cache is consulted first; on a miss the
// launch configuration is fetched through getInstanceTypeByLCNames, which also
// stores the result in the cache.
func (m autoScalingWrapper) getInstanceTypeByLCName(name string) (string, error) {
125+
if obj, found, _ := m.launchConfigurationInstanceTypeCache.GetByKey(name); found {
126+
return obj.(instanceTypeCachedObject).instanceType, nil
50127
}
51-
launchConfigurations, err := m.DescribeLaunchConfigurations(params)
128+
129+
launchConfigs, err := m.getInstanceTypeByLCNames([]*string{aws.String(name)})
52130
if err != nil {
53-
klog.V(4).Infof("Failed LaunchConfiguration info request for %s: %v", name, err)
131+
klog.Errorf("Failed to query the launch configuration %s to get the instance type: %v", name, err)
54132
return "", err
55133
}
56-
if len(launchConfigurations.LaunchConfigurations) < 1 {
134+
// An empty result or a launch configuration without an instance type is reported as an error.
if len(launchConfigs) < 1 || launchConfigs[0].InstanceType == nil {
57135
return "", fmt.Errorf("unable to get first LaunchConfiguration for %s", name)
58136
}
59-
60-
instanceType := *launchConfigurations.LaunchConfigurations[0].InstanceType
61-
m.launchConfigurationInstanceTypeCache[name] = instanceType
62-
return instanceType, nil
137+
return *launchConfigs[0].InstanceType, nil
63138
}
64139

65140
func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) {
@@ -94,6 +169,48 @@ func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*aut
94169
return asgs, nil
95170
}
96171

172+
func (m autoScalingWrapper) populateLaunchConfigurationInstanceTypeCache(autoscalingGroups []*autoscaling.Group) error {
173+
var launchConfigToQuery []*string
174+
175+
m.launchConfigurationInstanceTypeCache.jitterClock.Lock()
176+
m.launchConfigurationInstanceTypeCache.jitterClock.jitter = true
177+
m.launchConfigurationInstanceTypeCache.jitterClock.Unlock()
178+
for _, asg := range autoscalingGroups {
179+
if asg == nil {
180+
continue
181+
}
182+
if asg.LaunchConfigurationName == nil {
183+
continue
184+
}
185+
_, found, _ := m.launchConfigurationInstanceTypeCache.GetByKey(*asg.LaunchConfigurationName)
186+
if found {
187+
continue
188+
}
189+
launchConfigToQuery = append(launchConfigToQuery, asg.LaunchConfigurationName)
190+
}
191+
m.launchConfigurationInstanceTypeCache.jitterClock.Lock()
192+
m.launchConfigurationInstanceTypeCache.jitterClock.jitter = false
193+
m.launchConfigurationInstanceTypeCache.jitterClock.Unlock()
194+
195+
// List expire old entries
196+
_ = m.launchConfigurationInstanceTypeCache.List()
197+
198+
if len(launchConfigToQuery) == 0 {
199+
klog.V(4).Infof("%d launch configurations already in cache", len(autoscalingGroups))
200+
return nil
201+
}
202+
klog.V(4).Infof("%d launch configurations to query", len(launchConfigToQuery))
203+
204+
_, err := m.getInstanceTypeByLCNames(launchConfigToQuery)
205+
if err != nil {
206+
klog.Errorf("Failed to query %d launch configurations", len(launchConfigToQuery))
207+
return err
208+
}
209+
210+
klog.V(4).Infof("Successfully query %d launch configurations", len(launchConfigToQuery))
211+
return nil
212+
}
213+
97214
func (m *autoScalingWrapper) getAutoscalingGroupNamesByTags(kvs map[string]string) ([]string, error) {
98215
// DescribeTags does an OR query when multiple filters on different tags are
99216
// specified. In other words, DescribeTags returns [asg1, asg1] for keys

cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,11 @@ func (m *asgCache) regenerate() error {
354354
return err
355355
}
356356

357+
err = m.service.populateLaunchConfigurationInstanceTypeCache(groups)
358+
if err != nil {
359+
klog.Warningf("Failed to fully populate all launchConfigurations: %v", err)
360+
}
361+
357362
// If currently any ASG has more Desired than running Instances, introduce placeholders
358363
// for the instances to come up. This is required to track Desired instances that
359364
// will never come up, like with Spot Request that can't be fulfilled

cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/aws/aws-sdk-go/service/autoscaling"
2525
"github.com/stretchr/testify/assert"
2626
"github.com/stretchr/testify/mock"
27+
"github.com/stretchr/testify/require"
2728
)
2829

2930
func TestMoreThen50Groups(t *testing.T) {
@@ -68,3 +69,16 @@ func TestMoreThen50Groups(t *testing.T) {
6869
assert.Equal(t, *asgs[0].AutoScalingGroupName, "asg-1")
6970
assert.Equal(t, *asgs[1].AutoScalingGroupName, "asg-2")
7071
}
72+
73+
func TestLaunchConfigurationCache(t *testing.T) {
74+
c := newLaunchConfigurationInstanceTypeCache()
75+
err := c.Add(instanceTypeCachedObject{
76+
name: "123",
77+
instanceType: "t2.medium",
78+
})
79+
require.NoError(t, err)
80+
obj, ok, err := c.GetByKey("123")
81+
require.NoError(t, err)
82+
require.True(t, ok)
83+
require.Equal(t, "t2.medium", obj.(instanceTypeCachedObject).instanceType)
84+
}

cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (e *EC2Mock) DescribeLaunchTemplateVersions(i *ec2.DescribeLaunchTemplateVe
6666
return args.Get(0).(*ec2.DescribeLaunchTemplateVersionsOutput), nil
6767
}
6868

69-
var testService = autoScalingWrapper{&AutoScalingMock{}, map[string]string{}}
69+
var testService = autoScalingWrapper{&AutoScalingMock{}, newLaunchConfigurationInstanceTypeCache()}
7070

7171
var testAwsManager = &AwsManager{
7272
asgCache: &asgCache{
@@ -80,7 +80,7 @@ var testAwsManager = &AwsManager{
8080
}
8181

8282
func newTestAwsManagerWithService(service autoScaling, autoDiscoverySpecs []asgAutoDiscoveryConfig) *AwsManager {
83-
wrapper := autoScalingWrapper{service, map[string]string{}}
83+
wrapper := autoScalingWrapper{service, newLaunchConfigurationInstanceTypeCache()}
8484
return &AwsManager{
8585
autoScalingService: wrapper,
8686
asgCache: &asgCache{

cluster-autoscaler/cloudprovider/aws/aws_manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ func createAWSManagerInternal(
196196
}
197197

198198
if autoScalingService == nil {
199-
autoScalingService = &autoScalingWrapper{autoscaling.New(sess), map[string]string{}}
199+
c := newLaunchConfigurationInstanceTypeCache()
200+
autoScalingService = &autoScalingWrapper{autoscaling.New(sess), c}
200201
}
201202

202203
if ec2Service == nil {

cluster-autoscaler/cloudprovider/aws/aws_manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestFetchExplicitAsgs(t *testing.T) {
235235
defer resetAWSRegion(os.LookupEnv("AWS_REGION"))
236236
os.Setenv("AWS_REGION", "fanghorn")
237237
// fetchExplicitASGs is called at manager creation time.
238-
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, map[string]string{}}, nil)
238+
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, newLaunchConfigurationInstanceTypeCache()}, nil)
239239
assert.NoError(t, err)
240240

241241
asgs := m.asgCache.Get()
@@ -469,7 +469,7 @@ func TestFetchAutoAsgs(t *testing.T) {
469469
defer resetAWSRegion(os.LookupEnv("AWS_REGION"))
470470
os.Setenv("AWS_REGION", "fanghorn")
471471
// fetchAutoASGs is called at manager creation time, via forceRefresh
472-
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, map[string]string{}}, nil)
472+
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, newLaunchConfigurationInstanceTypeCache()}, nil)
473473
assert.NoError(t, err)
474474

475475
asgs := m.asgCache.Get()

0 commit comments

Comments
 (0)