diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 79b34d8f7..fa3473d79 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -4,6 +4,7 @@ on:
   push:
     branches:
       - "**"
+  pull_request:
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
diff --git a/go.mod b/go.mod
index b6d0cf50a..754a32884 100644
--- a/go.mod
+++ b/go.mod
@@ -42,6 +42,7 @@ require (
 	github.com/yargevad/filepathx v1.0.0
 	github.com/zalando/go-keyring v0.2.4
 	golang.org/x/oauth2 v0.23.0
+	golang.org/x/sync v0.11.0
 	k8s.io/api v0.31.1
 	k8s.io/apimachinery v0.31.1
 	k8s.io/client-go v1.5.2
@@ -197,7 +198,6 @@ require (
 	golang.org/x/crypto v0.32.0 // indirect
 	golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
 	golang.org/x/net v0.34.0 // indirect
-	golang.org/x/sync v0.10.0 // indirect
 	golang.org/x/sys v0.29.0 // indirect
 	golang.org/x/term v0.28.0 // indirect
 	golang.org/x/text v0.21.0 // indirect
diff --git a/go.sum b/go.sum
index e308aef30..2279e56ae 100644
--- a/go.sum
+++ b/go.sum
@@ -971,8 +971,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
 golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
-golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
-golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
+golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/internal/aws/aws.go b/internal/aws/aws.go
index b2ce8b15b..20dc172e5 100644
--- a/internal/aws/aws.go
+++ b/internal/aws/aws.go
@@ -11,6 +11,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/credentials"
@@ -168,60 +170,36 @@ func getFilteredLambdaFuncs(client *lambda.Client, nextMarker *string, allFuncti
 
 // GetLambdaPackageData returns a digest and metadata of a Lambda function package
 func (staticCreds *AWSStaticCreds) GetLambdaPackageData(filter *filters.ResourceFilterOptions) ([]*LambdaData, error) {
-	lambdaData := []*LambdaData{}
 	client, err := staticCreds.NewLambdaClient()
 	if err != nil {
-		return lambdaData, err
+		return nil, err
 	}
 
 	filteredFunctions, err := getFilteredLambdaFuncs(client, nil, &[]types.FunctionConfiguration{}, filter)
 	if err != nil {
-		return lambdaData, err
+		return nil, err
 	}
 
-	var (
-		wg    sync.WaitGroup
-		mutex = &sync.Mutex{}
-	)
-
-	// run concurrently
-	errs := make(chan error, 1) // Buffered only for the first error
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel() // Make sure it's called to release resources even if no errors
+	lambdaData := make([]*LambdaData, 0, len(*filteredFunctions))
+	mutex := new(sync.Mutex)
+	g, _ := errgroup.WithContext(context.Background())
 
 	for _, function := range *filteredFunctions {
-		wg.Add(1)
-		go func(functionName string) {
-			defer wg.Done()
-			// Check if any error occurred in any other gorouties:
-			select {
-			case <-ctx.Done():
-				return // Error somewhere, terminate
-			default: // Default is a must to avoid blocking
-			}
-			oneLambdaData, err := getAndProcessOneLambdaFunc(client, functionName)
+		g.Go(func() error {
+			oneLambdaData, err := getAndProcessOneLambdaFunc(client, *function.FunctionName)
 			if err != nil {
-				// Non-blocking send of error
-				select {
-				case errs <- err:
-				default:
-				}
-				cancel() // send cancel signal to goroutines
-				return
+				return err
 			}
 			mutex.Lock()
 			lambdaData = append(lambdaData, oneLambdaData)
 			mutex.Unlock()
-
-		}(*function.FunctionName)
-
+			return nil
+		})
 	}
 
-	wg.Wait()
-	// Return (first) error, if any:
-	if ctx.Err() != nil {
-		return lambdaData, <-errs
+	if err := g.Wait(); err != nil {
+		return lambdaData, err
 	}
 
 	return lambdaData, nil
@@ -463,61 +441,36 @@ func getFilteredECSClusters(client *ecs.Client, nextToken *string, allClusters *
 
 // GetEcsTasksData returns a list of tasks data for an ECS cluster or service
 func (staticCreds *AWSStaticCreds) GetEcsTasksData(filter *filters.ResourceFilterOptions) ([]*EcsTaskData, error) {
-	allTasksData := []*EcsTaskData{}
 	client, err := staticCreds.NewECSClient()
 	if err != nil {
-		return allTasksData, err
+		return nil, err
 	}
 
 	filteredClusters, err := getFilteredECSClusters(client, nil, &[]ecsTypes.Cluster{}, filter)
 	if err != nil {
-		return allTasksData, err
+		return nil, err
 	}
 
-	var (
-		wg    sync.WaitGroup
-		mutex = &sync.Mutex{}
-	)
-
-	// run concurrently
-	errs := make(chan error, 1) // Buffered only for the first error
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel() // Make sure it's called to release resources even if no errors
+	allTasksData := make([]*EcsTaskData, 0, len(*filteredClusters))
+	mutex := new(sync.Mutex)
+	g, _ := errgroup.WithContext(context.Background())
 
 	for _, cluster := range *filteredClusters {
-		wg.Add(1)
-		go func(cluster string) {
-			defer wg.Done()
-			// Check if any error occurred in any other gorouties:
-			select {
-			case <-ctx.Done():
-				return // Error somewhere, terminate
-			default: // Default is must to avoid blocking
-			}
-
-			tasksData, err := getTasksDataInCluster(client, cluster)
+		g.Go(func() error {
+			tasksData, err := getTasksDataInCluster(client, *cluster.ClusterName)
 			if err != nil {
-				// Non-blocking send of error
-				select {
-				case errs <- err:
-				default:
-				}
-				cancel() // send cancel signal to goroutines
-				return
+				return err
 			}
 			mutex.Lock()
 			allTasksData = append(allTasksData, tasksData...)
 			mutex.Unlock()
-
-		}(*cluster.ClusterName)
+			return nil
+		})
 	}
 
-	wg.Wait()
-	// Return (first) error, if any:
-	if ctx.Err() != nil {
-		return allTasksData, <-errs
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
-
 	return allTasksData, nil
 }
diff --git a/internal/azure/azure_apps.go b/internal/azure/azure_apps.go
index 8189c9268..b3799dff4 100644
--- a/internal/azure/azure_apps.go
+++ b/internal/azure/azure_apps.go
@@ -15,6 +15,8 @@ import (
 	"strings"
 	"sync"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 	"github.com/Azure/azure-sdk-for-go/sdk/containers/azcontainerregistry"
@@ -39,7 +41,7 @@ type AzureClient struct {
 	AppServiceFactory *armappservice.ClientFactory
 }
 
-// AppData represents the harvested Azure service app and function app data
+// AppData is the harvested Azure service app and function app data
 type AppData struct {
 	AppName   string `json:"app_name"`
 	AppKind   string `json:"app_kind"`
@@ -48,12 +50,12 @@ type AppData struct {
 	StartedAt int64 `json:"creationTimestamp"`
 }
 
-// AzureAppsRequest represents the PUT request body to be sent to Kosli from CLI
+// AzureAppsRequest is the request body sent from the CLI to the Kosli server
 type AzureAppsRequest struct {
 	Artifacts []*AppData `json:"artifacts"`
 }
 
-func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logger) (appsData []*AppData, err error) {
+func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logger) ([]*AppData, error) {
 	azureClient, err := staticCreds.NewAzureClient()
 	if err != nil {
 		return nil, err
@@ -71,65 +73,40 @@ func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logge
 	}
 
 	// run concurrently
-	var wg sync.WaitGroup
-	errs := make(chan error, 1) // Buffered only for the first error
-	appsChan := make(chan *AppData, len(appsInfo))
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel() // Make sure it's called to release resources even if no errors
+	mutex := new(sync.Mutex)
+	appsData := make([]*AppData, 0, len(appsInfo))
+	g, _ := errgroup.WithContext(context.Background())
 
 	for _, app := range appsInfo {
-		wg.Add(1)
-		go func(app *armappservice.Site) {
-			defer wg.Done()
-
-			select {
-			case <-ctx.Done():
-				return // Error somewhere, terminate
-			default: // Default is a must to avoid blocking
-			}
-
-			if strings.ToLower(*app.Properties.State) != "running" {
-				logger.Debug("app %s is not running, skipping from report", *app.Name)
-				return
-			}
+		if strings.ToLower(*app.Properties.State) != "running" {
+			logger.Debug("app %s is not running, skipping from report", *app.Name)
+			continue
+		}
+		g.Go(func() error {
 			data, err := azureClient.NewAppData(app, logger)
 			if err != nil {
-				select {
-				case errs <- err:
-				default:
-				}
-				cancel() // send cancel signal to goroutines
-				return
+				return err
 			}
 			if !data.IsEmpty() {
-				appsChan <- &data
+				mutex.Lock()
+				appsData = append(appsData, &data)
+				mutex.Unlock()
 			}
-		}(app)
+			return nil
+		})
 	}
 
-	wg.Wait()
-	close(appsChan)
-
-	// Return (first) error, if any:
-	if ctx.Err() != nil {
-		return appsData, <-errs
-	}
-
-	for app := range appsChan {
-		appsData = append(appsData, app)
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
 
-	if appsData == nil {
-		appsData = make([]*AppData, 0)
-	}
 	return appsData, nil
 }
 
+// NewAppData constructs and returns AppData for the provided armappservice.Site
 func (azureClient *AzureClient) NewAppData(app *armappservice.Site, logger *logger.Logger) (AppData, error) {
-	// Construct and return AppData for the provided armappservice.Site
-
 	// get image name from "DOCKER|tookyregistry.azurecr.io/tookyregistry/tooky/sha256:cb29a6"
 	linuxFxVersion := strings.Split(*app.Properties.SiteConfig.LinuxFxVersion, "|")
 	notDocker := len(linuxFxVersion) != 2 || linuxFxVersion[0] != "DOCKER"
@@ -387,9 +364,9 @@ func (azureClient *AzureClient) GetImageFingerprintFromRegistry(imageName string
 	return fingerprint, nil
 }
 
+// parseImageName parses the image name to extract the repository name and tag
+// Example: tookyregistry.azurecr.io/tooky/sha256:latest
 func parseImageName(imageName string) (registryUrl, repoName, tag string) {
-	// Parse the image name to extract the repository name and tag
-	// Example: tookyregistry.azurecr.io/tooky/sha256:latest
 	splitFullImageName := strings.SplitN(imageName, "/", 2)
 	if len(splitFullImageName) != 2 {
 		return "", "", ""
 	}
@@ -398,7 +375,6 @@ func parseImageName(imageName string) (registryUrl, repoName, tag string) {
 
 	registryUrl = fmt.Sprintf("https://%s", splitFullImageName[0])
 	if strings.Contains(splitFullImageName[1], "@sha256:") {
-		// Example: tookyregistry.azurecr.io/tooky@sha256:cb29a6..7
 		imageNameAndTag := strings.SplitN(splitFullImageName[1], "@", 2)
 		repoName = imageNameAndTag[0]
 		tag = imageNameAndTag[1]
diff --git a/internal/kube/kube.go b/internal/kube/kube.go
index 8441917e6..4ea7c8a2d 100644
--- a/internal/kube/kube.go
+++ b/internal/kube/kube.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/kosli-dev/cli/internal/filters"
 	"github.com/kosli-dev/cli/internal/logger"
+	"golang.org/x/sync/errgroup"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -84,7 +85,6 @@ func NewK8sClientSet(kubeconfigPath string) (*K8SConnection, error) {
 
 func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOptions, logger *logger.Logger) ([]*PodData, error) {
 	var (
 		podsData = []*PodData{}
-		wg       sync.WaitGroup
 		mutex    = &sync.Mutex{}
 	)
@@ -104,43 +104,23 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption
 
 	logger.Info("scanning the following namespaces: %v ", filteredNamespaces)
 
-	// run concurrently
-	errs := make(chan error, 1) // Buffered only for the first error
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel() // Make sure it's called to release resources even if no errors
+	g, _ := errgroup.WithContext(context.Background())
 
 	for _, ns := range filteredNamespaces {
-		wg.Add(1)
-		go func(ns string) {
-			defer wg.Done()
-			// Check if any error occurred in any other gorouties:
-			select {
-			case <-ctx.Done():
-				return // Error somewhere, terminate
-			default: // Default is must to avoid blocking
-			}
-
+		g.Go(func() error {
 			pods, err := clientset.getPodsInNamespace(ns)
 			if err != nil {
-				// Non-blocking send of error
-				select {
-				case errs <- err:
-				default:
-				}
-				cancel() // send cancel signal to goroutines
-				return
+				return err
 			}
 			mutex.Lock()
 			list.Items = append(list.Items, pods...)
 			mutex.Unlock()
-
-		}(ns)
+			return nil
+		})
 	}
 
-	wg.Wait()
-	// Return (first) error, if any:
-	if ctx.Err() != nil {
-		return podsData, <-errs
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
 
 	return processPods(list), nil
@@ -178,63 +158,43 @@ func (clientset *K8SConnection) filterNamespaces(filter *filters.ResourceFilterO
 			return filter.IncludeNames, nil
 		}
 	}
-	result := []string{}
 
 	// get all namespaces in the cluster
 	nsList, err := clientset.GetClusterNamespaces()
 	if err != nil {
-		return result, err
+		return nil, err
 	}
 
+	namespaces := []string{}
+
 	if len(filter.IncludeNames) == 0 && len(filter.IncludeNamesRegex) == 0 && len(filter.ExcludeNames) == 0 && len(filter.ExcludeNamesRegex) == 0 {
 		for _, ns := range nsList {
-			result = append(result, ns.Name)
+			namespaces = append(namespaces, ns.Name)
 		}
-		return result, nil
+		return namespaces, nil
 	}
 
-	var (
-		wg    sync.WaitGroup
-		mutex = &sync.Mutex{}
-	)
-
-	errs := make(chan error, 1) // Buffered only for the first error
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel() // Make sure it's called to release resources even if no errors
+	mutex := new(sync.Mutex)
+	g, _ := errgroup.WithContext(context.Background())
 
 	for _, ns := range nsList {
-		wg.Add(1)
-		go func(ns string) {
-			defer wg.Done()
-
-			// Check if any error occurred in any other gorouties:
-			select {
-			case <-ctx.Done():
-				return // Error somewhere, terminate
-			default: // Default is must to avoid blocking
-			}
-
-			include, err := filter.ShouldInclude(ns)
+		g.Go(func() error {
+			include, err := filter.ShouldInclude(ns.Name)
 			if err != nil {
-				select {
-				case errs <- err:
-				default:
-				}
-				cancel() // send cancel signal to goroutines
-				return
+				return err
 			}
 
 			if include {
 				mutex.Lock()
-				result = append(result, ns)
+				namespaces = append(namespaces, ns.Name)
 				mutex.Unlock()
 			}
-		}(ns.Name)
+			return nil
+		})
 	}
 
-	wg.Wait()
-	if ctx.Err() != nil {
-		return result, <-errs
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
-	return result, nil
+	return namespaces, nil
 }
 
 // getPodsInNamespace get pods in a specific namespace in a cluster
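
Not part of the patch itself: the following is a minimal, self-contained sketch of the errgroup fan-out pattern that every reporter above is converted to. The names processItem, processAll and the item strings are hypothetical stand-ins, not functions from this repo. The shape is the same as in the diff: spawn one goroutine per item with g.Go, append results under a mutex, and let g.Wait surface the first error, replacing the manual WaitGroup / cancel-context / buffered-error-channel bookkeeping the patch deletes.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// processItem stands in for the per-item work in the patch
// (e.g. getAndProcessOneLambdaFunc or getTasksDataInCluster).
func processItem(item string) (string, error) {
	if item == "" {
		return "", errors.New("empty item")
	}
	return "processed:" + item, nil
}

// processAll fans the work out with errgroup, collects results under a
// mutex, and lets g.Wait report the first error.
func processAll(items []string) ([]string, error) {
	results := make([]string, 0, len(items))
	mutex := new(sync.Mutex)
	// The derived context is discarded here to mirror the patch (g, _ := ...);
	// keeping it and passing it into processItem would allow in-flight calls
	// to be cancelled as soon as the first error occurs.
	g, _ := errgroup.WithContext(context.Background())

	for _, item := range items {
		g.Go(func() error {
			// Capturing the loop variable directly is safe on Go 1.22+,
			// which is why the explicit argument passing (the old
			// `}(*function.FunctionName)` style) could be dropped.
			out, err := processItem(item)
			if err != nil {
				return err // g.Wait() returns the first non-nil error
			}
			mutex.Lock()
			results = append(results, out)
			mutex.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil { // blocks until every goroutine returns
		return nil, err
	}
	return results, nil
}

func main() {
	out, err := processAll([]string{"a", "b", "c"})
	fmt.Println(out, err)
}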