Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ require (
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/mod v0.32.0 // indirect
golang.org/x/net v0.50.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sync v0.19.0
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func NewGcpAssetsFetcher(_ context.Context, log *clog.Logger, ch chan fetching.R
func (f *GcpAssetsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("GcpAssetsFetcher.Fetch start")
defer f.log.Info("GcpAssetsFetcher.Fetch done")
defer f.provider.Clear()

resultsCh := make(chan *inventory.ExtendedGcpAsset)
go f.provider.ListAssetTypes(ctx, lo.Keys(reversedGcpAssetTypes), resultsCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (s *GcpAssetsFetcherTestSuite) TestFetcher_Fetch() {
AssetType: "compute.googleapis.com/Instance",
},
}
mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListAssetTypes", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(2).(chan<- *inventory.ExtendedGcpAsset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func NewGcpLogSinkFetcher(_ context.Context, log *clog.Logger, ch chan fetching.
func (f *GcpLogSinkFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpLogSinkFetcher.Fetch")
defer f.log.Info("GcpLogSinkFetcher.Fetch done")
defer f.provider.Clear()

resultsCh := make(chan *inventory.ProjectAssets)
go f.provider.ListProjectAssets(ctx, []string{inventory.LogSinkAssetType}, resultsCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (s *GcpLogSinkFetcherTestSuite) TestLogSinkFetcher_Fetch_Success() {
AccountId: "1",
},
}
mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListProjectAssets", mock.Anything, []string{inventory.LogSinkAssetType}, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(2).(chan<- *inventory.ProjectAssets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func NewGcpMonitoringFetcher(_ context.Context, log *clog.Logger, ch chan fetchi
func (f *GcpMonitoringFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpMonitoringFetcher.Fetch")
defer f.log.Info("GcpMonitoringFetcher.Fetch done")
defer f.provider.Clear()

resultsCh := make(chan *inventory.MonitoringAsset)
go f.provider.ListMonitoringAssets(ctx, resultsCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (s *GcpMonitoringFetcherTestSuite) TestMonitoringFetcher_Fetch_Success() {
},
}

mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListMonitoringAssets", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(1).(chan<- *inventory.MonitoringAsset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func NewGcpNetworksFetcher(_ context.Context, log *clog.Logger, ch chan fetching
func (f *GcpNetworksFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpNetworksFetcher.Fetch")
defer f.log.Info("GcpNetworksFetcher.Fetch done")
defer f.provider.Clear()

resultsCh := make(chan *inventory.ExtendedGcpAsset)
go f.provider.ListNetworkAssets(ctx, resultsCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (s *GcpNetworksFetcherTestSuite) TestNetworksFetcher_Fetch_Success() {
AccountId: "1",
},
}
mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListNetworkAssets", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(1).(chan<- *inventory.ExtendedGcpAsset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func NewGcpPoliciesFetcher(_ context.Context, log *clog.Logger, ch chan fetching
func (f *GcpPoliciesFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpPoliciesFetcher.Fetch")
defer f.log.Info("GcpPoliciesFetcher.Fetch done")
defer f.provider.Clear()

resultsCh := make(chan *inventory.ProjectPoliciesAsset)
go f.provider.ListProjectsAncestorsPolicies(ctx, resultsCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (s *GcpPoliciesFetcherTestSuite) TestPoliciesFetcher_Fetch_Success() {
},
}

mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListProjectsAncestorsPolicies", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(1).(chan<- *inventory.ProjectPoliciesAsset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func NewGcpServiceUsageFetcher(_ context.Context, log *clog.Logger, ch chan fetc
func (f *GcpServiceUsageFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error {
f.log.Info("Starting GcpServiceUsageFetcher.Fetch")
defer f.log.Info("GcpServiceUsageFetcher.Fetch done")
defer f.provider.Clear()

resultsCh := make(chan *inventory.ProjectAssets)
go f.provider.ListProjectAssets(ctx, []string{inventory.ServiceUsageAssetType}, resultsCh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (s *GcpServiceUsageFetcherTestSuite) TestServiceUsageFetcher_Fetch_Success(
},
}

mockInventoryService.EXPECT().Clear()
mockInventoryService.On("ListProjectAssets", mock.Anything, []string{inventory.ServiceUsageAssetType}, mock.Anything).
Run(func(args mock.Arguments) {
ch := args.Get(2).(chan<- *inventory.ProjectAssets)
Expand Down
79 changes: 47 additions & 32 deletions internal/resources/providers/gcplib/inventory/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"fmt"
"strings"
"sync"
"time"

"cloud.google.com/go/asset/apiv1/assetpb"
"golang.org/x/sync/singleflight"
"golang.org/x/time/rate"
"google.golang.org/api/cloudresourcemanager/v3"
"google.golang.org/api/option"

Expand All @@ -36,6 +39,8 @@ type ResourceManagerWrapper struct {
log *clog.Logger
config auth.GcpFactoryConfig
accountMetadataCache sync.Map
metadataGroup singleflight.Group
projectsRateLimiter *rate.Limiter
getProjectDisplayName func(ctx context.Context, parent string) string
getOrganizationDisplayName func(ctx context.Context, parent string) string
}
Expand All @@ -50,36 +55,43 @@ func NewResourceManagerWrapper(ctx context.Context, log *clog.Logger, gcpConfig

var orgName string
var orgNameMu sync.Mutex
return &ResourceManagerWrapper{

wrapper := &ResourceManagerWrapper{
log: log,
config: gcpConfig,
accountMetadataCache: sync.Map{},
getProjectDisplayName: func(ctx context.Context, parent string) string {
prj, err := crmService.Projects.Get(parent).Context(ctx).Do()
if err != nil {
log.Errorf("error fetching GCP Project: %s, error: %s", parent, err)
return ""
}
return prj.DisplayName
},
getOrganizationDisplayName: func(ctx context.Context, parent string) string {
orgNameMu.Lock()
defer orgNameMu.Unlock()

if orgName != "" {
return orgName
}

// parent is the ID of the organization, and is the same for every call, so we cache it
org, err := crmService.Organizations.Get(parent).Context(ctx).Do()
if err != nil {
log.Errorf("error fetching GCP Org: %s, error: %s", parent, err)
return ""
}
orgName = org.DisplayName
projectsRateLimiter: rate.NewLimiter(rate.Every(time.Minute/600), 1),
}
wrapper.getProjectDisplayName = func(ctx context.Context, parent string) string {
if err := wrapper.projectsRateLimiter.Wait(ctx); err != nil {
log.Errorf("rate limiter error for GCP Project: %s, error: %s", parent, err)
return ""
}
prj, err := crmService.Projects.Get(parent).Context(ctx).Do()
if err != nil {
log.Errorf("error fetching GCP Project: %s, error: %s", parent, err)
return ""
}
return prj.DisplayName
}
wrapper.getOrganizationDisplayName = func(ctx context.Context, parent string) string {
orgNameMu.Lock()
defer orgNameMu.Unlock()

if orgName != "" {
return orgName
},
}, nil
}

// parent is the ID of the organization, and is the same for every call, so we cache it
org, err := crmService.Organizations.Get(parent).Context(ctx).Do()
if err != nil {
log.Errorf("error fetching GCP Org: %s, error: %s", parent, err)
return ""
}
orgName = org.DisplayName
return orgName
}
return wrapper, nil
}

func (c *ResourceManagerWrapper) GetCloudMetadata(ctx context.Context, asset *assetpb.Asset) *fetching.CloudAccountMetadata {
Expand All @@ -94,9 +106,14 @@ func (c *ResourceManagerWrapper) GetCloudMetadata(ctx context.Context, asset *as
}
c.log.Errorf("error casting cloud account metadata for key: %s", key)
}
cloudAccountMetadata := c.getMetadata(ctx, orgId, projectId)
c.accountMetadataCache.Store(key, cloudAccountMetadata)
return cloudAccountMetadata

result, _, _ := c.metadataGroup.Do(key, func() (any, error) {
metadata := c.getMetadata(ctx, orgId, projectId)
c.accountMetadataCache.Store(key, metadata)
return metadata, nil
})

return result.(*fetching.CloudAccountMetadata)
}

func (c *ResourceManagerWrapper) getMetadata(ctx context.Context, orgId string, projectId string) *fetching.CloudAccountMetadata {
Expand Down Expand Up @@ -127,9 +144,7 @@ func (c *ResourceManagerWrapper) getMetadata(ctx context.Context, orgId string,
}
}

func (c *ResourceManagerWrapper) Clear() {
c.accountMetadataCache.Clear()
}
func (c *ResourceManagerWrapper) Clear() {}

func getOrganizationId(ancestors []string) string {
last := ancestors[len(ancestors)-1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package inventory
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"cloud.google.com/go/asset/apiv1/assetpb"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"
"google.golang.org/api/option"

"github.com/elastic/cloudbeat/internal/resources/providers/gcplib/auth"
Expand Down Expand Up @@ -165,3 +168,63 @@ func (s *ResourceManagerTestSuite) TestGetOrganizationId() {
func (s *ResourceManagerTestSuite) TestGetProjectId() {
s.Equal("5", getProjectId(ancestors))
}

func (s *ResourceManagerTestSuite) TestProjectsRateLimiterWait() {
t := s.T()
ctx := t.Context()
duration := time.Millisecond
crm := s.NewMockResourceManagerWrapper()
crm.projectsRateLimiter = rate.NewLimiter(rate.Every(duration), 1)
crm.getProjectDisplayName = func(ctx context.Context, _ string) string {
if err := crm.projectsRateLimiter.Wait(ctx); err != nil {
return ""
}
return "projectName"
}

totalRequests := 5
startTime := time.Now()
for range totalRequests {
crm.getProjectDisplayName(ctx, "projects/test")
}
endTime := time.Now()

actualDuration := endTime.Sub(startTime)
minDuration := duration * time.Duration(totalRequests-1) // 1st request is instant, rest wait
s.GreaterOrEqual(actualDuration, minDuration)
}

func (s *ResourceManagerTestSuite) TestSingleflightDeduplicatesConcurrentRequests() {
t := s.T()
ctx := t.Context()
crm := s.NewMockResourceManagerWrapper()

var fetchCount atomic.Int32
crm.getProjectDisplayName = func(_ context.Context, _ string) string {
fetchCount.Add(1)
time.Sleep(10 * time.Millisecond)
return "projectName"
}

asset := &assetpb.Asset{
Name: "projects/1",
Ancestors: []string{
"projects/1",
"organizations/1",
},
}

const numRequests = 10
var wg sync.WaitGroup
wg.Add(numRequests)
for range numRequests {
go func() {
defer wg.Done()
crm.GetCloudMetadata(ctx, asset)
}()
}
wg.Wait()

// only 1 fetch, rest waited on singleflight
s.Equal(int32(1), fetchCount.Load())
}