Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- "**"
pull_request:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/yargevad/filepathx v1.0.0
github.com/zalando/go-keyring v0.2.4
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.11.0
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v1.5.2
Expand Down Expand Up @@ -197,7 +198,6 @@ require (
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
99 changes: 26 additions & 73 deletions internal/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
Expand Down Expand Up @@ -168,60 +170,36 @@ func getFilteredLambdaFuncs(client *lambda.Client, nextMarker *string, allFuncti

// GetLambdaPackageData returns a digest and metadata of a Lambda function package
func (staticCreds *AWSStaticCreds) GetLambdaPackageData(filter *filters.ResourceFilterOptions) ([]*LambdaData, error) {
lambdaData := []*LambdaData{}
client, err := staticCreds.NewLambdaClient()
if err != nil {
return lambdaData, err
return nil, err
}

filteredFunctions, err := getFilteredLambdaFuncs(client, nil, &[]types.FunctionConfiguration{}, filter)
if err != nil {
return lambdaData, err
return nil, err
}

var (
wg sync.WaitGroup
mutex = &sync.Mutex{}
)

// run concurrently
errs := make(chan error, 1) // Buffered only for the first error
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
lambdaData := make([]*LambdaData, 0, len(*filteredFunctions))
mutex := new(sync.Mutex)
g, _ := errgroup.WithContext(context.Background())

for _, function := range *filteredFunctions {
wg.Add(1)
go func(functionName string) {
defer wg.Done()
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is a must to avoid blocking
}
oneLambdaData, err := getAndProcessOneLambdaFunc(client, functionName)
g.Go(func() error {
oneLambdaData, err := getAndProcessOneLambdaFunc(client, *function.FunctionName)
if err != nil {
// Non-blocking send of error
select {
case errs <- err:
default:
}
cancel() // send cancel signal to goroutines
return
return err
}

mutex.Lock()
lambdaData = append(lambdaData, oneLambdaData)
mutex.Unlock()

}(*function.FunctionName)

return nil
})
}

wg.Wait()
// Return (first) error, if any:
if ctx.Err() != nil {
return lambdaData, <-errs
if err := g.Wait(); err != nil {
return lambdaData, err
}

return lambdaData, nil
Expand Down Expand Up @@ -463,61 +441,36 @@ func getFilteredECSClusters(client *ecs.Client, nextToken *string, allClusters *

// GetEcsTasksData returns a list of tasks data for an ECS cluster or service
func (staticCreds *AWSStaticCreds) GetEcsTasksData(filter *filters.ResourceFilterOptions) ([]*EcsTaskData, error) {
allTasksData := []*EcsTaskData{}
client, err := staticCreds.NewECSClient()
if err != nil {
return allTasksData, err
return nil, err
}

filteredClusters, err := getFilteredECSClusters(client, nil, &[]ecsTypes.Cluster{}, filter)
if err != nil {
return allTasksData, err
return nil, err
}

var (
wg sync.WaitGroup
mutex = &sync.Mutex{}
)

// run concurrently
errs := make(chan error, 1) // Buffered only for the first error
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
allTasksData := make([]*EcsTaskData, 0, len(*filteredClusters))
mutex := new(sync.Mutex)
g, _ := errgroup.WithContext(context.Background())

for _, cluster := range *filteredClusters {
wg.Add(1)
go func(cluster string) {
defer wg.Done()
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}

tasksData, err := getTasksDataInCluster(client, cluster)
g.Go(func() error {
tasksData, err := getTasksDataInCluster(client, *cluster.ClusterName)
if err != nil {
// Non-blocking send of error
select {
case errs <- err:
default:
}
cancel() // send cancel signal to goroutines
return
return err
}
mutex.Lock()
allTasksData = append(allTasksData, tasksData...)
mutex.Unlock()

}(*cluster.ClusterName)
return nil
})
}

wg.Wait()
// Return (first) error, if any:
if ctx.Err() != nil {
return allTasksData, <-errs
if err := g.Wait(); err != nil {
return nil, err
}

return allTasksData, nil
}

Expand Down
72 changes: 24 additions & 48 deletions internal/azure/azure_apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"strings"
"sync"

"golang.org/x/sync/errgroup"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/containers/azcontainerregistry"
Expand All @@ -39,7 +41,7 @@ type AzureClient struct {
AppServiceFactory *armappservice.ClientFactory
}

// AppData represents the harvested Azure service app and function app data
// AppData is the harvested Azure service app and function app data
type AppData struct {
AppName string `json:"app_name"`
AppKind string `json:"app_kind"`
Expand All @@ -48,12 +50,12 @@ type AppData struct {
StartedAt int64 `json:"creationTimestamp"`
}

// AzureAppsRequest represents the PUT request body to be sent to Kosli from CLI
// AzureAppsRequest is request body to be sent to Kosli server from CLI
type AzureAppsRequest struct {
Artifacts []*AppData `json:"artifacts"`
}

func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logger) (appsData []*AppData, err error) {
func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logger) ([]*AppData, error) {
azureClient, err := staticCreds.NewAzureClient()
if err != nil {
return nil, err
Expand All @@ -71,65 +73,40 @@ func (staticCreds *AzureStaticCredentials) GetAzureAppsData(logger *logger.Logge
}

// run concurrently
var wg sync.WaitGroup
errs := make(chan error, 1) // Buffered only for the first error
appsChan := make(chan *AppData, len(appsInfo))
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
mutex := new(sync.Mutex)
appsData := make([]*AppData, 0, len(appsInfo))
g, _ := errgroup.WithContext(context.Background())

for _, app := range appsInfo {
wg.Add(1)
go func(app *armappservice.Site) {
defer wg.Done()

select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is a must to avoid blocking
}

if strings.ToLower(*app.Properties.State) != "running" {
logger.Debug("app %s is not running, skipping from report", *app.Name)
return
}
if strings.ToLower(*app.Properties.State) != "running" {
logger.Debug("app %s is not running, skipping from report", *app.Name)
continue
}

g.Go(func() error {
data, err := azureClient.NewAppData(app, logger)
if err != nil {
select {
case errs <- err:
default:
}
cancel() // send cancel signal to goroutines
return
return err
}

if !data.IsEmpty() {
appsChan <- &data
mutex.Lock()
appsData = append(appsData, &data)
mutex.Unlock()
}
}(app)
return nil
})
}

wg.Wait()
close(appsChan)

// Return (first) error, if any:
if ctx.Err() != nil {
return appsData, <-errs
}

for app := range appsChan {
appsData = append(appsData, app)
if err := g.Wait(); err != nil {
return nil, err
}

if appsData == nil {
appsData = make([]*AppData, 0)
}
return appsData, nil
}

// NewAppData constructs and return AppData for the provided armappservice.Site
func (azureClient *AzureClient) NewAppData(app *armappservice.Site, logger *logger.Logger) (AppData, error) {
// Construct and return AppData for the provided armappservice.Site

// get image name from "DOCKER|tookyregistry.azurecr.io/tookyregistry/tooky/sha256:cb29a6"
linuxFxVersion := strings.Split(*app.Properties.SiteConfig.LinuxFxVersion, "|")
notDocker := len(linuxFxVersion) != 2 || linuxFxVersion[0] != "DOCKER"
Expand Down Expand Up @@ -387,9 +364,9 @@ func (azureClient *AzureClient) GetImageFingerprintFromRegistry(imageName string
return fingerprint, nil
}

// parseImageName parses the image name to extract the repository name and tag
// Example: tookyregistry.azurecr.io/tooky/sha256:latest
func parseImageName(imageName string) (registryUrl, repoName, tag string) {
// Parse the image name to extract the repository name and tag
// Example: tookyregistry.azurecr.io/tooky/sha256:latest
splitFullImageName := strings.SplitN(imageName, "/", 2)
if len(splitFullImageName) != 2 {
return "", "", ""
Expand All @@ -398,7 +375,6 @@ func parseImageName(imageName string) (registryUrl, repoName, tag string) {
registryUrl = fmt.Sprintf("https://%s", splitFullImageName[0])

if strings.Contains(splitFullImageName[1], "@sha256:") {
// Example: tookyregistry.azurecr.io/tooky@sha256:cb29a6..7
imageNameAndTag := strings.SplitN(splitFullImageName[1], "@", 2)
repoName = imageNameAndTag[0]
tag = imageNameAndTag[1]
Expand Down
Loading
Loading