Commit e56c4c9

db: delete pacer rewrite

This change reimplements the delete pacer with an improved design. The previous implementation was prone to falling behind on deletions: the "obsolete ratio" feedback mechanism was broken because queued deletions were not counted as obsolete bytes. The implementation was also unnecessarily complicated and intertwined with the rest of the DB code.

The new delete pacer lives in a separate package with clear API boundaries and internally maintains all relevant metrics. It is designed to increase the deletion rate whenever deletions stay in the queue for more than 5 minutes; it no longer depends on the obsolete bytes ratio. In addition, instead of proportional control (where the rate is proportional to the backlog and thus drops as we clear it), we switch to constant-horizon control, where we keep the same rate until the backlog is cleared.

The implementation uses an unbounded `fifo.Queue` instead of a channel of limited size (which can cause unwanted blocking). We still have a safety valve that disables pacing if the queue gets too big.

We also add delete pacer metrics so that we can monitor the deletion rate and queue depth.

Fixes #5424
1 parent c2e588a · commit e56c4c9
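
To make the control-policy change concrete: under proportional control the deletion rate tracks the backlog, so the rate decays as the queue drains and deletions can linger; under constant-horizon control we pick a rate that would clear the current backlog within a fixed horizon and hold that rate until the queue is empty. The sketch below is a minimal, hypothetical illustration of that policy, not the actual `internal/deletepacer` code; the `pacer` type and its fields are invented, and the 5-minute horizon simply mirrors the queue-age threshold mentioned above.

```go
package main

import (
	"fmt"
	"time"
)

// pacer is a hypothetical sketch of constant-horizon control; it is not
// the actual internal/deletepacer implementation.
type pacer struct {
	horizon      time.Duration // target time to clear the backlog
	baselineRate uint64        // bytes/sec floor for the deletion rate
	backlogBytes uint64        // bytes currently queued for deletion
	currentRate  uint64        // bytes/sec, held until the backlog clears
}

// enqueue adds obsolete bytes and, if needed, raises the rate so that the
// whole backlog would clear within the horizon. Unlike proportional
// control, the rate is never lowered while a backlog remains.
func (p *pacer) enqueue(bytes uint64) {
	p.backlogBytes += bytes
	if required := p.backlogBytes / uint64(p.horizon/time.Second); required > p.currentRate {
		p.currentRate = required
	}
}

// complete records deleted bytes; only once the backlog is fully cleared
// does the rate drop back to the baseline.
func (p *pacer) complete(bytes uint64) {
	if bytes >= p.backlogBytes {
		p.backlogBytes = 0
		p.currentRate = p.baselineRate
		return
	}
	p.backlogBytes -= bytes
}

func main() {
	p := &pacer{horizon: 5 * time.Minute, baselineRate: 1 << 20, currentRate: 1 << 20}
	p.enqueue(3 << 30) // 3 GiB of obsolete files show up
	fmt.Printf("rate after enqueue: %d bytes/sec\n", p.currentRate)
	p.complete(1 << 30) // partial progress: the rate is held, not reduced
	fmt.Printf("rate after partial drain: %d bytes/sec\n", p.currentRate)
}
```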


42 files changed: +2374 −1149 lines. Only a subset of the changed files is shown below; the commit page hides the rest by default.

blob_rewrite.go

Lines changed: 8 additions & 7 deletions

@@ -16,6 +16,7 @@ import (
 
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/internal/deletepacer"
 	"github.com/cockroachdb/pebble/internal/manifest"
 	"github.com/cockroachdb/pebble/internal/problemspans"
 	"github.com/cockroachdb/pebble/objstorage"
@@ -249,17 +250,17 @@ func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
 	// Ensure we clean up the blob file we created on failure.
 	if err != nil {
 		if objMeta.DiskFileNum != 0 {
-			d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, []obsoleteFile{
+			d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, []deletepacer.ObsoleteFile{
 				{
-					fileType: base.FileTypeBlob,
-					fs:       d.opts.FS,
-					path:     d.objProvider.Path(objMeta),
-					fileNum:  objMeta.DiskFileNum,
+					FileType: base.FileTypeBlob,
+					FS:       d.opts.FS,
+					Path:     d.objProvider.Path(objMeta),
+					FileNum:  objMeta.DiskFileNum,
 					// We don't know the size of the output blob file--it may have
 					// been half-written. We use the input blob file size as an
 					// approximation for deletion pacing.
-					fileSize: c.input.Physical.Size,
-					isLocal:  true,
+					FileSize: c.input.Physical.Size,
+					IsLocal:  true,
 				},
 			})
 		}
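
The call site above migrates from the unexported `obsoleteFile` struct to an exported `deletepacer.ObsoleteFile`. Going only by the fields referenced in this hunk, the new struct's shape is roughly as follows; this is a reconstruction for readability, and the real definition in `internal/deletepacer` may differ:

```go
package sketch

import (
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/vfs"
)

// ObsoleteFile, reconstructed from the fields used in the diff above;
// the actual definition in internal/deletepacer may differ.
type ObsoleteFile struct {
	FileType base.FileType    // e.g. base.FileTypeBlob
	FS       vfs.FS           // filesystem used to remove the file
	Path     string           // full path of the file to delete
	FileNum  base.DiskFileNum // disk file number of the object
	FileSize uint64           // possibly approximate; used for pacing
	IsLocal  bool             // whether the file is on the local filesystem
}
```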

compaction.go

Lines changed: 0 additions & 1 deletion

@@ -45,7 +45,6 @@ var errEmptyTable = errors.New("pebble: empty table")
 var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
 
 var flushLabels = pprof.Labels("pebble", "flush", "output-level", "L0")
-var gcLabels = pprof.Labels("pebble", "gc")
 
 // expandedCompactionByteSizeLimit is the maximum number of bytes in all
 // compacted files. We avoid expanding the lower level file set of a compaction

compaction_test.go

Lines changed: 2 additions & 2 deletions

@@ -3099,7 +3099,7 @@ func TestCompaction_UpdateVersionFails(t *testing.T) {
 }
 
 // TestSharedObjectDeletePacing tests that we don't throttle shared object
-// deletes (see the TargetBytesDeletionRate option).
+// deletes (see the DeletePacer.BaselineRate option).
 func TestSharedObjectDeletePacing(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
@@ -3109,7 +3109,7 @@ func TestSharedObjectDeletePacing(t *testing.T) {
 		"": remote.NewInMem(),
 	})
 	opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
-	opts.TargetByteDeletionRate = func() int { return 1 }
+	opts.DeletionPacing.BaselineRate = func() uint64 { return 1 }
 	opts.Logger = testutils.Logger{T: t}
 
 	d, err := Open("", &opts)
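
As the test update shows, the old `TargetByteDeletionRate` option gives way to `DeletionPacing.BaselineRate`, a function returning bytes per second rather than a fixed value. A hedged usage sketch follows; the directory and rate are made up, and other `DeletionPacing` fields are not visible in this diff:

```go
package main

import "github.com/cockroachdb/pebble"

func main() {
	opts := &pebble.Options{}
	// BaselineRate is a function, so the pacing rate can presumably be
	// adjusted at runtime (e.g. from a cluster setting) without
	// reopening the DB.
	opts.DeletionPacing.BaselineRate = func() uint64 {
		return 128 << 20 // pace local deletions at 128 MiB/s (made-up value)
	}
	db, err := pebble.Open("demo-db", opts) // hypothetical directory
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```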

db.go

Lines changed: 20 additions & 31 deletions

@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/arenaskl"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/cache"
+	"github.com/cockroachdb/pebble/internal/deletepacer"
 	"github.com/cockroachdb/pebble/internal/inflight"
 	"github.com/cockroachdb/pebble/internal/invalidating"
 	"github.com/cockroachdb/pebble/internal/invariants"
@@ -310,7 +311,7 @@ type DB struct {
 	closed   *atomic.Value
 	closedCh chan struct{}
 
-	cleanupManager *cleanupManager
+	deletePacer *deletepacer.DeletePacer
 
 	compactionScheduler CompactionScheduler
 
@@ -485,10 +486,6 @@ type DB struct {
 		// count acts as a reference count to prohibit file cleaning. See
 		// DB.{disable,enable}FileDeletions().
 		disableCount int
-		// queuedStats holds cumulative stats for files that have been
-		// queued for deletion by the cleanup manager. These stats are
-		// monotonically increasing for the *DB's lifetime.
-		queuedStats obsoleteObjectStats
 	}
 
 	snapshots struct {
@@ -561,7 +558,7 @@ var _ Writer = (*DB)(nil)
 
 // TestOnlyWaitForCleaning MUST only be used in tests.
 func (d *DB) TestOnlyWaitForCleaning() {
-	d.cleanupManager.Wait()
+	d.deletePacer.WaitForTesting()
 }
 
 // Set sets the value for the given key. It overwrites any previous value
@@ -1594,7 +1591,7 @@ func (d *DB) Close() error {
 	d.mu.Unlock()
 
 	// Wait for all cleaning jobs to finish.
-	d.cleanupManager.Close()
+	d.deletePacer.Close()
 
 	d.mu.Lock()
 	// Sanity check compaction metrics.
@@ -1883,7 +1880,7 @@ func (d *DB) AsyncFlush() (<-chan struct{}, error) {
 func (d *DB) Metrics() *Metrics {
 	metrics := &Metrics{}
 	walStats := d.mu.log.manager.Stats()
-	completedObsoleteFileStats := d.cleanupManager.CompletedStats()
+	deletePacerMetrics := d.deletePacer.Metrics()
 
 	d.mu.Lock()
 	vers := d.mu.versions.currentVersion()
@@ -1947,29 +1944,19 @@ func (d *DB) Metrics() *Metrics {
 	metrics.Table.ZombieSize = d.mu.versions.zombieTables.TotalSize()
 	metrics.Table.Local.ZombieCount, metrics.Table.Local.ZombieSize = d.mu.versions.zombieTables.LocalStats()
 
-	// The obsolete blob/table metrics have a subtle calculation:
-	//
-	// (A) The vs.metrics.{Table,BlobFiles}.[Local.]{ObsoleteCount,ObsoleteSize}
-	//     fields reflect the set of files currently sitting in
-	//     vs.obsolete{Tables,Blobs} but not yet enqueued to the cleanup manager.
-	//
-	// (B) The d.mu.fileDeletions.queuedStats field holds the set of files that
-	//     have been queued for deletion by the cleanup manager.
-	//
-	// (C) The cleanup manager also maintains cumulative stats for the set of
-	//     files that have been deleted.
-	//
-	// The value of currently pending obsolete files is (A) + (B) - (C).
-	pendingObsoleteFileStats := d.mu.fileDeletions.queuedStats
-	pendingObsoleteFileStats.Sub(completedObsoleteFileStats)
-	metrics.Table.Local.ObsoleteCount += pendingObsoleteFileStats.tablesLocal.count
-	metrics.Table.Local.ObsoleteSize += pendingObsoleteFileStats.tablesLocal.size
-	metrics.Table.ObsoleteCount += int64(pendingObsoleteFileStats.tablesAll.count)
-	metrics.Table.ObsoleteSize += pendingObsoleteFileStats.tablesAll.size
-	metrics.BlobFiles.Local.ObsoleteCount += pendingObsoleteFileStats.blobFilesLocal.count
-	metrics.BlobFiles.Local.ObsoleteSize += pendingObsoleteFileStats.blobFilesLocal.size
-	metrics.BlobFiles.ObsoleteCount += pendingObsoleteFileStats.blobFilesAll.count
-	metrics.BlobFiles.ObsoleteSize += pendingObsoleteFileStats.blobFilesAll.size
+	// The obsolete blob/table metrics have a subtle calculation: the initial
+	// metrics we copied from vs.metrics reflect (in
+	// {Table,BlobFiles}.[Local.]{ObsoleteCount,ObsoleteSize}) the set of files
+	// currently sitting in vs.obsolete{Tables,Blobs} but not yet enqueued to the
+	// delete pacer. To complete the metrics, we add the currently enqueued files.
+	metrics.Table.Local.ObsoleteCount += deletePacerMetrics.InQueue.Tables.Local.Count
+	metrics.Table.Local.ObsoleteSize += deletePacerMetrics.InQueue.Tables.Local.Bytes
+	metrics.Table.ObsoleteCount += int64(deletePacerMetrics.InQueue.Tables.All.Count)
+	metrics.Table.ObsoleteSize += deletePacerMetrics.InQueue.Tables.All.Bytes
+	metrics.BlobFiles.Local.ObsoleteCount += deletePacerMetrics.InQueue.BlobFiles.Local.Count
+	metrics.BlobFiles.Local.ObsoleteSize += deletePacerMetrics.InQueue.BlobFiles.Local.Bytes
+	metrics.BlobFiles.ObsoleteCount += deletePacerMetrics.InQueue.BlobFiles.All.Count
+	metrics.BlobFiles.ObsoleteSize += deletePacerMetrics.InQueue.BlobFiles.All.Bytes
 	metrics.private.optionsFileSize = d.optionsFileSize
 
 	d.mu.versions.logLock()
@@ -2022,6 +2009,8 @@ func (d *DB) Metrics() *Metrics {
 	metrics.TableIters = d.fileCache.IterCount()
 	metrics.CategoryStats = d.fileCache.SSTStatsCollector().GetStats()
 
+	metrics.DeletePacer = deletePacerMetrics
+
 	metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
 
 	metrics.Uptime = d.opts.private.timeNow().Sub(d.openedAt)
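
The `Metrics()` changes above read fields like `deletePacerMetrics.InQueue.Tables.Local.Count` and `...Bytes`, which suggests a metrics layout roughly like the sketch below. This is reconstructed only from the field accesses visible in this diff; the actual `deletepacer.Metrics` type likely carries more (for example, the deletion-rate metrics the commit message mentions):

```go
package sketch

// CountAndBytes pairs a file count with the total size of those files.
type CountAndBytes struct {
	Count uint64
	Bytes uint64
}

// QueueStats splits queued files into all files and the local-only subset.
type QueueStats struct {
	All   CountAndBytes
	Local CountAndBytes
}

// Metrics, reconstructed from the field accesses in the diff above; the
// actual deletepacer.Metrics definition may differ.
type Metrics struct {
	InQueue struct {
		Tables    QueueStats
		BlobFiles QueueStats
	}
}
```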
