diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index f3c1f674eeb..6a8a46795a1 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -197,10 +197,17 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd return c.result(promise) } -func (c *BlocksPostingsForMatchersCache) result(promise *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { +func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { return func(ctx context.Context) (index.Postings, error) { - ids, err := promise.result(ctx) - return index.NewListPostings(ids), err + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ce.done: + if ctx.Err() != nil { + return nil, ctx.Err() + } + return index.NewListPostings(ce.v), ce.err + } } } @@ -327,9 +334,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) c.expire() } - // If is cached but is expired, lets try to replace the cache value - if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) { - if c.cachedValues.CompareAndSwap(k, loaded, r) { + if ok { + // If the promise is already in the cache, lets wait it to fetch the data. + <-loaded.(*cacheEntryPromise[V]).done + + // If is cached but is expired, lets try to replace the cache value. + if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) { r.v, r.sizeBytes, r.err = fetch() r.sizeBytes += int64(len(k)) c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) @@ -404,19 +414,6 @@ type cacheEntryPromise[V any] struct { err error } -func (ce *cacheEntryPromise[V]) result(ctx context.Context) (V, error) { - select { - case <-ctx.Done(): - return ce.v, ctx.Err() - case <-ce.done: - if ctx.Err() != nil { - return ce.v, ctx.Err() - } - - return ce.v, ce.err - } -} - func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool { ts := ce.ts r := now.Sub(ts) diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index d4b80abcbd3..51d4ac78d48 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -1,29 +1,59 @@ package tsdb import ( - "context" "fmt" "strings" + "sync" "testing" "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) +func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { + cfg := PostingsCacheConfig{ + Enabled: true, + Ttl: time.Hour, + MaxBytes: 10 << 20, + } + m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) + cache := newFifoCache[int](cfg, "test", m, time.Now) + calls := atomic.Int64{} + concurrency := 100 + wg := sync.WaitGroup{} + wg.Add(concurrency) + + fetchFunc := func() (int, int64, error) { + calls.Inc() + time.Sleep(100 * time.Millisecond) + return 0, 0, nil + } + + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + cache.getPromiseForKey("key1", fetchFunc) + }() + } + + wg.Wait() + require.Equal(t, int64(1), calls.Load()) + +} + func TestFifoCacheDisabled(t *testing.T) { cfg := PostingsCacheConfig{} cfg.Enabled = false - m := NewPostingCacheMetrics(prometheus.DefaultRegisterer) + m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) timeNow := time.Now cache := newFifoCache[int](cfg, "test", m, timeNow) old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { return 1, 0, nil }) require.False(t, loaded) - v, err := old.result(context.Background()) - require.NoError(t, err) - require.Equal(t, 1, v) + require.Equal(t, 1, old.v) require.False(t, cache.contains("key1")) } @@ -68,17 +98,13 @@ func TestFifoCacheExpire(t *testing.T) { return 1, 8, nil }) require.False(t, loaded) - v, err := p.result(context.Background()) - require.NoError(t, err) - require.Equal(t, 1, v) + require.Equal(t, 1, p.v) require.True(t, cache.contains(key)) p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 0, nil }) require.True(t, loaded) - v, err = p.result(context.Background()) - require.NoError(t, err) - require.Equal(t, 1, v) + require.Equal(t, 1, p.v) } totalCacheSize := 0 @@ -104,10 +130,8 @@ func TestFifoCacheExpire(t *testing.T) { return 2, 18, nil }) require.False(t, loaded) - v, err := p.result(context.Background()) - require.NoError(t, err) // New value - require.Equal(t, 2, v) + require.Equal(t, 2, p.v) // Total Size Updated require.Equal(t, originalSize+10, cache.cachedBytes) }