Skip to content

Commit f4208b5

Browse files
authored
Improve bucket index loader to handle edge cases (#3717)
* Improve bucket index loader to handle edge cases Signed-off-by: Marco Pracucci <[email protected]> * Updated CHANGELOG Signed-off-by: Marco Pracucci <[email protected]>
1 parent 60ab544 commit f4208b5

File tree

3 files changed

+173
-20
lines changed

3 files changed

+173
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* `cortex_alertmanager_sync_configs_failed_total`
2929
* `cortex_alertmanager_tenants_discovered`
3030
* `cortex_alertmanager_tenants_owned`
31-
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 #3711
31+
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 #3711 #3715
3232
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625
3333
* `cortex_bucket_index_loads_total`
3434
* `cortex_bucket_index_load_failures_total`

pkg/storage/tsdb/bucketindex/loader.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
110110
// (eg. corrupted bucket index or not existing).
111111
l.cacheIndex(userID, nil, err)
112112

113-
l.loadFailures.Inc()
114113
if errors.Is(err, ErrIndexNotFound) {
115114
level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
116115
} else {
116+
// We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just
117+
// started to remote write and its blocks haven't uploaded to storage yet).
118+
l.loadFailures.Inc()
117119
level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err)
118120
}
119121

@@ -166,12 +168,17 @@ func (l *Loader) checkCachedIndexesToUpdateAndDelete() (toUpdate, toDelete []str
166168
defer l.indexesMx.RUnlock()
167169

168170
for userID, entry := range l.indexes {
171+
// Given ErrIndexNotFound is a legit case and assuming UpdateOnErrorInterval is lower than
172+
// UpdateOnStaleInterval, we don't consider ErrIndexNotFound as an error with regards to the
173+
// refresh interval and so it will updated once stale.
174+
isError := entry.err != nil && !errors.Is(entry.err, ErrIndexNotFound)
175+
169176
switch {
170177
case now.Sub(entry.getRequestedAt()) >= l.cfg.IdleTimeout:
171178
toDelete = append(toDelete, userID)
172-
case entry.err != nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval:
179+
case isError && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval:
173180
toUpdate = append(toUpdate, userID)
174-
case entry.err == nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval:
181+
case !isError && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval:
175182
toUpdate = append(toUpdate, userID)
176183
}
177184
}
@@ -186,29 +193,20 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) {
186193
l.loadAttempts.Inc()
187194
startTime := time.Now()
188195
idx, err := ReadIndex(readCtx, l.bkt, userID, l.logger)
189-
190-
if errors.Is(err, ErrIndexNotFound) {
191-
level.Info(l.logger).Log("msg", "unloaded bucket index", "user", userID, "reason", "not found during periodic check")
192-
193-
// Remove from cache.
194-
l.indexesMx.Lock()
195-
delete(l.indexes, userID)
196-
l.indexesMx.Unlock()
197-
198-
return
199-
}
200-
if err != nil {
196+
if err != nil && !errors.Is(err, ErrIndexNotFound) {
201197
l.loadFailures.Inc()
202198
level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err)
203199
return
204200
}
205201

206202
l.loadDuration.Observe(time.Since(startTime).Seconds())
207203

208-
// Cache it.
204+
// We cache it either it was successfully refreshed or wasn't found. An use case for caching the ErrIndexNotFound
205+
// is when a tenant has rules configured but hasn't started remote writing yet. Rules will be evaluated and
206+
// bucket index loaded by the ruler.
209207
l.indexesMx.Lock()
210208
l.indexes[userID].index = idx
211-
l.indexes[userID].err = nil
209+
l.indexes[userID].err = err
212210
l.indexes[userID].setUpdatedAt(startTime)
213211
l.indexesMx.Unlock()
214212
}

pkg/storage/tsdb/bucketindex/loader_test.go

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,13 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) {
9797
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
9898
})
9999

100+
// Write a corrupted index.
101+
require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}")))
102+
100103
// Request the index multiple times.
101104
for i := 0; i < 10; i++ {
102105
_, err := loader.GetIndex(ctx, "user-1")
103-
require.Equal(t, ErrIndexNotFound, err)
106+
require.Equal(t, ErrIndexCorrupted, err)
104107
}
105108

106109
// Ensure metrics have been updated accordingly.
@@ -121,6 +124,42 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) {
121124
))
122125
}
123126

127+
func TestLoader_GetIndex_ShouldCacheIndexNotFoundError(t *testing.T) {
128+
ctx := context.Background()
129+
reg := prometheus.NewPedanticRegistry()
130+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
131+
132+
// Create the loader.
133+
loader := NewLoader(prepareLoaderConfig(), bkt, log.NewNopLogger(), reg)
134+
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
135+
t.Cleanup(func() {
136+
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
137+
})
138+
139+
// Request the index multiple times.
140+
for i := 0; i < 10; i++ {
141+
_, err := loader.GetIndex(ctx, "user-1")
142+
require.Equal(t, ErrIndexNotFound, err)
143+
}
144+
145+
// Ensure metrics have been updated accordingly.
146+
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
147+
# HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
148+
# TYPE cortex_bucket_index_load_failures_total counter
149+
cortex_bucket_index_load_failures_total 0
150+
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
151+
# TYPE cortex_bucket_index_loaded gauge
152+
cortex_bucket_index_loaded 0
153+
# HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
154+
# TYPE cortex_bucket_index_loads_total counter
155+
cortex_bucket_index_loads_total 1
156+
`),
157+
"cortex_bucket_index_loads_total",
158+
"cortex_bucket_index_load_failures_total",
159+
"cortex_bucket_index_loaded",
160+
))
161+
}
162+
124163
func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) {
125164
ctx := context.Background()
126165
reg := prometheus.NewPedanticRegistry()
@@ -191,6 +230,9 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T)
191230
reg := prometheus.NewPedanticRegistry()
192231
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
193232

233+
// Write a corrupted index.
234+
require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}")))
235+
194236
// Create the loader.
195237
cfg := LoaderConfig{
196238
CheckInterval: time.Second,
@@ -205,6 +247,59 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T)
205247
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
206248
})
207249

250+
_, err := loader.GetIndex(ctx, "user-1")
251+
assert.Equal(t, ErrIndexCorrupted, err)
252+
253+
// Upload the bucket index.
254+
idx := &Index{
255+
Version: IndexVersion1,
256+
Blocks: Blocks{
257+
{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
258+
},
259+
BlockDeletionMarks: nil,
260+
UpdatedAt: time.Now().Unix(),
261+
}
262+
require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
263+
264+
// Wait until the index has been updated in background.
265+
test.Poll(t, 3*time.Second, nil, func() interface{} {
266+
_, err := loader.GetIndex(ctx, "user-1")
267+
return err
268+
})
269+
270+
actualIdx, err := loader.GetIndex(ctx, "user-1")
271+
require.NoError(t, err)
272+
assert.Equal(t, idx, actualIdx)
273+
274+
// Ensure metrics have been updated accordingly.
275+
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
276+
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
277+
# TYPE cortex_bucket_index_loaded gauge
278+
cortex_bucket_index_loaded 1
279+
`),
280+
"cortex_bucket_index_loaded",
281+
))
282+
}
283+
284+
func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousIndexNotFound(t *testing.T) {
285+
ctx := context.Background()
286+
reg := prometheus.NewPedanticRegistry()
287+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
288+
289+
// Create the loader.
290+
cfg := LoaderConfig{
291+
CheckInterval: time.Second,
292+
UpdateOnStaleInterval: time.Second,
293+
UpdateOnErrorInterval: time.Hour, // Intentionally high to not hit it.
294+
IdleTimeout: time.Hour, // Intentionally high to not hit it.
295+
}
296+
297+
loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
298+
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
299+
t.Cleanup(func() {
300+
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
301+
})
302+
208303
_, err := loader.GetIndex(ctx, "user-1")
209304
assert.Equal(t, ErrIndexNotFound, err)
210305

@@ -239,7 +334,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T)
239334
))
240335
}
241336

242-
func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) {
337+
func TestLoader_ShouldNotCacheCriticalErrorOnBackgroundUpdates(t *testing.T) {
243338
ctx := context.Background()
244339
reg := prometheus.NewPedanticRegistry()
245340
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
@@ -295,6 +390,66 @@ func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) {
295390
))
296391
}
297392

393+
func TestLoader_ShouldCacheIndexNotFoundOnBackgroundUpdates(t *testing.T) {
394+
ctx := context.Background()
395+
reg := prometheus.NewPedanticRegistry()
396+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
397+
398+
// Create a bucket index.
399+
idx := &Index{
400+
Version: IndexVersion1,
401+
Blocks: Blocks{
402+
{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
403+
},
404+
BlockDeletionMarks: nil,
405+
UpdatedAt: time.Now().Unix(),
406+
}
407+
require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))
408+
409+
// Create the loader.
410+
cfg := LoaderConfig{
411+
CheckInterval: time.Second,
412+
UpdateOnStaleInterval: time.Second,
413+
UpdateOnErrorInterval: time.Second,
414+
IdleTimeout: time.Hour, // Intentionally high to not hit it.
415+
}
416+
417+
loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
418+
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
419+
t.Cleanup(func() {
420+
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
421+
})
422+
423+
actualIdx, err := loader.GetIndex(ctx, "user-1")
424+
require.NoError(t, err)
425+
assert.Equal(t, idx, actualIdx)
426+
427+
// Delete the bucket index.
428+
require.NoError(t, DeleteIndex(ctx, bkt, "user-1"))
429+
430+
// Wait until the next index load attempt occurs.
431+
prevLoads := testutil.ToFloat64(loader.loadAttempts)
432+
test.Poll(t, 3*time.Second, true, func() interface{} {
433+
return testutil.ToFloat64(loader.loadAttempts) > prevLoads
434+
})
435+
436+
// We expect the bucket index is not considered loaded because of the error.
437+
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
438+
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
439+
# TYPE cortex_bucket_index_loaded gauge
440+
cortex_bucket_index_loaded 0
441+
`),
442+
"cortex_bucket_index_loaded",
443+
))
444+
445+
// Try to get the index again. We expect no load attempt because the error has been cached.
446+
prevLoads = testutil.ToFloat64(loader.loadAttempts)
447+
actualIdx, err = loader.GetIndex(ctx, "user-1")
448+
assert.Equal(t, ErrIndexNotFound, err)
449+
assert.Nil(t, actualIdx)
450+
assert.Equal(t, prevLoads, testutil.ToFloat64(loader.loadAttempts))
451+
}
452+
298453
func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T) {
299454
ctx := context.Background()
300455
reg := prometheus.NewPedanticRegistry()

0 commit comments

Comments
 (0)