Skip to content

Commit a56b461

Browse files
authored
fix:remove goroutine concurrency as it increased complexity without significantly improving collection speed (#70)
* fix: remove goroutine concurrency as it increased complexity without significantly improving collection speed * swas using new sdk * go mod tidy
1 parent 58f6d01 commit a56b461

File tree

57 files changed

+755
-1759
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+755
-1759
lines changed

collector/alicloud/collector/cloudapi/apigateway.go

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ package cloudapi
1717

1818
import (
1919
"context"
20-
"sync"
21-
2220
cloudapi20160714 "github.com/alibabacloud-go/cloudapi-20160714/v5/client"
2321
"github.com/alibabacloud-go/tea/tea"
2422
"github.com/cloudrec/alicloud/collector"
@@ -28,8 +26,6 @@ import (
2826
"go.uber.org/zap"
2927
)
3028

31-
const maxWorkers = 10
32-
3329
// GetAPIGatewayResource 返回API Gateway资源定义
3430
func GetAPIGatewayResource() schema.Resource {
3531
return schema.Resource{
@@ -62,30 +58,13 @@ func GetAPIGatewayDetail(ctx context.Context, service schema.ServiceInterface, r
6258
return err
6359
}
6460

65-
var wg sync.WaitGroup
66-
tasks := make(chan *cloudapi20160714.DescribeApisResponseBodyApiSummarysApiSummary, len(apis))
67-
68-
// 启动工作协程
69-
for i := 0; i < maxWorkers; i++ {
70-
wg.Add(1)
71-
go func() {
72-
defer wg.Done()
73-
for api := range tasks {
74-
detail := describeAPIDetail(ctx, client, api)
75-
if detail != nil {
76-
res <- detail
77-
}
78-
}
79-
}()
80-
}
81-
82-
// 添加任务
8361
for _, api := range apis {
84-
tasks <- api
62+
res <- &APIGatewayDetail{
63+
ApiSummary: api,
64+
ApiInfo: describeAPI(ctx, client, api),
65+
}
8566
}
86-
close(tasks)
8767

88-
wg.Wait()
8968
return nil
9069
}
9170

@@ -117,8 +96,7 @@ func listAPIs(ctx context.Context, c *cloudapi20160714.Client) ([]*cloudapi20160
11796
return apis, nil
11897
}
11998

120-
// describeAPIDetail 获取单个API详细信息
121-
func describeAPIDetail(ctx context.Context, c *cloudapi20160714.Client, api *cloudapi20160714.DescribeApisResponseBodyApiSummarysApiSummary) *APIGatewayDetail {
99+
func describeAPI(ctx context.Context, c *cloudapi20160714.Client, api *cloudapi20160714.DescribeApisResponseBodyApiSummarysApiSummary) *cloudapi20160714.DescribeApiResponseBody {
122100
req := &cloudapi20160714.DescribeApiRequest{}
123101
req.ApiId = api.ApiId
124102

@@ -128,8 +106,5 @@ func describeAPIDetail(ctx context.Context, c *cloudapi20160714.Client, api *clo
128106
return nil
129107
}
130108

131-
return &APIGatewayDetail{
132-
ApiSummary: api,
133-
ApiInfo: resp.Body,
134-
}
109+
return resp.Body
135110
}

collector/alicloud/collector/ons/instance.go

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ package ons
1717

1818
import (
1919
"context"
20-
"sync"
21-
2220
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
2321
"github.com/aliyun/alibaba-cloud-sdk-go/services/ons"
2422
"github.com/cloudrec/alicloud/collector"
@@ -28,8 +26,6 @@ import (
2826
"go.uber.org/zap"
2927
)
3028

31-
const maxWorkers = 10
32-
3329
// GetInstanceResource returns ONS Instance resource definition
3430
func GetInstanceResource() schema.Resource {
3531
return schema.Resource{
@@ -50,7 +46,7 @@ func GetInstanceResource() schema.Resource {
5046
// InstanceDetail aggregates resource details
5147
type InstanceDetail struct {
5248
Instance ons.InstanceVO
53-
InstanceBaseInfo ons.InstanceBaseInfo
49+
InstanceBaseInfo *ons.InstanceBaseInfo
5450
}
5551

5652
// GetInstanceDetail gets ONS Instance details
@@ -63,28 +59,13 @@ func GetInstanceDetail(ctx context.Context, service schema.ServiceInterface, res
6359
return err
6460
}
6561

66-
var wg sync.WaitGroup
67-
tasks := make(chan ons.InstanceVO, len(resources))
68-
69-
for i := 0; i < maxWorkers; i++ {
70-
wg.Add(1)
71-
go func() {
72-
defer wg.Done()
73-
for resource := range tasks {
74-
detail := describeInstanceDetail(ctx, client, resource)
75-
if detail != nil {
76-
res <- detail
77-
}
78-
}
79-
}()
80-
}
81-
8262
for _, resource := range resources {
83-
tasks <- resource
84-
}
85-
close(tasks)
8663

87-
wg.Wait()
64+
res <- &InstanceDetail{
65+
Instance: resource,
66+
InstanceBaseInfo: describeInstance(ctx, client, resource),
67+
}
68+
}
8869
return nil
8970
}
9071

@@ -106,8 +87,8 @@ func listInstances(ctx context.Context, c *ons.Client) ([]ons.InstanceVO, error)
10687
return resources, nil
10788
}
10889

109-
// describeInstanceDetail gets details for a single ONS Instance
110-
func describeInstanceDetail(ctx context.Context, c *ons.Client, resource ons.InstanceVO) *InstanceDetail {
90+
// describeInstance gets details for a single ONS Instance
91+
func describeInstance(ctx context.Context, c *ons.Client, resource ons.InstanceVO) *ons.InstanceBaseInfo {
11192
req := ons.CreateOnsInstanceBaseInfoRequest()
11293
req.InitWithApiInfo("Ons", "2019-02-14", "OnsInstanceBaseInfo", "ons", "openAPI")
11394
req.Method = requests.POST
@@ -116,13 +97,8 @@ func describeInstanceDetail(ctx context.Context, c *ons.Client, resource ons.Ins
11697
response, err := c.OnsInstanceBaseInfo(req)
11798
if err != nil {
11899
log.CtxLogger(ctx).Error("failed to get instance base info", zap.String("instanceId", resource.InstanceId), zap.Error(err))
119-
return &InstanceDetail{
120-
Instance: resource,
121-
}
100+
return nil
122101
}
123102

124-
return &InstanceDetail{
125-
Instance: resource,
126-
InstanceBaseInfo: response.InstanceBaseInfo,
127-
}
103+
return &response.InstanceBaseInfo
128104
}

collector/alicloud/collector/services.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
selectdb20230522 "github.com/alibabacloud-go/selectdb-20230522/v3/client"
6161
slb20140515 "github.com/alibabacloud-go/slb-20140515/v4/client"
6262
sls20201230 "github.com/alibabacloud-go/sls-20201230/v6/client"
63+
swas_open20200601 "github.com/alibabacloud-go/swas-open-20200601/v3/client"
6364
tablestore20201209 "github.com/alibabacloud-go/tablestore-20201209/client"
6465
util "github.com/alibabacloud-go/tea-utils/v2/service"
6566
"github.com/alibabacloud-go/tea/tea"
@@ -84,7 +85,6 @@ import (
8485
"github.com/aliyun/alibaba-cloud-sdk-go/services/ons"
8586
"github.com/aliyun/alibaba-cloud-sdk-go/services/ram"
8687
"github.com/aliyun/alibaba-cloud-sdk-go/services/sgw"
87-
"github.com/aliyun/alibaba-cloud-sdk-go/services/swas-open"
8888
"github.com/aliyun/alibaba-cloud-sdk-go/services/vod"
8989
"github.com/aliyun/alibaba-cloud-sdk-go/services/vpc"
9090
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
@@ -185,7 +185,7 @@ type Services struct {
185185
ECP *eds_aic20230930.Client
186186
Eflo *eflo.Client
187187
EfloController *eflo_controller.Client
188-
SWAS *swas_open.Client
188+
SWAS *swas_open20200601.Client
189189
Ons *ons.Client
190190
GA *ga.Client
191191
DCDN *dcdn.Client
@@ -199,8 +199,6 @@ func (s *Services) Clone() schema.ServiceInterface {
199199
return &Services{}
200200
}
201201

202-
203-
204202
func (s *Services) InitServices(cloudAccountParam schema.CloudAccountParam) (err error) {
205203
param := cloudAccountParam.CommonCloudAccountParam
206204
s.CloudAccountId = cloudAccountParam.CloudAccountId
@@ -499,7 +497,7 @@ func (s *Services) InitServices(cloudAccountParam schema.CloudAccountParam) (err
499497
log.CtxLogger(ctx).Warn("init eci client failed", zap.Error(err))
500498
}
501499
case SWAS:
502-
s.SWAS, err = swas_open.NewClientWithAccessKey(param.Region, param.AK, param.SK)
500+
s.SWAS, err = createSWASClient(param.Region, s.Config)
503501
if err != nil {
504502
log.CtxLogger(ctx).Warn("init swas client failed", zap.Error(err))
505503
}
@@ -553,6 +551,12 @@ func (s *Services) InitServices(cloudAccountParam schema.CloudAccountParam) (err
553551
return nil
554552
}
555553

554+
func createSWASClient(region string, config *openapi.Config) (client *swas_open20200601.Client, err error) {
555+
config.Endpoint = tea.String("swas." + region + ".aliyuncs.com")
556+
client, err = swas_open20200601.NewClient(config)
557+
return client, err
558+
}
559+
556560
func createVPCClient(region string, config *openapi.Config) (client *vpc.Client, err error) {
557561
client, err = vpc.NewClientWithAccessKey(region, *config.AccessKeyId, *config.AccessKeySecret)
558562
return client, err

collector/alicloud/collector/swas/swas.go

Lines changed: 44 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@ package swas
1717

1818
import (
1919
"context"
20-
"sync"
21-
22-
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
23-
swas "github.com/aliyun/alibaba-cloud-sdk-go/services/swas-open"
20+
swas_open20200601 "github.com/alibabacloud-go/swas-open-20200601/v3/client"
21+
"github.com/alibabacloud-go/tea/tea"
2422
"github.com/cloudrec/alicloud/collector"
2523
"github.com/core-sdk/constant"
2624
"github.com/core-sdk/log"
2725
"github.com/core-sdk/schema"
2826
"go.uber.org/zap"
2927
)
3028

31-
const maxWorkers = 10
32-
3329
// GetInstanceResource returns SWAS instance resource definition
3430
func GetInstanceResource() schema.Resource {
3531
return schema.Resource{
@@ -48,84 +44,78 @@ func GetInstanceResource() schema.Resource {
4844

4945
// Detail aggregates SWAS instance and firewall rule information
5046
type Detail struct {
51-
Instance *swas.Instance
52-
FirewallRules []swas.FirewallRule
47+
Instance *swas_open20200601.ListInstancesResponseBodyInstances
48+
FirewallRules []*swas_open20200601.ListFirewallRulesResponseBodyFirewallRules
5349
}
5450

5551
// ListInstancesResource gets SWAS instance details
5652
func ListInstancesResource(ctx context.Context, service schema.ServiceInterface, res chan<- any) error {
5753
cli := service.(*collector.Services).SWAS
58-
req := swas.CreateListInstancesRequest()
59-
req.PageSize = requests.NewInteger(50)
60-
req.PageNumber = requests.NewInteger(1)
6154

62-
count := 0
63-
for {
64-
resp, err := cli.ListInstances(req)
65-
if err != nil {
66-
log.CtxLogger(ctx).Warn("ListInstances error", zap.Error(err))
67-
return err
68-
}
55+
instances, err := listInstances(ctx, cli)
56+
if err != nil {
57+
log.CtxLogger(ctx).Warn("ListInstances error", zap.Error(err))
58+
return err
59+
}
6960

70-
var wg sync.WaitGroup
71-
tasks := make(chan swas.Instance, len(resp.Instances))
72-
73-
// 启动工作协程
74-
for i := 0; i < maxWorkers; i++ {
75-
wg.Add(1)
76-
go func() {
77-
defer wg.Done()
78-
for instance := range tasks {
79-
d := &Detail{
80-
Instance: &instance,
81-
FirewallRules: listFirewallRules(ctx, cli, instance.InstanceId),
82-
}
83-
84-
res <- d
85-
}
86-
}()
61+
for _, instance := range instances {
62+
firewallRules := listFirewallRules(ctx, cli, instance.InstanceId)
63+
res <- &Detail{
64+
Instance: instance,
65+
FirewallRules: firewallRules,
8766
}
67+
}
68+
69+
return nil
70+
}
8871

89-
// 添加任务
90-
for _, instance := range resp.Instances {
91-
tasks <- instance
72+
// listInstances lists all swas instances in a region.
73+
func listInstances(ctx context.Context, cli *swas_open20200601.Client) (instances []*swas_open20200601.ListInstancesResponseBodyInstances, err error) {
74+
request := &swas_open20200601.ListInstancesRequest{
75+
PageNumber: tea.Int32(1),
76+
PageSize: tea.Int32(50),
77+
}
78+
79+
count := 0
80+
for {
81+
resp, err := cli.ListInstances(request)
82+
if err != nil {
83+
return nil, err
9284
}
93-
close(tasks)
9485

95-
wg.Wait()
86+
instances = append(instances, resp.Body.Instances...)
9687

97-
count += len(resp.Instances)
98-
if count >= resp.TotalCount || len(resp.Instances) == 0 {
88+
count += len(resp.Body.Instances)
89+
if count >= int(*resp.Body.TotalCount) || len(resp.Body.Instances) == 0 {
9990
break
10091
}
101-
req.PageNumber = requests.NewInteger(resp.PageNumber + 1)
92+
request.PageNumber = tea.Int32(*request.PageNumber + 1)
10293
}
10394

104-
return nil
95+
return instances, nil
10596
}
10697

10798
// listFirewallRules gets firewall rules for a specific instance
108-
func listFirewallRules(ctx context.Context, client *swas.Client, instanceId string) (firewallRules []swas.FirewallRule) {
109-
req := swas.CreateListFirewallRulesRequest()
110-
req.InstanceId = instanceId
111-
req.PageSize = requests.NewInteger(constant.DefaultPageSize)
112-
req.PageNumber = requests.NewInteger(1)
99+
func listFirewallRules(ctx context.Context, client *swas_open20200601.Client, instanceId *string) (firewallRules []*swas_open20200601.ListFirewallRulesResponseBodyFirewallRules) {
113100

114101
count := 0
115102
for {
116-
resp, err := client.ListFirewallRules(req)
103+
request := &swas_open20200601.ListFirewallRulesRequest{
104+
InstanceId: instanceId,
105+
}
106+
resp, err := client.ListFirewallRules(request)
117107
if err != nil {
118108
log.CtxLogger(ctx).Warn("ListFirewallRules error", zap.Error(err))
119109
break
120110
}
121111

122-
firewallRules = append(firewallRules, resp.FirewallRules...)
123-
count += len(resp.FirewallRules)
112+
firewallRules = append(firewallRules, resp.Body.FirewallRules...)
113+
count += len(resp.Body.FirewallRules)
124114

125-
if count >= resp.TotalCount || len(resp.FirewallRules) < constant.DefaultPageSize {
115+
if count >= int(*resp.Body.TotalCount) || len(resp.Body.FirewallRules) == 0 {
126116
break
127117
}
128-
req.PageNumber = requests.NewInteger(resp.PageNumber + 1)
118+
request.PageNumber = tea.Int32(*request.PageNumber + 1)
129119
}
130120

131121
return firewallRules

0 commit comments

Comments
 (0)