Skip to content

Commit 769fb07

Browse files
authored
Merge pull request #453 from l1b0k/update_12
several bug fix and improvement
2 parents 5b69181 + 3511e26 commit 769fb07

File tree

21 files changed

+482
-415
lines changed

21 files changed

+482
-415
lines changed

cmd/terway-controlplane/terway-controlplane.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import (
2121
"math/rand"
2222
"time"
2323

24-
"github.com/AliyunContainerService/terway/pkg/aliyun/client"
24+
aliyun "github.com/AliyunContainerService/terway/pkg/aliyun/client"
2525
"github.com/AliyunContainerService/terway/pkg/aliyun/credential"
2626
"github.com/AliyunContainerService/terway/pkg/apis/crds"
2727
networkv1beta1 "github.com/AliyunContainerService/terway/pkg/apis/network.alibabacloud.com/v1beta1"
28+
"github.com/AliyunContainerService/terway/pkg/backoff"
2829
"github.com/AliyunContainerService/terway/pkg/cert"
2930
register "github.com/AliyunContainerService/terway/pkg/controller"
3031
_ "github.com/AliyunContainerService/terway/pkg/controller/all"
@@ -35,7 +36,6 @@ import (
3536
"github.com/AliyunContainerService/terway/pkg/utils"
3637
"github.com/AliyunContainerService/terway/pkg/version"
3738
"github.com/AliyunContainerService/terway/types/controlplane"
38-
3939
"k8s.io/apimachinery/pkg/runtime"
4040
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
4141
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -60,14 +60,25 @@ func init() {
6060

6161
func main() {
6262
rand.Seed(time.Now().UnixNano())
63+
var (
64+
configFilePath string
65+
credentialFilePath string
66+
)
67+
flag.StringVar(&configFilePath, "config", "/etc/config/ctrl-config.yaml", "config file for controlplane")
68+
flag.StringVar(&credentialFilePath, "credential", "/etc/credential/ctrl-secret.yaml", "secret file for controlplane")
69+
6370
flag.Parse()
6471

6572
ctrl.SetLogger(klogr.New())
6673
log.Info(version.Version)
6774

68-
cfg := controlplane.GetConfig()
69-
log.Info("using config", "config", cfg)
75+
ctx := ctrl.SetupSignalHandler()
7076

77+
cfg, err := controlplane.ParseAndValidate(configFilePath, credentialFilePath)
78+
if err != nil {
79+
panic(err)
80+
}
81+
backoff.OverrideBackoff(cfg.BackoffOverride)
7182
utils.SetStsKinds(cfg.CustomStatefulWorkloadKinds)
7283

7384
restConfig := ctrl.GetConfigOrDie()
@@ -76,7 +87,9 @@ func main() {
7687
restConfig.UserAgent = version.UA
7788
utils.RegisterClients(restConfig)
7889

79-
err := crds.RegisterCRDs()
90+
log.Info("using config", "config", cfg)
91+
92+
err = crds.RegisterCRDs()
8093
if err != nil {
8194
panic(err)
8295
}
@@ -91,13 +104,11 @@ func main() {
91104
panic(err)
92105
}
93106

94-
aliyunClient, err := client.New(clientSet, flowcontrol.NewTokenBucketRateLimiter(cfg.ReadOnlyQPS, cfg.ReadOnlyBurst), flowcontrol.NewTokenBucketRateLimiter(cfg.MutatingQPS, cfg.MutatingBurst))
107+
aliyunClient, err := aliyun.New(clientSet, flowcontrol.NewTokenBucketRateLimiter(cfg.ReadOnlyQPS, cfg.ReadOnlyBurst), flowcontrol.NewTokenBucketRateLimiter(cfg.MutatingQPS, cfg.MutatingBurst))
95108
if err != nil {
96109
panic(err)
97110
}
98111

99-
ctx := ctrl.SetupSignalHandler()
100-
101112
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
102113
Scheme: scheme,
103114
HealthProbeBindAddress: cfg.HealthzBindAddress,

daemon/daemon.go

Lines changed: 17 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
podENITypes "github.com/AliyunContainerService/terway/pkg/apis/network.alibabacloud.com/v1beta1"
2020
"github.com/AliyunContainerService/terway/pkg/backoff"
2121
terwayIP "github.com/AliyunContainerService/terway/pkg/ip"
22-
"github.com/AliyunContainerService/terway/pkg/ipam"
2322
"github.com/AliyunContainerService/terway/pkg/link"
2423
"github.com/AliyunContainerService/terway/pkg/logger"
2524
"github.com/AliyunContainerService/terway/pkg/metric"
@@ -29,6 +28,7 @@ import (
2928
"github.com/AliyunContainerService/terway/pkg/utils"
3029
"github.com/AliyunContainerService/terway/rpc"
3130
"github.com/AliyunContainerService/terway/types"
31+
"github.com/AliyunContainerService/terway/types/daemon"
3232

3333
"github.com/containernetworking/cni/libcni"
3434
containertypes "github.com/containernetworking/cni/pkg/types"
@@ -265,6 +265,9 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
265265
if netConfig.IfName != IfEth0 && netConfig.IfName != "" {
266266
continue
267267
}
268+
if netConfig.BasicInfo == nil || netConfig.BasicInfo.PodIP == nil {
269+
continue
270+
}
268271
var ips []string
269272
if netConfig.BasicInfo.PodIP.IPv4 != "" {
270273
ips = append(ips, netConfig.BasicInfo.PodIP.IPv4)
@@ -491,6 +494,14 @@ func (n *networkService) ReleaseIP(ctx context.Context, r *rpc.ReleaseIPRequest)
491494
"containerID": r.K8SPodInfraContainerId,
492495
}).Info("release ip req")
493496

497+
_, exist := n.pendingPods.LoadOrStore(podInfoKey(r.K8SPodNamespace, r.K8SPodName), struct{}{})
498+
if exist {
499+
return nil, fmt.Errorf("pod %s resource processing", podInfoKey(r.K8SPodNamespace, r.K8SPodName))
500+
}
501+
defer func() {
502+
n.pendingPods.Delete(podInfoKey(r.K8SPodNamespace, r.K8SPodName))
503+
}()
504+
494505
n.RLock()
495506
defer n.RUnlock()
496507
var (
@@ -1336,7 +1347,7 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
13361347

13371348
var err error
13381349

1339-
globalConfig, err := types.GetConfigFromFileWithMerge(configFilePath, nil)
1350+
globalConfig, err := daemon.GetConfigFromFileWithMerge(configFilePath, nil)
13401351
if err != nil {
13411352
return nil, err
13421353
}
@@ -1353,7 +1364,7 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
13531364
dynamicCfg = ""
13541365
}
13551366

1356-
config, err := types.GetConfigFromFileWithMerge(configFilePath, []byte(dynamicCfg))
1367+
config, err := daemon.GetConfigFromFileWithMerge(configFilePath, []byte(dynamicCfg))
13571368
if err != nil {
13581369
return nil, fmt.Errorf("failed parse config: %v", err)
13591370
}
@@ -1440,10 +1451,6 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
14401451
}
14411452
serviceLog.Infof("init pool config: %+v", poolConfig)
14421453

1443-
err = restoreLocalENIRes(ecs, netSrv.k8s, netSrv.resourceDB)
1444-
if err != nil {
1445-
return nil, errors.Wrapf(err, "error restore local eni resources")
1446-
}
14471454
localResource := make(map[string]map[string]resourceManagerInitItem)
14481455
resObjList, err := netSrv.resourceDB.List()
14491456
if err != nil {
@@ -1537,53 +1544,8 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
15371544
return netSrv, nil
15381545
}
15391546

1540-
// restore local eni resources for old terway migration
1541-
func restoreLocalENIRes(ecs ipam.API, k8s Kubernetes, resourceDB storage.Storage) error {
1542-
resList, err := resourceDB.List()
1543-
if err != nil {
1544-
return errors.Wrapf(err, "error list resourceDB storage")
1545-
}
1546-
if len(resList) != 0 {
1547-
serviceLog.Debugf("skip restore for upgraded")
1548-
return nil
1549-
}
1550-
1551-
eniList, err := ecs.GetAttachedENIs(context.Background(), false)
1552-
if err != nil {
1553-
return errors.Wrapf(err, "error get attached eni for restore")
1554-
}
1555-
ipEniMap := map[string]*types.ENI{}
1556-
for _, eni := range eniList {
1557-
ipEniMap[eni.PrimaryIP.IPv4.String()] = eni
1558-
}
1559-
1560-
podList, err := k8s.GetLocalPods()
1561-
if err != nil {
1562-
return errors.Wrapf(err, "error get local pod for restore")
1563-
}
1564-
for _, pod := range podList {
1565-
if pod.PodNetworkType != podNetworkTypeVPCENI {
1566-
continue
1567-
}
1568-
serviceLog.Debugf("restore for local pod: %+v, enis: %+v", pod, ipEniMap)
1569-
eni, ok := ipEniMap[pod.PodIPs.IPv4.String()]
1570-
if ok {
1571-
err = resourceDB.Put(podInfoKey(pod.Namespace, pod.Name), types.PodResources{
1572-
PodInfo: pod,
1573-
Resources: eni.ToResItems(),
1574-
})
1575-
if err != nil {
1576-
return errors.Wrapf(err, "error put resource into store")
1577-
}
1578-
} else {
1579-
serviceLog.Warnf("error found pod relate eni, pod: %+v", pod)
1580-
}
1581-
}
1582-
return nil
1583-
}
1584-
15851547
// setDefault fills in default values for unset fields of cfg
1586-
func setDefault(cfg *types.Configure) error {
1548+
func setDefault(cfg *daemon.Config) error {
15871549
if cfg.EniCapRatio == 0 {
15881550
cfg.EniCapRatio = 1
15891551
}
@@ -1600,7 +1562,7 @@ func setDefault(cfg *types.Configure) error {
16001562
return nil
16011563
}
16021564

1603-
func validateConfig(cfg *types.Configure) error {
1565+
func validateConfig(cfg *daemon.Config) error {
16041566
switch cfg.IPStack {
16051567
case "", string(types.IPStackIPv4), string(types.IPStackDual):
16061568
default:
@@ -1610,7 +1572,7 @@ func validateConfig(cfg *types.Configure) error {
16101572
return nil
16111573
}
16121574

1613-
func getPoolConfig(cfg *types.Configure, ipamType types.IPAMType) (*types.PoolConfig, error) {
1575+
func getPoolConfig(cfg *daemon.Config, ipamType types.IPAMType) (*types.PoolConfig, error) {
16141576
poolConfig := &types.PoolConfig{
16151577
MaxPoolSize: cfg.MaxPoolSize,
16161578
MinPoolSize: cfg.MinPoolSize,

daemon/eni-multi-ip.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,7 @@ func (f *eniIPFactory) checkAccount(message chan<- string) {
781781
// get ENIs via Aliyun API
782782
message <- "fetching attached ENIs from aliyun\n"
783783
ctx := context.Background()
784-
enis, err := f.eniFactory.ecs.GetAttachedENIs(ctx, false)
784+
enis, err := f.eniFactory.ecs.GetAttachedENIs(ctx, false, f.trunkOnEni)
785785
if err != nil {
786786
message <- fmt.Sprintf("error while fetching from remote: %s\n", err.Error())
787787
return
@@ -943,7 +943,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
943943
Initializer: func(holder pool.ResourceHolder) error {
944944
ctx := context.Background()
945945
// do not use the main ENI for ENI multi-IP allocation
946-
enis, err := ecs.GetAttachedENIs(ctx, false)
946+
enis, err := ecs.GetAttachedENIs(ctx, false, factory.trunkOnEni)
947947
if err != nil {
948948
return fmt.Errorf("error get attach ENI on pool init, %w", err)
949949
}

daemon/eni.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func newENIResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, allocated
106106
Factory: factory,
107107
Initializer: func(holder pool.ResourceHolder) error {
108108
ctx := context.Background()
109-
enis, err := ecs.GetAttachedENIs(ctx, false)
109+
enis, err := ecs.GetAttachedENIs(ctx, false, factory.trunkOnEni)
110110
if err != nil {
111111
return fmt.Errorf("error get attach ENI on pool init, %w", err)
112112
}
@@ -425,7 +425,7 @@ func (f *eniFactory) Check(res types.NetworkResource) error {
425425
}
426426

427427
func (f *eniFactory) ListResource() (map[string]types.NetworkResource, error) {
428-
enis, err := f.ecs.GetAttachedENIs(context.Background(), false)
428+
enis, err := f.ecs.GetAttachedENIs(context.Background(), false, f.trunkOnEni)
429429
if err != nil {
430430
return nil, err
431431
}

daemon/k8s.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/AliyunContainerService/terway/pkg/utils"
2222
"github.com/AliyunContainerService/terway/pkg/version"
2323
"github.com/AliyunContainerService/terway/types"
24+
"github.com/AliyunContainerService/terway/types/daemon"
2425

2526
"github.com/pkg/errors"
2627
log "github.com/sirupsen/logrus"
@@ -269,7 +270,7 @@ func (k *k8s) WaitTrunkReady() (string, error) {
269270
}
270271

271272
// newK8S returns a Kubernetes service by pod spec and daemon mode
272-
func newK8S(master, kubeconfig string, daemonMode string, globalConfig *types.Configure) (Kubernetes, error) {
273+
func newK8S(master, kubeconfig string, daemonMode string, globalConfig *daemon.Config) (Kubernetes, error) {
273274

274275
k8sRestConfig, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
275276
if err != nil {

pkg/aliyun/aliyun.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (e *Impl) destroyInterface(ctx context.Context, eniID, instanceID, trunkENI
177177

178178
// GetAttachedENIs of instanceId
179179
// containsMainENI indicates whether the main interface (eth0) of the instance is included
180-
func (e *Impl) GetAttachedENIs(ctx context.Context, containsMainENI bool) ([]*types.ENI, error) {
180+
func (e *Impl) GetAttachedENIs(ctx context.Context, containsMainENI bool, trunkENIID string) ([]*types.ENI, error) {
181181
enis, err := e.metadata.GetENIs(containsMainENI)
182182
if err != nil {
183183
return nil, fmt.Errorf("error get eni config by mac, %w", err)
@@ -188,14 +188,20 @@ func (e *Impl) GetAttachedENIs(ctx context.Context, containsMainENI bool) ([]*ty
188188
for _, eni := range enis {
189189
eniIDs = append(eniIDs, eni.ID)
190190
enisMap[eni.ID] = eni
191+
192+
if trunkENIID == eni.ID {
193+
eni.Trunk = true
194+
}
191195
}
192196
if e.eniTypeAttr && len(eniIDs) > 0 {
193-
eniSet, err := e.DescribeNetworkInterface(ctx, "", eniIDs, "", "", "", nil)
194-
if err != nil {
195-
return nil, err
196-
}
197-
for _, eni := range eniSet {
198-
enisMap[eni.NetworkInterfaceID].Trunk = eni.Type == client.ENITypeTrunk
197+
if trunkENIID == "" {
198+
eniSet, err := e.DescribeNetworkInterface(ctx, "", eniIDs, "", "", "", nil)
199+
if err != nil {
200+
return nil, err
201+
}
202+
for _, eni := range eniSet {
203+
enisMap[eni.NetworkInterfaceID].Trunk = eni.Type == client.ENITypeTrunk
204+
}
199205
}
200206
}
201207
return enis, nil

pkg/aliyun/client/ecs.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -384,22 +384,40 @@ func (a *OpenAPI) UnAssignIpv6Addresses(ctx context.Context, eniID string, ips [
384384
}
385385

386386
func (a *OpenAPI) DescribeInstanceTypes(ctx context.Context, types []string) ([]ecs.InstanceType, error) {
387-
req := ecs.CreateDescribeInstanceTypesRequest()
388-
if types != nil {
389-
req.InstanceTypes = &types
390-
}
391-
start := time.Now()
392-
resp, err := a.ClientSet.ECS().DescribeInstanceTypes(req)
393-
metric.OpenAPILatency.WithLabelValues("DescribeInstanceTypes", fmt.Sprint(err != nil)).Observe(metric.MsSince(start))
387+
var result []ecs.InstanceType
394388

395-
l := log.WithFields(map[string]interface{}{
396-
LogFieldAPI: "DescribeInstanceTypes",
397-
})
398-
if err != nil {
399-
l.WithField(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err)
400-
return nil, err
389+
nextToken := ""
390+
for {
391+
req := ecs.CreateDescribeInstanceTypesRequest()
392+
req.NextToken = nextToken
393+
// nb(l1b0k): see https://help.aliyun.com/practice_detail/461278.
394+
req.MaxResults = requests.NewInteger(100)
395+
if types != nil {
396+
req.InstanceTypes = &types
397+
}
398+
start := time.Now()
399+
resp, err := a.ClientSet.ECS().DescribeInstanceTypes(req)
400+
metric.OpenAPILatency.WithLabelValues("DescribeInstanceTypes", fmt.Sprint(err != nil)).Observe(metric.MsSince(start))
401+
402+
l := log.WithFields(map[string]interface{}{
403+
LogFieldAPI: "DescribeInstanceTypes",
404+
})
405+
if err != nil {
406+
l.WithField(LogFieldRequestID, apiErr.ErrRequestID(err)).Error(err)
407+
return nil, err
408+
}
409+
410+
for _, r := range resp.InstanceTypes.InstanceType {
411+
result = append(result, r)
412+
}
413+
414+
if resp.NextToken == "" {
415+
break
416+
}
417+
nextToken = resp.NextToken
401418
}
402-
return resp.InstanceTypes.InstanceType, nil
419+
420+
return result, nil
403421
}
404422

405423
func (a *OpenAPI) ModifyNetworkInterfaceAttribute(ctx context.Context, eniID string, securityGroupIDs []string) error {

0 commit comments

Comments
 (0)