Skip to content

Commit df0693c

Browse files
authored
Using AsyncOperationProcessor to store items on cache (#6007)
* Using backfillProcessor to store items on cache Signed-off-by: alanprot <[email protected]> * using thanos CacheType strings Signed-off-by: alanprot <[email protected]> * rafactor backfillDroppeItems metrics Signed-off-by: alanprot <[email protected]> * Creating new cortex_store_multilevel_index_cache_store_dropped_items_total metric Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent ddc0b1d commit df0693c

File tree

4 files changed

+87
-90
lines changed

4 files changed

+87
-90
lines changed

pkg/storage/tsdb/inmemory_index_cache.go

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,25 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *storecach
5555
Name: "thanos_store_index_cache_items_added_total",
5656
Help: "Total number of items that were added to the index cache.",
5757
}, []string{"item_type"})
58-
c.added.WithLabelValues(cacheTypePostings)
59-
c.added.WithLabelValues(cacheTypeSeries)
60-
c.added.WithLabelValues(cacheTypeExpandedPostings)
58+
c.added.WithLabelValues(storecache.CacheTypePostings)
59+
c.added.WithLabelValues(storecache.CacheTypeSeries)
60+
c.added.WithLabelValues(storecache.CacheTypeExpandedPostings)
6161

62-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant)
63-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant)
64-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant)
62+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypePostings, tenancy.DefaultTenant)
63+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypeSeries, tenancy.DefaultTenant)
64+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypeExpandedPostings, tenancy.DefaultTenant)
6565

6666
c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
6767
Name: "thanos_store_index_cache_items_overflowed_total",
6868
Help: "Total number of items that could not be added to the cache due to being too big.",
6969
}, []string{"item_type"})
70-
c.overflow.WithLabelValues(cacheTypePostings)
71-
c.overflow.WithLabelValues(cacheTypeSeries)
72-
c.overflow.WithLabelValues(cacheTypeExpandedPostings)
70+
c.overflow.WithLabelValues(storecache.CacheTypePostings)
71+
c.overflow.WithLabelValues(storecache.CacheTypeSeries)
72+
c.overflow.WithLabelValues(storecache.CacheTypeExpandedPostings)
7373

74-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant)
75-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant)
76-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant)
74+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypePostings, tenancy.DefaultTenant)
75+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypeSeries, tenancy.DefaultTenant)
76+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypeExpandedPostings, tenancy.DefaultTenant)
7777

7878
c.cache = fastcache.New(int(config.MaxSize))
7979
level.Info(logger).Log(
@@ -132,14 +132,14 @@ func copyToKey(l labels.Label) storecache.CacheKeyPostings {
132132
// StorePostings sets the postings identified by the ulid and label to the value v,
133133
// if the postings already exists in the cache it is not mutated.
134134
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
135-
c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
136-
c.set(cacheTypePostings, storecache.CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
135+
c.commonMetrics.DataSizeBytes.WithLabelValues(storecache.CacheTypePostings, tenant).Observe(float64(len(v)))
136+
c.set(storecache.CacheTypePostings, storecache.CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
137137
}
138138

139139
// FetchMultiPostings fetches multiple postings - each identified by a label -
140140
// and returns a map containing cache hits, along with a list of missing keys.
141141
func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
142-
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypePostings, tenant))
142+
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(storecache.CacheTypePostings, tenant))
143143
defer timer.ObserveDuration()
144144

145145
hits = map[labels.Label][]byte{}
@@ -149,8 +149,8 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
149149
hit := 0
150150
for _, key := range keys {
151151
if ctx.Err() != nil {
152-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests))
153-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit))
152+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypePostings, tenant).Add(float64(requests))
153+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypePostings, tenant).Add(float64(hit))
154154
return hits, misses
155155
}
156156
requests++
@@ -162,29 +162,29 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli
162162

163163
misses = append(misses, key)
164164
}
165-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests))
166-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit))
165+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypePostings, tenant).Add(float64(requests))
166+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypePostings, tenant).Add(float64(hit))
167167

168168
return hits, misses
169169
}
170170

171171
// StoreExpandedPostings stores expanded postings for a set of label matchers.
172172
func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
173-
c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
174-
c.set(cacheTypeExpandedPostings, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}, v)
173+
c.commonMetrics.DataSizeBytes.WithLabelValues(storecache.CacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
174+
c.set(storecache.CacheTypeExpandedPostings, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}, v)
175175
}
176176

177177
// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
178178
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
179-
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant))
179+
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(storecache.CacheTypeExpandedPostings, tenant))
180180
defer timer.ObserveDuration()
181181

182182
if ctx.Err() != nil {
183183
return nil, false
184184
}
185-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc()
185+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypeExpandedPostings, tenant).Inc()
186186
if b, ok := c.get(storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}); ok {
187-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc()
187+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypeExpandedPostings, tenant).Inc()
188188
return b, true
189189
}
190190
return nil, false
@@ -193,14 +193,14 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID
193193
// StoreSeries sets the series identified by the ulid and id to the value v,
194194
// if the series already exists in the cache it is not mutated.
195195
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
196-
c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v)))
197-
c.set(cacheTypeSeries, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeySeries(id)}, v)
196+
c.commonMetrics.DataSizeBytes.WithLabelValues(storecache.CacheTypeSeries, tenant).Observe(float64(len(v)))
197+
c.set(storecache.CacheTypeSeries, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeySeries(id)}, v)
198198
}
199199

200200
// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
201201
// and returns a map containing cache hits, along with a list of missing IDs.
202202
func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
203-
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeSeries, tenant))
203+
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(storecache.CacheTypeSeries, tenant))
204204
defer timer.ObserveDuration()
205205

206206
hits = map[storage.SeriesRef][]byte{}
@@ -210,8 +210,8 @@ func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.
210210
hit := 0
211211
for _, id := range ids {
212212
if ctx.Err() != nil {
213-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests))
214-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit))
213+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypeSeries, tenant).Add(float64(requests))
214+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypeSeries, tenant).Add(float64(hit))
215215
return hits, misses
216216
}
217217
requests++
@@ -223,8 +223,8 @@ func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.
223223

224224
misses = append(misses, id)
225225
}
226-
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests))
227-
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit))
226+
c.commonMetrics.RequestTotal.WithLabelValues(storecache.CacheTypeSeries, tenant).Add(float64(requests))
227+
c.commonMetrics.HitsTotal.WithLabelValues(storecache.CacheTypeSeries, tenant).Add(float64(hit))
228228

229229
return hits, misses
230230
}

pkg/storage/tsdb/inmemory_index_cache_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
5858
get func(storage.SeriesRef) ([]byte, bool)
5959
}{
6060
{
61-
typ: cacheTypePostings,
61+
typ: storecache.CacheTypePostings,
6262
set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, tenancy.DefaultTenant) },
6363
get: func(id storage.SeriesRef) ([]byte, bool) {
6464
hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, tenancy.DefaultTenant)
@@ -68,7 +68,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
6868
},
6969
},
7070
{
71-
typ: cacheTypeSeries,
71+
typ: storecache.CacheTypeSeries,
7272
set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b, tenancy.DefaultTenant) },
7373
get: func(id storage.SeriesRef) ([]byte, bool) {
7474
hits, _ := cache.FetchMultiSeries(ctx, uid(id), []storage.SeriesRef{id}, tenancy.DefaultTenant)
@@ -78,7 +78,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) {
7878
},
7979
},
8080
{
81-
typ: cacheTypeExpandedPostings,
81+
typ: storecache.CacheTypeExpandedPostings,
8282
set: func(id storage.SeriesRef, b []byte) {
8383
cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant)
8484
},
@@ -128,7 +128,7 @@ func TestInMemoryIndexCacheSetOverflow(t *testing.T) {
128128
}
129129
cache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, nil, config)
130130
testutil.Ok(t, err)
131-
counter := cache.overflow.WithLabelValues(cacheTypeSeries)
131+
counter := cache.overflow.WithLabelValues(storecache.CacheTypeSeries)
132132
id := ulid.MustNew(ulid.Now(), nil)
133133
// Insert a small value won't trigger item overflow.
134134
cache.StoreSeries(id, 1, []byte("0"), tenancy.DefaultTenant)

0 commit comments

Comments
 (0)