Skip to content

Commit b74cebf

Browse files
authored
Merge pull request prometheus#12920 from prymitive/compactLock
Fix locks in db.reloadBlocks()
Merge commit b74cebf (2 parents: 8cd9069 + e372812)

File tree

2 files changed

+17
-63
lines changed

2 files changed

+17
-63
lines changed

tsdb/db.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -992,9 +992,14 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
992992
db.metrics.maxBytes.Set(float64(maxBytes))
993993
db.metrics.retentionDuration.Set((time.Duration(opts.RetentionDuration) * time.Millisecond).Seconds())
994994

995+
// Calling db.reload() calls db.reloadBlocks() which requires cmtx to be locked.
996+
db.cmtx.Lock()
995997
if err := db.reload(); err != nil {
998+
db.cmtx.Unlock()
996999
return nil, err
9971000
}
1001+
db.cmtx.Unlock()
1002+
9981003
// Set the min valid time for the ingested samples
9991004
// to be no lower than the maxt of the last block.
10001005
minValidTime := int64(math.MinInt64)
@@ -1363,6 +1368,7 @@ func (db *DB) CompactOOOHead(ctx context.Context) error {
13631368
// Callback for testing.
13641369
var compactOOOHeadTestingCallback func()
13651370

1371+
// The db.cmtx mutex should be held before calling this method.
13661372
func (db *DB) compactOOOHead(ctx context.Context) error {
13671373
if !db.oooWasEnabled.Load() {
13681374
return nil
@@ -1417,6 +1423,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
14171423

14181424
// compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given.
14191425
// Each ULID in the result corresponds to a block in a unique time range.
1426+
// The db.cmtx mutex should be held before calling this method.
14201427
func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) {
14211428
start := time.Now()
14221429

@@ -1461,7 +1468,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
14611468
}
14621469

14631470
// compactHead compacts the given RangeHead.
1464-
// The compaction mutex should be held before calling this method.
1471+
// The db.cmtx should be held before calling this method.
14651472
func (db *DB) compactHead(head *RangeHead) error {
14661473
uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
14671474
if err != nil {
@@ -1487,7 +1494,7 @@ func (db *DB) compactHead(head *RangeHead) error {
14871494
}
14881495

14891496
// compactBlocks compacts all the eligible on-disk blocks.
1490-
// The compaction mutex should be held before calling this method.
1497+
// The db.cmtx should be held before calling this method.
14911498
func (db *DB) compactBlocks() (err error) {
14921499
// Check for compactions of multiple blocks.
14931500
for {
@@ -1544,6 +1551,7 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
15441551
}
15451552

15461553
// reload reloads blocks and truncates the head and its WAL.
1554+
// The db.cmtx mutex should be held before calling this method.
15471555
func (db *DB) reload() error {
15481556
if err := db.reloadBlocks(); err != nil {
15491557
return fmt.Errorf("reloadBlocks: %w", err)
@@ -1560,6 +1568,7 @@ func (db *DB) reload() error {
15601568

15611569
// reloadBlocks reloads blocks without touching head.
15621570
// Blocks that are obsolete due to replacement or retention will be deleted.
1571+
// The db.cmtx mutex should be held before calling this method.
15631572
func (db *DB) reloadBlocks() (err error) {
15641573
defer func() {
15651574
if err != nil {
@@ -1568,13 +1577,9 @@ func (db *DB) reloadBlocks() (err error) {
15681577
db.metrics.reloads.Inc()
15691578
}()
15701579

1571-
// Now that we reload TSDB every minute, there is a high chance for a race condition with a reload
1572-
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
1573-
// a normal reload and CleanTombstones try to delete the same block.
1574-
db.mtx.Lock()
1575-
defer db.mtx.Unlock()
1576-
1580+
db.mtx.RLock()
15771581
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory)
1582+
db.mtx.RUnlock()
15781583
if err != nil {
15791584
return err
15801585
}
@@ -1600,11 +1605,13 @@ func (db *DB) reloadBlocks() (err error) {
16001605
if len(corrupted) > 0 {
16011606
// Corrupted but no child loaded for it.
16021607
// Close all new blocks to release the lock for windows.
1608+
db.mtx.RLock()
16031609
for _, block := range loadable {
16041610
if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
16051611
block.Close()
16061612
}
16071613
}
1614+
db.mtx.RUnlock()
16081615
errs := tsdb_errors.NewMulti()
16091616
for ulid, err := range corrupted {
16101617
if err != nil {
@@ -1643,8 +1650,10 @@ func (db *DB) reloadBlocks() (err error) {
16431650
})
16441651

16451652
// Swap new blocks first for subsequently created readers to be seen.
1653+
db.mtx.Lock()
16461654
oldBlocks := db.blocks
16471655
db.blocks = toLoad
1656+
db.mtx.Unlock()
16481657

16491658
// Only check overlapping blocks when overlapping compaction is enabled.
16501659
if db.opts.EnableOverlappingCompaction {

tsdb/db_test.go

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,61 +1352,6 @@ func TestTombstoneCleanFail(t *testing.T) {
13521352
require.Len(t, intersection(oldBlockDirs, actualBlockDirs), len(actualBlockDirs)-1)
13531353
}
13541354

1355-
// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation
1356-
// and retention limit policies, when triggered at the same time,
1357-
// won't race against each other.
1358-
func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
1359-
if testing.Short() {
1360-
t.Skip("skipping test in short mode.")
1361-
}
1362-
1363-
opts := DefaultOptions()
1364-
var wg sync.WaitGroup
1365-
1366-
// We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones()
1367-
// reload try to delete the same block. Without the correct lock placement, it can happen if a
1368-
// block is marked for deletion due to retention limits and also has tombstones to be cleaned at
1369-
// the same time.
1370-
//
1371-
// That is something tricky to trigger, so let's try several times just to make sure.
1372-
for i := 0; i < 20; i++ {
1373-
t.Run(fmt.Sprintf("iteration%d", i), func(t *testing.T) {
1374-
db := openTestDB(t, opts, nil)
1375-
totalBlocks := 20
1376-
dbDir := db.Dir()
1377-
// Generate some blocks with old mint (near epoch).
1378-
for j := 0; j < totalBlocks; j++ {
1379-
blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
1380-
block, err := OpenBlock(nil, blockDir, nil, nil)
1381-
require.NoError(t, err)
1382-
// Cover block with tombstones so it can be deleted with CleanTombstones() as well.
1383-
tomb := tombstones.NewMemTombstones()
1384-
tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1})
1385-
block.tombstones = tomb
1386-
1387-
db.blocks = append(db.blocks, block)
1388-
}
1389-
1390-
wg.Add(2)
1391-
// Run reload and CleanTombstones together, with a small time window randomization
1392-
go func() {
1393-
defer wg.Done()
1394-
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
1395-
require.NoError(t, db.reloadBlocks())
1396-
}()
1397-
go func() {
1398-
defer wg.Done()
1399-
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
1400-
require.NoError(t, db.CleanTombstones())
1401-
}()
1402-
1403-
wg.Wait()
1404-
1405-
require.NoError(t, db.Close())
1406-
})
1407-
}
1408-
}
1409-
14101355
func intersection(oldBlocks, actualBlocks []string) (intersection []string) {
14111356
hash := make(map[string]bool)
14121357
for _, e := range oldBlocks {

0 commit comments

Comments (0)