Skip to content

Commit bef3690

Browse files
committed
added cache for instances
update error log content added a boilerplate header change paramater value to specStr remove blank line check if specified the node groups change autoScalingGroups to autoScalingGroupCache
1 parent da455d5 commit bef3690

File tree

3 files changed

+191
-27
lines changed

3 files changed

+191
-27
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package huaweicloud
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/util/wait"
24+
"k8s.io/klog/v2"
25+
)
26+
27+
type autoScalingGroupCache struct {
28+
registeredAsgs []*AutoScalingGroup
29+
instanceToAsg map[string]*AutoScalingGroup
30+
cacheMutex sync.Mutex
31+
instancesNotInManagedAsg map[string]struct{}
32+
}
33+
34+
func newAutoScalingGroupCache() *autoScalingGroupCache {
35+
registry := &autoScalingGroupCache{
36+
instanceToAsg: make(map[string]*AutoScalingGroup),
37+
instancesNotInManagedAsg: make(map[string]struct{}),
38+
}
39+
40+
return registry
41+
}
42+
43+
// Register asg in HuaweiCloud Manager.
44+
func (asg *autoScalingGroupCache) Register(autoScalingGroup *AutoScalingGroup) {
45+
asg.cacheMutex.Lock()
46+
defer asg.cacheMutex.Unlock()
47+
48+
asg.registeredAsgs = append(asg.registeredAsgs, autoScalingGroup)
49+
}
50+
51+
// FindForInstance returns AsgConfig of the given Instance
52+
func (asg *autoScalingGroupCache) FindForInstance(instanceId string, csm *cloudServiceManager) (*AutoScalingGroup, error) {
53+
asg.cacheMutex.Lock()
54+
defer asg.cacheMutex.Unlock()
55+
if config, found := asg.instanceToAsg[instanceId]; found {
56+
return config, nil
57+
}
58+
if _, found := asg.instancesNotInManagedAsg[instanceId]; found {
59+
return nil, nil
60+
}
61+
if err := asg.regenerateCache(csm); err != nil {
62+
return nil, err
63+
}
64+
if config, found := asg.instanceToAsg[instanceId]; found {
65+
return config, nil
66+
}
67+
// instance does not belong to any configured ASG
68+
asg.instancesNotInManagedAsg[instanceId] = struct{}{}
69+
return nil, nil
70+
}
71+
72+
func (asg *autoScalingGroupCache) generateCache(csm *cloudServiceManager) {
73+
go wait.Forever(func() {
74+
asg.cacheMutex.Lock()
75+
defer asg.cacheMutex.Unlock()
76+
if err := asg.regenerateCache(csm); err != nil {
77+
klog.Errorf("failed to do regenerating ASG cache,because of %s", err.Error())
78+
}
79+
}, time.Hour)
80+
}
81+
82+
func (asg *autoScalingGroupCache) regenerateCache(csm *cloudServiceManager) error {
83+
newCache := make(map[string]*AutoScalingGroup)
84+
85+
for _, registeredAsg := range asg.registeredAsgs {
86+
instances, err := csm.GetInstances(registeredAsg.groupID)
87+
if err != nil {
88+
return err
89+
}
90+
for _, instance := range instances {
91+
newCache[instance.Id] = registeredAsg
92+
}
93+
}
94+
95+
asg.instanceToAsg = newCache
96+
return nil
97+
}

cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud_cloud_provider.go

Lines changed: 73 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/apimachinery/pkg/api/resource"
2525
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2626
"k8s.io/autoscaler/cluster-autoscaler/config"
27+
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
2728
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
2829
klog "k8s.io/klog/v2"
2930
)
@@ -54,6 +55,11 @@ type huaweicloudCloudProvider struct {
5455
}
5556

5657
func newCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) *huaweicloudCloudProvider {
58+
if do.AutoDiscoverySpecified() {
59+
klog.Errorf("only support static discovery scaling group in huaweicloud for now")
60+
return nil
61+
}
62+
5763
cloudConfig, err := readConf(opts.CloudConfig)
5864
if err != nil {
5965
klog.Errorf("failed to read cloud configuration. error: %v", err)
@@ -65,17 +71,24 @@ func newCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroup
6571
}
6672

6773
csm := newCloudServiceManager(cloudConfig)
68-
sgs, err := csm.ListScalingGroups()
69-
if err != nil {
70-
klog.Errorf("failed to list scaling groups. error: %v", err)
71-
return nil
72-
}
7374

74-
return &huaweicloudCloudProvider{
75+
hcp := &huaweicloudCloudProvider{
7576
cloudServiceManager: csm,
7677
resourceLimiter: rl,
77-
autoScalingGroup: sgs,
78+
autoScalingGroup: make([]AutoScalingGroup, 0),
79+
}
80+
81+
if len(do.NodeGroupSpecs) <= 0 {
82+
klog.Error("no auto scaling group specified")
83+
return nil
84+
}
85+
err = hcp.buildAsgs(do.NodeGroupSpecs)
86+
if err != nil {
87+
klog.Errorf("failed to build auto scaling groups. error: %v", err)
88+
return nil
7889
}
90+
91+
return hcp
7992
}
8093

8194
// Name returns the name of the cloud provider.
@@ -111,25 +124,7 @@ func (hcp *huaweicloudCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudpr
111124
return nil, fmt.Errorf("provider id missing from node: %s", node.Name)
112125
}
113126

114-
hcp.lock.RLock()
115-
defer hcp.lock.RUnlock()
116-
117-
for i := range hcp.autoScalingGroup {
118-
instances, err := hcp.autoScalingGroup[i].Nodes()
119-
if err != nil {
120-
klog.Warningf("failed to list instances from scaling group: %s, error: %v", hcp.autoScalingGroup[i].groupName, err)
121-
return nil, err
122-
}
123-
124-
for j := range instances {
125-
if instanceID == instances[j].Id {
126-
pinnedGroup := hcp.autoScalingGroup[i]
127-
return &pinnedGroup, nil
128-
}
129-
}
130-
}
131-
132-
return nil, fmt.Errorf("no node group found")
127+
return hcp.cloudServiceManager.GetAsgForInstance(instanceID)
133128
}
134129

135130
// Pricing returns pricing model for this cloud provider or error if not available. Not implemented.
@@ -176,6 +171,38 @@ func (hcp *huaweicloudCloudProvider) Refresh() error {
176171
return nil
177172
}
178173

174+
func (hcp *huaweicloudCloudProvider) buildAsgs(specs []string) error {
175+
asgs, err := hcp.cloudServiceManager.ListScalingGroups()
176+
if err != nil {
177+
klog.Errorf("failed to list scaling groups, because of %s", err.Error())
178+
return err
179+
}
180+
181+
for _, spec := range specs {
182+
if err := hcp.addNodeGroup(spec, asgs); err != nil {
183+
klog.Warningf("failed to add node group to huaweicloud provider with spec: %s", spec)
184+
return err
185+
}
186+
}
187+
188+
return nil
189+
}
190+
191+
func (hcp *huaweicloudCloudProvider) addNodeGroup(spec string, asgs []AutoScalingGroup) error {
192+
asg, err := buildAsgFromSpec(spec, asgs, hcp.cloudServiceManager)
193+
if err != nil {
194+
klog.Errorf("failed to build ASG from spec, because of %s", err.Error())
195+
return err
196+
}
197+
hcp.addAsg(asg)
198+
return nil
199+
}
200+
201+
func (hcp *huaweicloudCloudProvider) addAsg(asg *AutoScalingGroup) {
202+
hcp.autoScalingGroup = append(hcp.autoScalingGroup, *asg)
203+
hcp.cloudServiceManager.RegisterAsg(asg)
204+
}
205+
179206
// BuildHuaweiCloud is called by the autoscaler/cluster-autoscaler/builder to build a huaweicloud cloud provider.
180207
func BuildHuaweiCloud(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
181208
if len(opts.CloudConfig) == 0 {
@@ -184,3 +211,23 @@ func BuildHuaweiCloud(opts config.AutoscalingOptions, do cloudprovider.NodeGroup
184211

185212
return newCloudProvider(opts, do, rl)
186213
}
214+
215+
func buildAsgFromSpec(specStr string, asgs []AutoScalingGroup, manager CloudServiceManager) (*AutoScalingGroup, error) {
216+
spec, err := dynamic.SpecFromString(specStr, true)
217+
if err != nil {
218+
return nil, fmt.Errorf("failed to parse node group spec: %v", err)
219+
}
220+
221+
for _, asg := range asgs {
222+
if asg.groupName == spec.Name {
223+
return &AutoScalingGroup{
224+
cloudServiceManager: manager,
225+
groupName: asg.groupName,
226+
groupID: asg.groupID,
227+
minInstanceNumber: asg.minInstanceNumber,
228+
maxInstanceNumber: asg.maxInstanceNumber,
229+
}, nil
230+
}
231+
}
232+
return nil, fmt.Errorf("no auto scaling group found, spec: %s", spec.Name)
233+
}

cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud_service_manager.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ type AutoScalingService interface {
6262
// IncreaseSizeInstance wait until instance number is updated.
6363
IncreaseSizeInstance(groupID string, delta int) error
6464

65+
// GetAsgForInstance returns auto scaling group for the given instance.
66+
GetAsgForInstance(instanceID string) (*AutoScalingGroup, error)
67+
68+
// RegisterAsg registers auto scaling group to manager
69+
RegisterAsg(asg *AutoScalingGroup)
70+
6571
// DeleteScalingInstances is used to delete instances from auto scaling group by instanceIDs.
6672
DeleteScalingInstances(groupID string, instanceIds []string) error
6773

@@ -86,6 +92,7 @@ type cloudServiceManager struct {
8692
cloudConfig *CloudConfig
8793
getECSClientFunc func() *huaweicloudsdkecs.EcsClient
8894
getASClientFunc func() *huaweicloudsdkas.AsClient
95+
asgs *autoScalingGroupCache
8996
}
9097

9198
type asgTemplate struct {
@@ -99,11 +106,24 @@ type asgTemplate struct {
99106
}
100107

101108
func newCloudServiceManager(cloudConfig *CloudConfig) *cloudServiceManager {
102-
return &cloudServiceManager{
109+
csm := &cloudServiceManager{
103110
cloudConfig: cloudConfig,
104111
getECSClientFunc: cloudConfig.getECSClient,
105112
getASClientFunc: cloudConfig.getASClient,
113+
asgs: newAutoScalingGroupCache(),
106114
}
115+
116+
csm.asgs.generateCache(csm)
117+
118+
return csm
119+
}
120+
121+
func (csm *cloudServiceManager) GetAsgForInstance(instanceID string) (*AutoScalingGroup, error) {
122+
return csm.asgs.FindForInstance(instanceID, csm)
123+
}
124+
125+
func (csm *cloudServiceManager) RegisterAsg(asg *AutoScalingGroup) {
126+
csm.asgs.Register(asg)
107127
}
108128

109129
// DeleteServers deletes a group of server by ID.

0 commit comments

Comments
 (0)