44 "context"
55 "fmt"
66 "io"
7+ "slices"
78 "sort"
89 "sync"
910 "time"
@@ -147,7 +148,7 @@ func (c *NamespacedOperatorCache) Error() error {
147148
148149func (c * Cache ) Namespaced (namespaces ... string ) MultiCatalogOperatorFinder {
149150 const (
150- CachePopulateTimeout = time .Minute
151+ cachePopulateTimeout = time .Minute
151152 )
152153
153154 sources := c .sp .Sources (namespaces ... )
@@ -169,7 +170,9 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
169170 if snapshot .Valid () {
170171 result .snapshots [key ] = snapshot
171172 } else {
172- misses = append (misses , key )
173+ if ! snapshot .RequestSentinelActive () {
174+ misses = append (misses , key )
175+ }
173176 }
174177 }()
175178 }
@@ -209,19 +212,35 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
209212 }
210213 misses = misses [found :]
211214
215+ // remove any with a "live" outstanding request
216+ misses = slices .DeleteFunc (misses , func (key SourceKey ) bool {
217+ hdr := c .snapshots [key ]
218+
219+ // if we already have a request timestamp, we have an outstanding request, so prevent stacking
220+ // and just send new requests if the previous one has expired
221+ if hdr != nil && hdr .RequestSentinelActive () {
222+ c .logger .Printf ("Skipping new request for %s, already in progress" , key )
223+ return true
224+ }
225+ return false
226+ })
227+
212228 for _ , miss := range misses {
213- ctx , cancel := context .WithTimeout (context .Background (), CachePopulateTimeout )
229+ ctx , cancel := context .WithTimeout (context .Background (), cachePopulateTimeout )
214230
215231 hdr := snapshotHeader {
216- key : miss ,
217- pop : cancel ,
218- priority : c .sourcePriorityProvider .Priority (miss ),
232+ key : miss ,
233+ pop : cancel ,
234+ priority : c .sourcePriorityProvider .Priority (miss ),
235+ requestSentinel : time .Now ().Add (cachePopulateTimeout ), // set sentinel to prevent stacking requests
219236 }
220237
221238 hdr .m .Lock ()
222239 c .snapshots [miss ] = & hdr
223240 result .snapshots [miss ] = & hdr
224241
242+ // don't adjust the request sentinel in the goroutine for any outcome, so that we don't stampede sources
243+ // instead, reevaluate the sentinel during the next snapshot
225244 go func (ctx context.Context , hdr * snapshotHeader , source Source ) {
226245 defer hdr .m .Unlock ()
227246 c .sem <- struct {}{}
@@ -294,6 +313,8 @@ type snapshotHeader struct {
294313 pop context.CancelFunc
295314 err error
296315 priority int
316+
317+ requestSentinel time.Time
297318}
298319
299320func (hdr * snapshotHeader ) Cancel () {
@@ -314,6 +335,16 @@ func (hdr *snapshotHeader) Valid() bool {
314335 return true
315336}
316337
338+ func (hdr * snapshotHeader ) RequestSentinelActive () bool {
339+ if hdr == nil {
340+ return false
341+ }
342+
343+ hdr .m .RLock ()
344+ defer hdr .m .RUnlock ()
345+ return time .Now ().Before (hdr .requestSentinel )
346+ }
347+
317348type sortableSnapshots struct {
318349 snapshots []* snapshotHeader
319350 preferredNamespace string
0 commit comments