Skip to content

Commit b00a118

Browse files
committed
Refactor goroutines in aws package to simplify code
1 parent a84271b commit b00a118

File tree

2 files changed

+27
-74
lines changed

2 files changed

+27
-74
lines changed

internal/aws/aws.go

Lines changed: 26 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"sync"
1212
"time"
1313

14+
"golang.org/x/sync/errgroup"
15+
1416
"github.com/aws/aws-sdk-go-v2/aws"
1517
"github.com/aws/aws-sdk-go-v2/config"
1618
"github.com/aws/aws-sdk-go-v2/credentials"
@@ -168,60 +170,36 @@ func getFilteredLambdaFuncs(client *lambda.Client, nextMarker *string, allFuncti
168170

169171
// GetLambdaPackageData returns a digest and metadata of a Lambda function package
170172
func (staticCreds *AWSStaticCreds) GetLambdaPackageData(filter *filters.ResourceFilterOptions) ([]*LambdaData, error) {
171-
lambdaData := []*LambdaData{}
172173
client, err := staticCreds.NewLambdaClient()
173174
if err != nil {
174-
return lambdaData, err
175+
return nil, err
175176
}
176177

177178
filteredFunctions, err := getFilteredLambdaFuncs(client, nil, &[]types.FunctionConfiguration{}, filter)
178179
if err != nil {
179-
return lambdaData, err
180+
return nil, err
180181
}
181182

182-
var (
183-
wg sync.WaitGroup
184-
mutex = &sync.Mutex{}
185-
)
186-
187-
// run concurrently
188-
errs := make(chan error, 1) // Buffered only for the first error
189-
ctx, cancel := context.WithCancel(context.Background())
190-
defer cancel() // Make sure it's called to release resources even if no errors
183+
lambdaData := make([]*LambdaData, 0, len(*filteredFunctions))
184+
mutex := new(sync.Mutex)
185+
g, _ := errgroup.WithContext(context.Background())
191186

192187
for _, function := range *filteredFunctions {
193-
wg.Add(1)
194-
go func(functionName string) {
195-
defer wg.Done()
196-
// Check if any error occurred in any other gorouties:
197-
select {
198-
case <-ctx.Done():
199-
return // Error somewhere, terminate
200-
default: // Default is a must to avoid blocking
201-
}
202-
oneLambdaData, err := getAndProcessOneLambdaFunc(client, functionName)
188+
g.Go(func() error {
189+
oneLambdaData, err := getAndProcessOneLambdaFunc(client, *function.FunctionName)
203190
if err != nil {
204-
// Non-blocking send of error
205-
select {
206-
case errs <- err:
207-
default:
208-
}
209-
cancel() // send cancel signal to goroutines
210-
return
191+
return err
211192
}
212193

213194
mutex.Lock()
214195
lambdaData = append(lambdaData, oneLambdaData)
215196
mutex.Unlock()
216-
217-
}(*function.FunctionName)
218-
197+
return nil
198+
})
219199
}
220200

221-
wg.Wait()
222-
// Return (first) error, if any:
223-
if ctx.Err() != nil {
224-
return lambdaData, <-errs
201+
if err := g.Wait(); err != nil {
202+
return lambdaData, err
225203
}
226204

227205
return lambdaData, nil
@@ -463,61 +441,36 @@ func getFilteredECSClusters(client *ecs.Client, nextToken *string, allClusters *
463441

464442
// GetEcsTasksData returns a list of tasks data for an ECS cluster or service
465443
func (staticCreds *AWSStaticCreds) GetEcsTasksData(filter *filters.ResourceFilterOptions) ([]*EcsTaskData, error) {
466-
allTasksData := []*EcsTaskData{}
467444
client, err := staticCreds.NewECSClient()
468445
if err != nil {
469-
return allTasksData, err
446+
return nil, err
470447
}
471448

472449
filteredClusters, err := getFilteredECSClusters(client, nil, &[]ecsTypes.Cluster{}, filter)
473450
if err != nil {
474-
return allTasksData, err
451+
return nil, err
475452
}
476453

477-
var (
478-
wg sync.WaitGroup
479-
mutex = &sync.Mutex{}
480-
)
481-
482-
// run concurrently
483-
errs := make(chan error, 1) // Buffered only for the first error
484-
ctx, cancel := context.WithCancel(context.Background())
485-
defer cancel() // Make sure it's called to release resources even if no errors
454+
allTasksData := make([]*EcsTaskData, 0, len(*filteredClusters))
455+
mutex := new(sync.Mutex)
456+
g, _ := errgroup.WithContext(context.Background())
486457

487458
for _, cluster := range *filteredClusters {
488-
wg.Add(1)
489-
go func(cluster string) {
490-
defer wg.Done()
491-
// Check if any error occurred in any other gorouties:
492-
select {
493-
case <-ctx.Done():
494-
return // Error somewhere, terminate
495-
default: // Default is must to avoid blocking
496-
}
497-
498-
tasksData, err := getTasksDataInCluster(client, cluster)
459+
g.Go(func() error {
460+
tasksData, err := getTasksDataInCluster(client, *cluster.ClusterName)
499461
if err != nil {
500-
// Non-blocking send of error
501-
select {
502-
case errs <- err:
503-
default:
504-
}
505-
cancel() // send cancel signal to goroutines
506-
return
462+
return err
507463
}
508464
mutex.Lock()
509465
allTasksData = append(allTasksData, tasksData...)
510466
mutex.Unlock()
511-
512-
}(*cluster.ClusterName)
467+
return nil
468+
})
513469
}
514470

515-
wg.Wait()
516-
// Return (first) error, if any:
517-
if ctx.Err() != nil {
518-
return allTasksData, <-errs
471+
if err := g.Wait(); err != nil {
472+
return nil, err
519473
}
520-
521474
return allTasksData, nil
522475
}
523476

internal/azure/azure_apps.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logge
9999
}
100100

101101
if err := g.Wait(); err != nil {
102-
return appsData, err
102+
return nil, err
103103
}
104104

105105
return appsData, nil

0 commit comments

Comments
 (0)