From e32f1c83e7eb4cdc081ba1aba433059c54b2c995 Mon Sep 17 00:00:00 2001 From: grokspawn Date: Thu, 12 Jun 2025 11:39:11 -0500 Subject: [PATCH 1/4] primitive stampede protection Signed-off-by: grokspawn --- .../registry/resolver/cache/cache.go | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/pkg/controller/registry/resolver/cache/cache.go b/pkg/controller/registry/resolver/cache/cache.go index f9b9e03a94..4d332b99e3 100644 --- a/pkg/controller/registry/resolver/cache/cache.go +++ b/pkg/controller/registry/resolver/cache/cache.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "slices" "sort" "sync" "time" @@ -147,7 +148,7 @@ func (c *NamespacedOperatorCache) Error() error { func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { const ( - CachePopulateTimeout = time.Minute + cachePopulateTimeout = time.Minute ) sources := c.sp.Sources(namespaces...) @@ -169,7 +170,9 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { if snapshot.Valid() { result.snapshots[key] = snapshot } else { - misses = append(misses, key) + if !snapshot.RequestSentinelActive() { + misses = append(misses, key) + } } }() } @@ -209,19 +212,35 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { } misses = misses[found:] + // remove any with a "live" outstanding request + misses = slices.DeleteFunc(misses, func(key SourceKey) bool { + hdr := c.snapshots[key] + + // if we already have a request timestamp, we have an outstanding request, so prevent stacking + // and just send new requests if the previous one has expired + if hdr != nil && hdr.RequestSentinelActive() { + c.logger.Printf("Skipping new request for %s, already in progress", key) + return true + } + return false + }) + for _, miss := range misses { - ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout) + ctx, cancel := context.WithTimeout(context.Background(), cachePopulateTimeout) hdr := snapshotHeader{ - key: miss, - pop: cancel, - priority: c.sourcePriorityProvider.Priority(miss), + key: miss, + pop: cancel, + priority: c.sourcePriorityProvider.Priority(miss), + requestSentinel: time.Now().Add(cachePopulateTimeout), // set sentinel to prevent stacking requests } hdr.m.Lock() c.snapshots[miss] = &hdr result.snapshots[miss] = &hdr + // don't adjust the request sentinel in the goroutine for any outcome, so that we don't stampede sources + // instead, reevaluate the sentinel during the next snapshot go func(ctx context.Context, hdr *snapshotHeader, source Source) { defer hdr.m.Unlock() c.sem <- struct{}{} @@ -294,6 +313,8 @@ type snapshotHeader struct { pop context.CancelFunc err error priority int + + requestSentinel time.Time } func (hdr *snapshotHeader) Cancel() { @@ -314,6 +335,13 @@ func (hdr *snapshotHeader) Valid() bool { return true } +func (hdr *snapshotHeader) RequestSentinelActive() bool { + hdr.m.RLock() + defer hdr.m.RUnlock() + + return time.Now().Before(hdr.requestSentinel) +} + type sortableSnapshots struct { snapshots []*snapshotHeader preferredNamespace string From f023cd83250fe6564e4298af67120076a1c75da8 Mon Sep 17 00:00:00 2001 From: grokspawn Date: Tue, 1 Jul 2025 10:03:17 -0500 Subject: [PATCH 2/4] adjust e2e polling duration --- test/e2e/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index 1241113bec..0db1541315 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -51,7 +51,7 @@ import ( const ( pollInterval = 100 * time.Millisecond - pollDuration = 5 * time.Minute + pollDuration = 15 * time.Minute olmConfigMap = "olm-operators" // No-longer used, how long do we keep this around? From 75bd96ea85de61e2010d32ee042589d3f892fe80 Mon Sep 17 00:00:00 2001 From: grokspawn Date: Tue, 1 Jul 2025 13:09:46 -0500 Subject: [PATCH 3/4] avoid misses short circuit if request outstanding --- pkg/controller/registry/resolver/cache/cache.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/controller/registry/resolver/cache/cache.go b/pkg/controller/registry/resolver/cache/cache.go index 4d332b99e3..fb226dfdf0 100644 --- a/pkg/controller/registry/resolver/cache/cache.go +++ b/pkg/controller/registry/resolver/cache/cache.go @@ -170,9 +170,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { if snapshot.Valid() { result.snapshots[key] = snapshot } else { - if !snapshot.RequestSentinelActive() { - misses = append(misses, key) - } + misses = append(misses, key) } }() } From b277e87b24441b9226a71dcbe9b8f6734ad1a84b Mon Sep 17 00:00:00 2001 From: grokspawn Date: Thu, 10 Jul 2025 07:46:31 -0500 Subject: [PATCH 4/4] changing the log to be more prominent --- pkg/controller/registry/resolver/cache/cache.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/controller/registry/resolver/cache/cache.go b/pkg/controller/registry/resolver/cache/cache.go index fb226dfdf0..7db59bbc52 100644 --- a/pkg/controller/registry/resolver/cache/cache.go +++ b/pkg/controller/registry/resolver/cache/cache.go @@ -212,12 +212,15 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder { // remove any with a "live" outstanding request misses = slices.DeleteFunc(misses, func(key SourceKey) bool { + fl := c.logger.(logrus.FieldLogger) + fl.Errorf(">>> checking for outstanding request for %s", key) + hdr := c.snapshots[key] // if we already have a request timestamp, we have an outstanding request, so prevent stacking // and just send new requests if the previous one has expired if hdr != nil && hdr.RequestSentinelActive() { - c.logger.Printf("Skipping new request for %s, already in progress", key) + fl.Errorf("skipping new request for %s, already in progress", key) return true } return false