Skip to content

Commit 72bfc29

Browse files
authored
Add check to avoid query 5xx when closing tsdb (#6616)
* Add a ingester readLock Signed-off-by: Daniel Deluiggi <[email protected]> * changelog Signed-off-by: Daniel Deluiggi <[email protected]> --------- Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent 480bd6a commit 72bfc29

File tree

3 files changed

+109
-1
lines changed

3 files changed

+109
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595
2121
* [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591
2222
* [BUGFIX] Query Frontend: Fix panic caused by nil pointer dereference. #6609
23+
* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616
2324

2425
## 1.19.0 2025-02-27
2526

pkg/ingester/ingester.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ type userTSDB struct {
305305
stateMtx sync.RWMutex
306306
state tsdbState
307307
pushesInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active or activeShipping.
308+
readInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active, activeShipping or forceCompacting.
308309

309310
// Used to detect idle TSDBs.
310311
lastUpdate atomic.Int64
@@ -1508,6 +1509,29 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
15081509
return &cortexpb.WriteResponse{}, nil
15091510
}
15101511

1512+
func (u *userTSDB) acquireReadLock() error {
1513+
u.stateMtx.RLock()
1514+
defer u.stateMtx.RUnlock()
1515+
1516+
switch u.state {
1517+
case active:
1518+
case activeShipping:
1519+
case forceCompacting:
1520+
// Read are allowed.
1521+
case closing:
1522+
return errors.New("TSDB is closing")
1523+
default:
1524+
return errors.New("TSDB is not active")
1525+
}
1526+
1527+
u.readInFlight.Add(1)
1528+
return nil
1529+
}
1530+
1531+
func (u *userTSDB) releaseReadLock() {
1532+
u.readInFlight.Done()
1533+
}
1534+
15111535
func (u *userTSDB) acquireAppendLock() error {
15121536
u.stateMtx.RLock()
15131537
defer u.stateMtx.RUnlock()
@@ -1555,6 +1579,11 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
15551579
return &client.ExemplarQueryResponse{}, nil
15561580
}
15571581

1582+
if err := db.acquireReadLock(); err != nil {
1583+
return &client.ExemplarQueryResponse{}, nil
1584+
}
1585+
defer db.releaseReadLock()
1586+
15581587
q, err := db.ExemplarQuerier(ctx)
15591588
if err != nil {
15601589
return nil, err
@@ -1648,6 +1677,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
16481677
return &client.LabelValuesResponse{}, cleanup, nil
16491678
}
16501679

1680+
if err := db.acquireReadLock(); err != nil {
1681+
return &client.LabelValuesResponse{}, cleanup, nil
1682+
}
1683+
defer db.releaseReadLock()
1684+
16511685
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
16521686
if err != nil {
16531687
return nil, cleanup, err
@@ -1738,6 +1772,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
17381772
return &client.LabelNamesResponse{}, cleanup, nil
17391773
}
17401774

1775+
if err := db.acquireReadLock(); err != nil {
1776+
return &client.LabelNamesResponse{}, cleanup, nil
1777+
}
1778+
defer db.releaseReadLock()
1779+
17411780
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
17421781
if err != nil {
17431782
return nil, cleanup, err
@@ -1836,6 +1875,11 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
18361875
return cleanup, nil
18371876
}
18381877

1878+
if err := db.acquireReadLock(); err != nil {
1879+
return cleanup, nil
1880+
}
1881+
defer db.releaseReadLock()
1882+
18391883
// Parse the request
18401884
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(i.matchersCache, req)
18411885
if err != nil {
@@ -2070,6 +2114,11 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
20702114
return nil
20712115
}
20722116

2117+
if err := db.acquireReadLock(); err != nil {
2118+
return nil
2119+
}
2120+
defer db.releaseReadLock()
2121+
20732122
numSamples := 0
20742123
numSeries := 0
20752124
totalDataBytes := 0
@@ -2857,8 +2906,9 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
28572906
// If TSDB is fully closed, we will set state to 'closed', which will prevent this deferred closing -> active transition.
28582907
defer userDB.casState(closing, active)
28592908

2860-
// Make sure we don't ignore any possible inflight pushes.
2909+
// Make sure we don't ignore any possible inflight requests.
28612910
userDB.pushesInFlight.Wait()
2911+
userDB.readInFlight.Wait()
28622912

28632913
// Verify again, things may have changed during the checks and while waiting for inflight requests.
28642914
tenantDeleted := false

pkg/ingester/ingester_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4327,6 +4327,63 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
43274327
require.NotNil(t, db)
43284328
}
43294329

4330+
func TestIngester_ReadNotFailWhenTSDBIsBeingDeleted(t *testing.T) {
4331+
4332+
tc := map[string]struct {
4333+
state tsdbState
4334+
}{
4335+
"closingTsdb": {state: closing},
4336+
"closedTsdb": {state: closed},
4337+
}
4338+
for name, c := range tc {
4339+
t.Run(name, func(t *testing.T) {
4340+
ctx := context.Background()
4341+
cfg := defaultIngesterTestConfig(t)
4342+
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.
4343+
cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true
4344+
4345+
// Create ingester
4346+
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
4347+
require.NoError(t, err)
4348+
4349+
require.NoError(t, services.StartAndAwaitRunning(ctx, i))
4350+
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck
4351+
4352+
// Wait until it's ACTIVE
4353+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
4354+
return i.lifecycler.GetState()
4355+
})
4356+
4357+
pushSingleSampleAtTime(t, i, 1*time.Minute.Milliseconds())
4358+
4359+
db, err := i.getOrCreateTSDB(userID, true)
4360+
require.NoError(t, err)
4361+
require.NotNil(t, db)
4362+
4363+
err = db.Close()
4364+
require.NoError(t, err)
4365+
4366+
b := db.casState(active, c.state)
4367+
require.True(t, b)
4368+
4369+
// Mock request
4370+
ctx = user.InjectOrgID(context.Background(), userID)
4371+
4372+
err = i.QueryStream(&client.QueryRequest{EndTimestampMs: 10 * time.Minute.Milliseconds()}, &mockQueryStreamServer{ctx: ctx})
4373+
require.NoError(t, err)
4374+
4375+
_, err = i.LabelNames(ctx, &client.LabelNamesRequest{Limit: int64(1)})
4376+
require.NoError(t, err)
4377+
4378+
_, err = i.LabelValues(ctx, &client.LabelValuesRequest{Limit: int64(1)})
4379+
require.NoError(t, err)
4380+
4381+
_, err = i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{Limit: int64(1)})
4382+
require.NoError(t, err)
4383+
})
4384+
}
4385+
}
4386+
43304387
type shipperMock struct {
43314388
mock.Mock
43324389
}

0 commit comments

Comments
 (0)