Commit 5c1580d
metrics: add compression counters
Add running counters that keep track of how many logical (uncompressed) bytes were compressed and decompressed. The counters are segregated along the same boundaries where compression settings can differ: L5 vs L6 vs other levels, and data vs value vs other blocks. The intention is to estimate how CPU usage would change under a different compression profile (in conjunction with data about each algorithm's performance, as obtained by the compression analyzer).
1 parent: 1bddddf

31 files changed: +502 -84 lines
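
The per-level, per-kind split that the diffs below wire through the DB comes from two small generic structs in sstable/block. A minimal sketch of the shapes this commit implies; the atomic representation and the exact ForLevel mapping are assumptions here, inferred from the usage below rather than copied from the commit:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ByKind splits a value by the block kinds whose compression settings
// can differ: data blocks, value blocks, and everything else.
type ByKind[T any] struct {
	DataBlocks  T
	ValueBlocks T
	OtherBlocks T
}

// ByLevel splits a value by the LSM levels whose compression profiles
// can differ: L5, L6, and all remaining levels combined.
type ByLevel[T any] struct {
	L5          T
	L6          T
	OtherLevels T
}

// ForLevel returns the group for a level; levels other than 5 and 6
// share OtherLevels (assumed behavior, consistent with the "L0-L4" row
// rendered by metrics.go below; the real method takes a base.Level).
func (b *ByLevel[T]) ForLevel(level int) *T {
	switch level {
	case 5:
		return &b.L5
	case 6:
		return &b.L6
	default:
		return &b.OtherLevels
	}
}

// CompressionCounters holds cumulative logical (uncompressed) byte
// counts for both directions.
type CompressionCounters struct {
	Compressed   ByLevel[ByKind[atomic.Uint64]]
	Decompressed ByLevel[ByKind[atomic.Uint64]]
}

func main() {
	var cc CompressionCounters
	// A flush or compaction writing a 4 KiB data block into L6 would
	// bump the corresponding compressed counter.
	cc.Compressed.ForLevel(6).DataBlocks.Add(4096)
	fmt.Println(cc.Compressed.L6.DataBlocks.Load()) // 4096
}
```

The metrics side (metrics.go below) instantiates the same ByLevel/ByKind shapes with plain uint64 for point-in-time snapshots.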

compaction.go

Lines changed: 2 additions & 0 deletions

```diff
@@ -2959,6 +2959,8 @@ func (d *DB) runCopyCompaction(
 	var wrote uint64
 	err = d.fileCache.withReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) error {
 		var err error
+		writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat())
+		writerOpts.CompressionCounters = d.compressionCounters.Compressed.ForLevel(base.MakeLevel(c.outputLevel.level))
 		// TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
 		// or update category stats).
 		wrote, err = sstable.CopySpan(ctx,
```

db.go

Lines changed: 5 additions & 0 deletions

```diff
@@ -552,6 +552,8 @@ type DB struct {
 	// compaction concurrency
 	openedAt time.Time

+	compressionCounters block.CompressionCounters
+
 	iterTracker *inflight.Tracker
 }

@@ -2014,6 +2016,9 @@ func (d *DB) Metrics() *Metrics {
 	blobCompressionMetrics := blobCompressionStatsAnnotator.Annotation(&vers.BlobFiles)
 	metrics.BlobFiles.Compression.MergeWith(&blobCompressionMetrics)

+	metrics.CompressionCounters.LogicalBytesCompressed = d.compressionCounters.LoadCompressed()
+	metrics.CompressionCounters.LogicalBytesDecompressed = d.compressionCounters.LoadDecompressed()
+
 	metrics.BlockCache = d.opts.Cache.Metrics()
 	metrics.FileCache, metrics.Filter = d.fileCache.Metrics()
 	metrics.TableIters = d.fileCache.IterCount()
```
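
Metrics() copies the live counters via LoadCompressed and LoadDecompressed. Continuing the sketch above, a plausible LoadCompressed snapshots each atomic into the plain-uint64 shape the Metrics struct uses; the real method lives in sstable/block and may differ:

```go
// LoadCompressed snapshots the compressed-side atomics into plain
// uint64s (a sketch, reusing the ByLevel/ByKind types defined earlier).
func (c *CompressionCounters) LoadCompressed() ByLevel[ByKind[uint64]] {
	load := func(k *ByKind[atomic.Uint64]) ByKind[uint64] {
		return ByKind[uint64]{
			DataBlocks:  k.DataBlocks.Load(),
			ValueBlocks: k.ValueBlocks.Load(),
			OtherBlocks: k.OtherBlocks.Load(),
		}
	}
	return ByLevel[ByKind[uint64]]{
		L5:          load(&c.Compressed.L5),
		L6:          load(&c.Compressed.L6),
		OtherLevels: load(&c.Compressed.OtherLevels),
	}
}
```

Each field is loaded independently, so the snapshot is not atomic across counters; that is normally acceptable for cumulative, monotonically increasing metrics.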

flushable_test.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -58,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

 	// We can reuse the ingestLoad function for this test even if we're
 	// not actually ingesting a file.
-	lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheHandle, pendingOutputs)
+	lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheHandle, &d.compressionCounters, pendingOutputs)
 	if err != nil {
 		t.Fatal(err)
 	}
```

ingest.go

Lines changed: 7 additions & 2 deletions

```diff
@@ -257,6 +257,7 @@ func ingestLoad1(
 	fmv FormatMajorVersion,
 	readable objstorage.Readable,
 	cacheHandle *cache.Handle,
+	compressionCounters *block.CompressionCounters,
 	tableNum base.TableNum,
 	rangeKeyValidator rangeKeyIngestValidator,
 ) (
@@ -270,6 +271,9 @@ func ingestLoad1(
 		CacheHandle: cacheHandle,
 		FileNum:     base.PhysicalTableDiskFileNum(tableNum),
 	}
+	if compressionCounters != nil {
+		o.CompressionCounters = &compressionCounters.Decompressed
+	}
 	r, err := sstable.NewReader(ctx, readable, o)
 	if err != nil {
 		return nil, keyspan.Span{}, base.BlockReadStats{}, errors.CombineErrors(err, readable.Close())
@@ -498,6 +502,7 @@ func ingestLoad(
 	shared []SharedSSTMeta,
 	external []ExternalFile,
 	cacheHandle *cache.Handle,
+	compressionCounters *block.CompressionCounters,
 	pending []base.TableNum,
 ) (ingestLoadResult, error) {
 	localFileNums := pending[:len(paths)]
@@ -531,7 +536,7 @@
 		if !shouldDisableRangeKeyChecks {
 			rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
 		}
-		m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
+		m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, compressionCounters, localFileNums[i], rangeKeyValidator)
 		if err != nil {
 			return ingestLoadResult{}, err
 		}
@@ -1480,7 +1485,7 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
 	// Load the metadata for all the files being ingested. This step detects
 	// and elides empty sstables.
 	loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external,
-		d.cacheHandle, pendingOutputs)
+		d.cacheHandle, &d.compressionCounters, pendingOutputs)
 	if err != nil {
 		return IngestOperationStats{}, err
 	}
```
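
Two details worth noting above: readers receive a pointer to the whole Decompressed half rather than a per-level group, and ingestLoad accepts a nil *block.CompressionCounters so tests can opt out. A hypothetical helper, continuing the earlier sketch, illustrating the accounting both halves imply; the real bookkeeping lives inside the sstable block readers and writers, and the kind parameter stands in for whatever block-kind enum that code switches on:

```go
// addLogicalBytes attributes n logical (uncompressed) bytes to the
// counter matching the block kind. Writers would resolve the group via
// Compressed.ForLevel(outputLevel) before compressing a block; readers
// via Decompressed.ForLevel(fileLevel) after decompressing one.
func addLogicalBytes(k *ByKind[atomic.Uint64], kind string, n uint64) {
	switch kind {
	case "data":
		k.DataBlocks.Add(n)
	case "value":
		k.ValueBlocks.Add(n)
	default:
		k.OtherBlocks.Add(n)
	}
}
```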

ingest_test.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -148,7 +148,7 @@ func TestIngestLoad(t *testing.T) {
 		FS: mem,
 	}
 	opts.WithFSDefaults()
-	lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, nil, []base.TableNum{1})
+	lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, nil, nil, []base.TableNum{1})
 	if err != nil {
 		return err.Error()
 	}
@@ -247,7 +247,7 @@ func TestIngestLoadRand(t *testing.T) {
 	}
 	opts.WithFSDefaults()
 	opts.EnsureDefaults()
-	lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, nil, pending)
+	lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, nil, nil, pending)
 	require.NoError(t, err)

 	// Reset flaky stats.
@@ -272,7 +272,7 @@ func TestIngestLoadInvalid(t *testing.T) {
 		FS: mem,
 	}
 	opts.WithFSDefaults()
-	if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, nil, []base.TableNum{1}); err == nil {
+	if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, nil, nil, []base.TableNum{1}); err == nil {
 		t.Fatalf("expected error, but found success")
 	}
 }
```

internal.go

Lines changed: 6 additions & 1 deletion

```diff
@@ -4,7 +4,10 @@

 package pebble

-import "github.com/cockroachdb/pebble/internal/base"
+import (
+	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/sstable/block"
+)

 // SeqNum exports the base.SeqNum type.
 type SeqNum = base.SeqNum
@@ -80,3 +83,5 @@ type ShortAttribute = base.ShortAttribute
 // LazyValue.Clone requires a pointer to a LazyFetcher struct to avoid
 // allocations. No code outside Pebble needs to peer into a LazyFetcher.
 type LazyFetcher = base.LazyFetcher
+
+type CompressionCounters = block.CompressionCounters
```

metrics.go

Lines changed: 55 additions & 4 deletions

```diff
@@ -256,7 +256,7 @@ type Metrics struct {
 		Zombie metrics.CountAndSizeByPlacement
 	}

-	// Compression statistics for sstable data (does not include blob files).
+	// Compression statistics for the live sstables.
 	Compression CompressionMetrics

 	// Garbage bytes.
@@ -322,9 +322,17 @@ type Metrics struct {
 		// value sizes.
 		ReferencedBackingValueSize uint64

+		// Compression statistics for the live blob files.
 		Compression CompressionMetrics
 	}

+	// CompressionCounters are cumulative counters for the number of logical
+	// (uncompressed) bytes that went through compression and decompression.
+	CompressionCounters struct {
+		LogicalBytesCompressed   block.ByLevel[block.ByKind[uint64]]
+		LogicalBytesDecompressed block.ByLevel[block.ByKind[uint64]]
+	}
+
 	FileCache FileCacheMetrics

 	// Count of the number of open sstable iterators.
@@ -465,7 +473,7 @@ type KeysMetrics struct {

 // CompressionMetrics contains compression metrics for sstables or blob files.
 type CompressionMetrics struct {
-	// NoCompressionBytes is the total number of bytes in files that do are not
+	// NoCompressionBytes is the total number of bytes in files that are not
 	// compressed. Data can be uncompressed when 1) compression is disabled; 2)
 	// for certain special types of blocks; and 3) for blocks that are not
 	// compressible.
@@ -790,6 +798,17 @@ var (
 		table.Div(),
 		table.String("blob files", 13, table.AlignRight, func(i compressionInfo) string { return i.blobFiles }),
 	)
+	compressionCountersTableHeader = ` Logical bytes compressed / decompressed`
+
+	compressionCountersTable = table.Define[compressionCountersInfo](
+		table.String("level", 5, table.AlignRight, func(i compressionCountersInfo) string { return i.level }),
+		table.Div(),
+		table.String("data blocks", 14, table.AlignCenter, func(i compressionCountersInfo) string { return i.DataBlocks }),
+		table.Div(),
+		table.String("value blocks", 14, table.AlignCenter, func(i compressionCountersInfo) string { return i.ValueBlocks }),
+		table.Div(),
+		table.String("other blocks", 14, table.AlignCenter, func(i compressionCountersInfo) string { return i.OtherBlocks }),
+	)
 	deletePacerTableHeader = `DELETE PACER`
 	deletePacerTable       = table.Define[deletePacerInfo](
 		table.String("", 14, table.AlignRight, func(i deletePacerInfo) string { return i.label }),
@@ -921,6 +940,34 @@ func makeCompressionInfo(algorithm string, table, blob CompressionStatsForSettin
 	return i
 }

+type compressionCountersInfo struct {
+	level string
+	block.ByKind[string]
+}
+
+func makeCompressionCountersInfo(m *Metrics) []compressionCountersInfo {
+	var result []compressionCountersInfo
+	isZero := func(c *block.ByKind[uint64]) bool {
+		return c.DataBlocks == 0 && c.ValueBlocks == 0 && c.OtherBlocks == 0
+	}
+	addLevel := func(level string, compressed, decompressed *block.ByKind[uint64]) {
+		if isZero(compressed) && isZero(decompressed) {
+			return
+		}
+		result = append(result, compressionCountersInfo{
+			level: level,
+			ByKind: block.ByKind[string]{
+				DataBlocks:  humanizeBytes(compressed.DataBlocks) + " / " + humanizeBytes(decompressed.DataBlocks),
+				ValueBlocks: humanizeBytes(compressed.ValueBlocks) + " / " + humanizeBytes(decompressed.ValueBlocks),
+				OtherBlocks: humanizeBytes(compressed.OtherBlocks) + " / " + humanizeBytes(decompressed.OtherBlocks)},
+		})
+	}
+	addLevel("L0-L4", &m.CompressionCounters.LogicalBytesCompressed.OtherLevels, &m.CompressionCounters.LogicalBytesDecompressed.OtherLevels)
+	addLevel("L5", &m.CompressionCounters.LogicalBytesCompressed.L5, &m.CompressionCounters.LogicalBytesDecompressed.L5)
+	addLevel("L6", &m.CompressionCounters.LogicalBytesCompressed.L6, &m.CompressionCounters.LogicalBytesDecompressed.L6)
+	return result
+}
+
 // String pretty-prints the metrics.
 //
 // See testdata/metrics for an example.
@@ -1112,6 +1159,10 @@ func (m *Metrics) String() string {
 	})
 	cur = compressionTable.Render(cur, table.RenderOptions{}, compressionContents...)

+	cur = cur.NewlineReturn()
+	cur = cur.WriteString(compressionCountersTableHeader).NewlineReturn()
+	cur = compressionCountersTable.Render(cur, table.RenderOptions{}, makeCompressionCountersInfo(m)...)
+
 	cur = cur.NewlineReturn()
 	cur.WriteString(deletePacerTableHeader)
 	deletePacerContents := []deletePacerInfo{
@@ -1162,8 +1213,8 @@ func (m *Metrics) StringForTests() string {

 	// We recalculate the file cache size using the 64-bit sizes, and we ignore
 	// the genericcache metadata size which is harder to adjust.
-	const sstableReaderSize64bit = 280
-	const blobFileReaderSize64bit = 112
+	const sstableReaderSize64bit = 288
+	const blobFileReaderSize64bit = 120
 	mCopy.FileCache.Size = mCopy.FileCache.TableCount*sstableReaderSize64bit + mCopy.FileCache.BlobFileCount*blobFileReaderSize64bit
 	if math.MaxInt == math.MaxInt64 {
 		// Verify the 64-bit sizes, so they are kept updated.
```
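
Given the column definitions above and the example values from metrics_test.go below, the new table should render roughly as follows; column widths and humanized units are approximations, not taken from the commit's testdata:

```
 Logical bytes compressed / decompressed
 level |  data blocks  |  value blocks  |  other blocks
 L0-L4 |  10GB / 100GB |  100GB / 1.0TB |  1.0GB / 10GB
    L5 |  50GB / 500GB |  500GB / 5.0TB |  5.0GB / 50GB
    L6 |  60GB / 600GB |  600GB / 6.0TB |  6.0GB / 60GB
```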

metrics_test.go

Lines changed: 18 additions & 0 deletions

```diff
@@ -174,6 +174,24 @@ func exampleMetrics() Metrics {
 	m.BlobFiles.Compression.Zstd.CompressedBytes = 100 * GB
 	m.BlobFiles.Compression.Zstd.UncompressedBytes = 500 * GB

+	byKind := func(n uint64) block.ByKind[uint64] {
+		return block.ByKind[uint64]{
+			DataBlocks:  n * 10 * GB,
+			ValueBlocks: n * 100 * GB,
+			OtherBlocks: n * GB,
+		}
+	}
+	m.CompressionCounters.LogicalBytesCompressed = block.ByLevel[block.ByKind[uint64]]{
+		L5:          byKind(5),
+		L6:          byKind(6),
+		OtherLevels: byKind(1),
+	}
+	m.CompressionCounters.LogicalBytesDecompressed = block.ByLevel[block.ByKind[uint64]]{
+		L5:          byKind(50),
+		L6:          byKind(60),
+		OtherLevels: byKind(10),
+	}
+
 	m.FileCache.Size = 1 * MB
 	m.FileCache.TableCount = 180
 	m.FileCache.BlobFileCount = 181
```

open.go

Lines changed: 3 additions & 1 deletion

```diff
@@ -319,7 +319,9 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
 		defer opts.FileCache.Unref()
 	}
-	d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
+	fileCacheReaderOpts := d.opts.MakeReaderOptions()
+	fileCacheReaderOpts.CompressionCounters = &d.compressionCounters.Decompressed
+	d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, fileCacheReaderOpts, d.reportCorruption)
 	d.newIters = d.fileCache.newIters
 	d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
```

options.go

Lines changed: 7 additions & 4 deletions

```diff
@@ -2692,17 +2692,20 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
 // makeWriterOptions constructs sstable.WriterOptions for the specified level
 // using the current DB options and format.
 func (d *DB) makeWriterOptions(level int) sstable.WriterOptions {
-	return d.opts.MakeWriterOptions(level, d.TableFormat())
+	o := d.opts.MakeWriterOptions(level, d.TableFormat())
+	o.CompressionCounters = d.compressionCounters.Compressed.ForLevel(base.MakeLevel(level))
+	return o
 }

 // makeBlobWriterOptions constructs blob.FileWriterOptions using the current DB
 // options and format.
 func (d *DB) makeBlobWriterOptions(level int) blob.FileWriterOptions {
 	lo := &d.opts.Levels[level]
 	return blob.FileWriterOptions{
-		Format:       d.BlobFileFormat(),
-		Compression:  lo.Compression(),
-		ChecksumType: block.ChecksumTypeCRC32c,
+		Format:              d.BlobFileFormat(),
+		Compression:         lo.Compression(),
+		CompressionCounters: d.compressionCounters.Compressed.ForLevel(base.MakeLevel(level)),
+		ChecksumType:        block.ChecksumTypeCRC32c,
 		FlushGovernor: block.MakeFlushGovernor(
 			lo.BlockSize,
 			lo.BlockSizeThreshold,
```
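
Note the asymmetry in how the two halves are wired: writers resolve the level once at construction, since an output file's level is fixed up front, so Compressed.ForLevel(level) hands the writer just its ByKind group; the file cache's reader options (open.go above) receive the entire Decompressed side, presumably because a single handle serves reads across every level. Continuing the earlier sketch, the writer-side wiring reduces to something like this, with WriterOptions standing in for the sstable and blob writer option structs:

```go
// WriterOptions stands in for sstable.WriterOptions and
// blob.FileWriterOptions: the writer is handed only the counter group
// for its own output level.
type WriterOptions struct {
	// CompressionCounters, when non-nil, accumulates the logical size
	// of every block the writer compresses.
	CompressionCounters *ByKind[atomic.Uint64]
}

// makeWriterOptions mirrors DB.makeWriterOptions above: resolve the
// output level once, then hand the writer its group.
func makeWriterOptions(cc *CompressionCounters, level int) WriterOptions {
	return WriterOptions{CompressionCounters: cc.Compressed.ForLevel(level)}
}
```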
