From f107f65ab8ddeba81e60fa2788b04acfea65218f Mon Sep 17 00:00:00 2001 From: orouz Date: Wed, 11 Feb 2026 18:11:28 +0200 Subject: [PATCH] fix project metadata request (#3893) (cherry picked from commit 10e1725060d74da98a5ca59ab8a52ab850ac6b5c) --- go.mod | 2 +- .../fetching/fetchers/gcp/assets_fetcher.go | 1 - .../fetchers/gcp/assets_fetcher_test.go | 1 - .../fetching/fetchers/gcp/log_sink_fetcher.go | 1 - .../fetchers/gcp/log_sink_fetcher_test.go | 1 - .../fetchers/gcp/monitoring_fetcher.go | 1 - .../fetchers/gcp/monitoring_fetcher_test.go | 1 - .../fetching/fetchers/gcp/networks_fetcher.go | 1 - .../fetchers/gcp/networks_fetcher_test.go | 1 - .../fetching/fetchers/gcp/policies_fetcher.go | 1 - .../fetchers/gcp/policies_fetcher_test.go | 1 - .../fetchers/gcp/service_usage_fetcher.go | 1 - .../gcp/service_usage_fetcher_test.go | 1 - .../gcplib/inventory/resource_manager.go | 79 +++++++++++-------- .../gcplib/inventory/resource_manager_test.go | 63 +++++++++++++++ 15 files changed, 111 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index ff6d3aecd9..be4e2feb33 100644 --- a/go.mod +++ b/go.mod @@ -561,7 +561,7 @@ require ( golang.org/x/crypto v0.47.0 // indirect golang.org/x/mod v0.31.0 // indirect golang.org/x/net v0.49.0 // indirect - golang.org/x/sync v0.19.0 // indirect + golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 // indirect golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect diff --git a/internal/resources/fetching/fetchers/gcp/assets_fetcher.go b/internal/resources/fetching/fetchers/gcp/assets_fetcher.go index b4f9a04478..a7ba0242e4 100644 --- a/internal/resources/fetching/fetchers/gcp/assets_fetcher.go +++ b/internal/resources/fetching/fetchers/gcp/assets_fetcher.go @@ -81,7 +81,6 @@ func NewGcpAssetsFetcher(_ context.Context, log *clog.Logger, ch chan fetching.R func (f *GcpAssetsFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Info("GcpAssetsFetcher.Fetch start") defer f.log.Info("GcpAssetsFetcher.Fetch done") - defer f.provider.Clear() resultsCh := make(chan *inventory.ExtendedGcpAsset) go f.provider.ListAssetTypes(ctx, lo.Keys(reversedGcpAssetTypes), resultsCh) diff --git a/internal/resources/fetching/fetchers/gcp/assets_fetcher_test.go b/internal/resources/fetching/fetchers/gcp/assets_fetcher_test.go index ec24163ede..62a16cc047 100644 --- a/internal/resources/fetching/fetchers/gcp/assets_fetcher_test.go +++ b/internal/resources/fetching/fetchers/gcp/assets_fetcher_test.go @@ -69,7 +69,6 @@ func (s *GcpAssetsFetcherTestSuite) TestFetcher_Fetch() { AssetType: "compute.googleapis.com/Instance", }, } - mockInventoryService.EXPECT().Clear() mockInventoryService.On("ListAssetTypes", mock.Anything, mock.Anything, mock.Anything). Run(func(args mock.Arguments) { ch := args.Get(2).(chan<- *inventory.ExtendedGcpAsset) diff --git a/internal/resources/fetching/fetchers/gcp/log_sink_fetcher.go b/internal/resources/fetching/fetchers/gcp/log_sink_fetcher.go index 45870f9b76..b506ef4eb4 100644 --- a/internal/resources/fetching/fetchers/gcp/log_sink_fetcher.go +++ b/internal/resources/fetching/fetchers/gcp/log_sink_fetcher.go @@ -57,7 +57,6 @@ func NewGcpLogSinkFetcher(_ context.Context, log *clog.Logger, ch chan fetching. func (f *GcpLogSinkFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Info("Starting GcpLogSinkFetcher.Fetch") defer f.log.Info("GcpLogSinkFetcher.Fetch done") - defer f.provider.Clear() resultsCh := make(chan *inventory.ProjectAssets) go f.provider.ListProjectAssets(ctx, []string{inventory.LogSinkAssetType}, resultsCh) diff --git a/internal/resources/fetching/fetchers/gcp/log_sink_fetcher_test.go b/internal/resources/fetching/fetchers/gcp/log_sink_fetcher_test.go index 6fb85646b3..9d2f6cfbbe 100644 --- a/internal/resources/fetching/fetchers/gcp/log_sink_fetcher_test.go +++ b/internal/resources/fetching/fetchers/gcp/log_sink_fetcher_test.go @@ -70,7 +70,6 @@ func (s *GcpLogSinkFetcherTestSuite) TestLogSinkFetcher_Fetch_Success() { AccountId: "1", }, } - mockInventoryService.EXPECT().Clear() mockInventoryService.On("ListProjectAssets", mock.Anything, []string{inventory.LogSinkAssetType}, mock.Anything). Run(func(args mock.Arguments) { ch := args.Get(2).(chan<- *inventory.ProjectAssets) diff --git a/internal/resources/fetching/fetchers/gcp/monitoring_fetcher.go b/internal/resources/fetching/fetchers/gcp/monitoring_fetcher.go index cb70049bcd..78327acefe 100644 --- a/internal/resources/fetching/fetchers/gcp/monitoring_fetcher.go +++ b/internal/resources/fetching/fetchers/gcp/monitoring_fetcher.go @@ -52,7 +52,6 @@ func NewGcpMonitoringFetcher(_ context.Context, log *clog.Logger, ch chan fetchi func (f *GcpMonitoringFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Info("Starting GcpMonitoringFetcher.Fetch") defer f.log.Info("GcpMonitoringFetcher.Fetch done") - defer f.provider.Clear() resultsCh := make(chan *inventory.MonitoringAsset) go f.provider.ListMonitoringAssets(ctx, resultsCh) diff --git a/internal/resources/fetching/fetchers/gcp/monitoring_fetcher_test.go b/internal/resources/fetching/fetchers/gcp/monitoring_fetcher_test.go index 4cc7135482..3d5cb7d387 100644 --- a/internal/resources/fetching/fetchers/gcp/monitoring_fetcher_test.go +++ b/internal/resources/fetching/fetchers/gcp/monitoring_fetcher_test.go @@ -75,7 +75,6 @@ func (s *GcpMonitoringFetcherTestSuite) TestMonitoringFetcher_Fetch_Success() { }, } - mockInventoryService.EXPECT().Clear() mockInventoryService.On("ListMonitoringAssets", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { ch := args.Get(1).(chan<- *inventory.MonitoringAsset) diff --git a/internal/resources/fetching/fetchers/gcp/networks_fetcher.go b/internal/resources/fetching/fetchers/gcp/networks_fetcher.go index 2e0f506070..2f1250a01d 100644 --- a/internal/resources/fetching/fetchers/gcp/networks_fetcher.go +++ b/internal/resources/fetching/fetchers/gcp/networks_fetcher.go @@ -51,7 +51,6 @@ func NewGcpNetworksFetcher(_ context.Context, log *clog.Logger, ch chan fetching func (f *GcpNetworksFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Info("Starting GcpNetworksFetcher.Fetch") defer f.log.Info("GcpNetworksFetcher.Fetch done") - defer f.provider.Clear() resultsCh := make(chan *inventory.ExtendedGcpAsset) go f.provider.ListNetworkAssets(ctx, resultsCh) diff --git a/internal/resources/fetching/fetchers/gcp/networks_fetcher_test.go b/internal/resources/fetching/fetchers/gcp/networks_fetcher_test.go index ad608c148f..5d5ce8df29 100644 --- a/internal/resources/fetching/fetchers/gcp/networks_fetcher_test.go +++ b/internal/resources/fetching/fetchers/gcp/networks_fetcher_test.go @@ -67,7 +67,6 @@ func (s *GcpNetworksFetcherTestSuite) TestNetworksFetcher_Fetch_Success() { AccountId: "1", }, } - mockInventoryService.EXPECT().Clear() mockInventoryService.On("ListNetworkAssets", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { ch := args.Get(1).(chan<- *inventory.ExtendedGcpAsset) diff --git a/internal/resources/fetching/fetchers/gcp/policies_fetcher.go b/internal/resources/fetching/fetchers/gcp/policies_fetcher.go index 35f3e5a4ef..eab87adfa1 100644 --- a/internal/resources/fetching/fetchers/gcp/policies_fetcher.go +++ b/internal/resources/fetching/fetchers/gcp/policies_fetcher.go @@ -52,7 +52,6 @@ func NewGcpPoliciesFetcher(_ context.Context, log *clog.Logger, ch chan fetching func (f *GcpPoliciesFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Info("Starting GcpPoliciesFetcher.Fetch") defer f.log.Info("GcpPoliciesFetcher.Fetch done") - defer f.provider.Clear() resultsCh := make(chan *inventory.ProjectPoliciesAsset) go f.provider.ListProjectsAncestorsPolicies(ctx, resultsCh) diff --git a/internal/resources/fetching/fetchers/gcp/policies_fetcher_test.go b/internal/resources/fetching/fetchers/gcp/policies_fetcher_test.go index f6c8b50c7f..b2964c1c87 100644 --- a/internal/resources/fetching/fetchers/gcp/policies_fetcher_test.go +++ b/internal/resources/fetching/fetchers/gcp/policies_fetcher_test.go @@ -70,7 +70,6 @@ func (s *GcpPoliciesFetcherTestSuite) TestPoliciesFetcher_Fetch_Success() { }, } - mockInventoryService.EXPECT().Clear() mockInventoryService.On("ListProjectsAncestorsPolicies", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { ch := args.Get(1).(chan<- *inventory.ProjectPoliciesAsset) diff --git a/internal/resources/fetching/fetchers/gcp/service_usage_fetcher.go b/internal/resources/fetching/fetchers/gcp/service_usage_fetcher.go index 522496855f..728cb3b8e9 100644 --- a/internal/resources/fetching/fetchers/gcp/service_usage_fetcher.go +++ b/internal/resources/fetching/fetchers/gcp/service_usage_fetcher.go @@ -57,7 +57,6 @@ func NewGcpServiceUsageFetcher(_ context.Context, log *clog.Logger, ch chan fetc func (f *GcpServiceUsageFetcher) Fetch(ctx context.Context, cycleMetadata cycle.Metadata) error { f.log.Info("Starting GcpServiceUsageFetcher.Fetch") defer f.log.Info("GcpServiceUsageFetcher.Fetch done") - defer f.provider.Clear() resultsCh := make(chan *inventory.ProjectAssets) go f.provider.ListProjectAssets(ctx, []string{inventory.ServiceUsageAssetType}, resultsCh) diff --git a/internal/resources/fetching/fetchers/gcp/service_usage_fetcher_test.go b/internal/resources/fetching/fetchers/gcp/service_usage_fetcher_test.go index 406304dafd..52e6394fae 100644 --- a/internal/resources/fetching/fetchers/gcp/service_usage_fetcher_test.go +++ b/internal/resources/fetching/fetchers/gcp/service_usage_fetcher_test.go @@ -69,7 +69,6 @@ func (s *GcpServiceUsageFetcherTestSuite) TestServiceUsageFetcher_Fetch_Success( }, } - mockInventoryService.EXPECT().Clear() mockInventoryService.On("ListProjectAssets", mock.Anything, []string{inventory.ServiceUsageAssetType}, mock.Anything). Run(func(args mock.Arguments) { ch := args.Get(2).(chan<- *inventory.ProjectAssets) diff --git a/internal/resources/providers/gcplib/inventory/resource_manager.go b/internal/resources/providers/gcplib/inventory/resource_manager.go index a49cee8f23..c76b755a0e 100644 --- a/internal/resources/providers/gcplib/inventory/resource_manager.go +++ b/internal/resources/providers/gcplib/inventory/resource_manager.go @@ -22,8 +22,11 @@ import ( "fmt" "strings" "sync" + "time" "cloud.google.com/go/asset/apiv1/assetpb" + "golang.org/x/sync/singleflight" + "golang.org/x/time/rate" "google.golang.org/api/cloudresourcemanager/v3" "google.golang.org/api/option" @@ -36,6 +39,8 @@ type ResourceManagerWrapper struct { log *clog.Logger config auth.GcpFactoryConfig accountMetadataCache sync.Map + metadataGroup singleflight.Group + projectsRateLimiter *rate.Limiter getProjectDisplayName func(ctx context.Context, parent string) string getOrganizationDisplayName func(ctx context.Context, parent string) string } @@ -50,36 +55,43 @@ func NewResourceManagerWrapper(ctx context.Context, log *clog.Logger, gcpConfig var orgName string var orgNameMu sync.Mutex - return &ResourceManagerWrapper{ + + wrapper := &ResourceManagerWrapper{ log: log, config: gcpConfig, accountMetadataCache: sync.Map{}, - getProjectDisplayName: func(ctx context.Context, parent string) string { - prj, err := crmService.Projects.Get(parent).Context(ctx).Do() - if err != nil { - log.Errorf("error fetching GCP Project: %s, error: %s", parent, err) - return "" - } - return prj.DisplayName - }, - getOrganizationDisplayName: func(ctx context.Context, parent string) string { - orgNameMu.Lock() - defer orgNameMu.Unlock() - - if orgName != "" { - return orgName - } - - // parent is the ID of the organization, and is the same for every call, so we cache it - org, err := crmService.Organizations.Get(parent).Context(ctx).Do() - if err != nil { - log.Errorf("error fetching GCP Org: %s, error: %s", parent, err) - return "" - } - orgName = org.DisplayName + projectsRateLimiter: rate.NewLimiter(rate.Every(time.Minute/600), 1), + } + wrapper.getProjectDisplayName = func(ctx context.Context, parent string) string { + if err := wrapper.projectsRateLimiter.Wait(ctx); err != nil { + log.Errorf("rate limiter error for GCP Project: %s, error: %s", parent, err) + return "" + } + prj, err := crmService.Projects.Get(parent).Context(ctx).Do() + if err != nil { + log.Errorf("error fetching GCP Project: %s, error: %s", parent, err) + return "" + } + return prj.DisplayName + } + wrapper.getOrganizationDisplayName = func(ctx context.Context, parent string) string { + orgNameMu.Lock() + defer orgNameMu.Unlock() + + if orgName != "" { return orgName - }, - }, nil + } + + // parent is the ID of the organization, and is the same for every call, so we cache it + org, err := crmService.Organizations.Get(parent).Context(ctx).Do() + if err != nil { + log.Errorf("error fetching GCP Org: %s, error: %s", parent, err) + return "" + } + orgName = org.DisplayName + return orgName + } + return wrapper, nil } func (c *ResourceManagerWrapper) GetCloudMetadata(ctx context.Context, asset *assetpb.Asset) *fetching.CloudAccountMetadata { @@ -94,9 +106,14 @@ func (c *ResourceManagerWrapper) GetCloudMetadata(ctx context.Context, asset *as } c.log.Errorf("error casting cloud account metadata for key: %s", key) } - cloudAccountMetadata := c.getMetadata(ctx, orgId, projectId) - c.accountMetadataCache.Store(key, cloudAccountMetadata) - return cloudAccountMetadata + + result, _, _ := c.metadataGroup.Do(key, func() (any, error) { + metadata := c.getMetadata(ctx, orgId, projectId) + c.accountMetadataCache.Store(key, metadata) + return metadata, nil + }) + + return result.(*fetching.CloudAccountMetadata) } func (c *ResourceManagerWrapper) getMetadata(ctx context.Context, orgId string, projectId string) *fetching.CloudAccountMetadata { @@ -127,9 +144,7 @@ func (c *ResourceManagerWrapper) getMetadata(ctx context.Context, orgId string, } } -func (c *ResourceManagerWrapper) Clear() { - c.accountMetadataCache.Clear() -} +func (c *ResourceManagerWrapper) Clear() {} func getOrganizationId(ancestors []string) string { last := ancestors[len(ancestors)-1] diff --git a/internal/resources/providers/gcplib/inventory/resource_manager_test.go b/internal/resources/providers/gcplib/inventory/resource_manager_test.go index aaa8c49f81..ef9ccf08b6 100644 --- a/internal/resources/providers/gcplib/inventory/resource_manager_test.go +++ b/internal/resources/providers/gcplib/inventory/resource_manager_test.go @@ -20,10 +20,13 @@ package inventory import ( "context" "sync" + "sync/atomic" "testing" + "time" "cloud.google.com/go/asset/apiv1/assetpb" "github.com/stretchr/testify/suite" + "golang.org/x/time/rate" "google.golang.org/api/option" "github.com/elastic/cloudbeat/internal/resources/providers/gcplib/auth" @@ -165,3 +168,63 @@ func (s *ResourceManagerTestSuite) TestGetOrganizationId() { func (s *ResourceManagerTestSuite) TestGetProjectId() { s.Equal("5", getProjectId(ancestors)) } + +func (s *ResourceManagerTestSuite) TestProjectsRateLimiterWait() { + t := s.T() + ctx := t.Context() + duration := time.Millisecond + crm := s.NewMockResourceManagerWrapper() + crm.projectsRateLimiter = rate.NewLimiter(rate.Every(duration), 1) + crm.getProjectDisplayName = func(ctx context.Context, _ string) string { + if err := crm.projectsRateLimiter.Wait(ctx); err != nil { + return "" + } + return "projectName" + } + + totalRequests := 5 + startTime := time.Now() + for range totalRequests { + crm.getProjectDisplayName(ctx, "projects/test") + } + endTime := time.Now() + + actualDuration := endTime.Sub(startTime) + minDuration := duration * time.Duration(totalRequests-1) // 1st request is instant, rest wait + s.GreaterOrEqual(actualDuration, minDuration) +} + +func (s *ResourceManagerTestSuite) TestSingleflightDeduplicatesConcurrentRequests() { + t := s.T() + ctx := t.Context() + crm := s.NewMockResourceManagerWrapper() + + var fetchCount atomic.Int32 + crm.getProjectDisplayName = func(_ context.Context, _ string) string { + fetchCount.Add(1) + time.Sleep(10 * time.Millisecond) + return "projectName" + } + + asset := &assetpb.Asset{ + Name: "projects/1", + Ancestors: []string{ + "projects/1", + "organizations/1", + }, + } + + const numRequests = 10 + var wg sync.WaitGroup + wg.Add(numRequests) + for range numRequests { + go func() { + defer wg.Done() + crm.GetCloudMetadata(ctx, asset) + }() + } + wg.Wait() + + // only 1 fetch, rest waited on singleflight + s.Equal(int32(1), fetchCount.Load()) +}