diff --git a/db.go b/db.go index d13b18087d..91df8462ce 100644 --- a/db.go +++ b/db.go @@ -2078,6 +2078,7 @@ func (d *DB) Metrics() *Metrics { metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy) metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd) metrics.Table.CompressedCountMinlz += int64(compressionTypes.minlz) + metrics.Table.CompressedCountAdaptive += int64(compressionTypes.adaptive) metrics.Table.CompressedCountNone += int64(compressionTypes.none) } diff --git a/metrics.go b/metrics.go index b1614043d0..30bc95f2b7 100644 --- a/metrics.go +++ b/metrics.go @@ -294,6 +294,8 @@ type Metrics struct { CompressedCountMinlz int64 // The number of sstables that are uncompressed. CompressedCountNone int64 + // The number of sstables that are compressed with adaptive. + CompressedCountAdaptive int64 // Local file sizes. Local struct { @@ -696,6 +698,9 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) { if count := m.Table.CompressedCountMinlz; count > 0 { w.Printf(" minlz: %d", redact.Safe(count)) } + if count := m.Table.CompressedCountAdaptive; count > 0 { + w.Printf(" adaptive: %d", redact.Safe(count)) + } if count := m.Table.CompressedCountNone; count > 0 { w.Printf(" none: %d", redact.Safe(count)) } diff --git a/options.go b/options.go index fa24878e6b..bb4ecbbff6 100644 --- a/options.go +++ b/options.go @@ -45,11 +45,12 @@ type Compression = block.Compression // Exported Compression constants. 
const ( - DefaultCompression = block.DefaultCompression - NoCompression = block.NoCompression - SnappyCompression = block.SnappyCompression - ZstdCompression = block.ZstdCompression - MinlzCompression = block.MinlzCompression + DefaultCompression = block.DefaultCompression + NoCompression = block.NoCompression + SnappyCompression = block.SnappyCompression + ZstdCompression = block.ZstdCompression + MinlzCompression = block.MinlzCompression + AdaptiveCompression = block.AdaptiveCompression ) // FilterType exports the base.FilterType type. @@ -1952,6 +1953,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error { l.Compression = func() Compression { return ZstdCompression } case "Minlz": l.Compression = func() Compression { return MinlzCompression } + case "Adaptive": + l.Compression = func() Compression { return AdaptiveCompression } default: return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value)) } diff --git a/sstable/block/compression.go b/sstable/block/compression.go index 8027d9446e..691836e6a4 100644 --- a/sstable/block/compression.go +++ b/sstable/block/compression.go @@ -26,6 +26,10 @@ const ( SnappyCompression ZstdCompression MinlzCompression + // AdaptiveCompression dynamically chooses between snappy and zstd + // based on recent compression effectiveness. See comment for adaptiveCompressor + // for more details. 
+	AdaptiveCompression
 	NCompression
 )
 
@@ -43,6 +47,8 @@ func (c Compression) String() string {
 		return "ZSTD"
 	case MinlzCompression:
 		return "Minlz"
+	case AdaptiveCompression:
+		return "Adaptive"
 	default:
 		return "Unknown"
 	}
@@ -62,6 +68,8 @@ func CompressionFromString(s string) Compression {
 		return ZstdCompression
 	case "Minlz":
 		return MinlzCompression
+	case "Adaptive":
+		return AdaptiveCompression
 	default:
 		return DefaultCompression
 	}
diff --git a/sstable/block/compressor.go b/sstable/block/compressor.go
index f85c289d92..9e1eab45db 100644
--- a/sstable/block/compressor.go
+++ b/sstable/block/compressor.go
@@ -2,6 +2,7 @@ package block
 
 import (
 	"encoding/binary"
+	"sync"
 
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
@@ -21,9 +22,28 @@ type noopCompressor struct{}
 type snappyCompressor struct{}
 type minlzCompressor struct{}
 
+// adaptiveCompressor switches between zstd and snappy compression from one
+// Compress call to the next. It keeps using zstd while zstd shrinks a block
+// to less than half of its input size. Otherwise it falls back to snappy and
+// waits before trying zstd again, growing the wait after every zstd attempt
+// that misses that mark. A zstd attempt that meets it resets the backoff.
+type adaptiveCompressor struct {
+	// timeTilTry counts down on every Compress call. zstd is attempted when
+	// it is 0; while it is nonzero, snappy is used instead.
+	timeTilTry int
+
+	// zstdBackoffStep is the amount added to timeTilTry when a zstd attempt
+	// does not shrink a block to less than half of its input size. It is 1
+	// for a fresh or reset compressor and grows as attempts keep falling short.
+	zstdBackoffStep int
+
+	zstdCompressor Compressor // performs the zstd attempts
+}
+
 var _ Compressor = noopCompressor{}
 var _ Compressor = snappyCompressor{}
 var _ Compressor = minlzCompressor{}
+var _ Compressor = (*adaptiveCompressor)(nil)
 
 func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
 	dst = append(dst[:0], src...)
@@ -63,11 +83,74 @@ func GetCompressor(c Compression) Compressor {
 		return getZstdCompressor()
 	case MinlzCompression:
 		return minlzCompressor{}
+	case AdaptiveCompression:
+		return adaptiveCompressorPool.Get().(*adaptiveCompressor)
 	default:
 		panic("Invalid compression type.")
 	}
 }
 
+// maxZstdBackoffStep caps zstdBackoffStep so that repeated doubling can never
+// overflow int and so that zstd is re-evaluated at a bounded interval even
+// after a long run of poorly compressible blocks.
+const maxZstdBackoffStep = 1 << 16
+
+// adaptiveCompressorPool recycles adaptiveCompressor values: GetCompressor
+// takes one out and Close puts it back. A newly created value starts with a
+// backoff step of 1 and holds the compressor returned by getZstdCompressor.
+var adaptiveCompressorPool = sync.Pool{
+	New: func() any {
+		return &adaptiveCompressor{zstdBackoffStep: 1, zstdCompressor: getZstdCompressor()}
+	},
+}
+
+// Compress compresses src with zstd when no backoff is pending (timeTilTry is
+// 0) and with snappy otherwise. It returns the indicator and buffer produced
+// by whichever compressor ran. After a zstd attempt the backoff grows if the
+// output is at least half the size of src and is reset otherwise.
+func (a *adaptiveCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
+	var algo CompressionIndicator
+	var compressedBuf []byte
+	if a.timeTilTry == 0 {
+		algo, compressedBuf = a.zstdCompressor.Compress(dst, src)
+		// Back off unless zstd shrank the block to less than half its size.
+		if 2*len(compressedBuf) >= len(src) {
+			a.increaseBackoff()
+		} else {
+			a.resetBackoff()
+		}
+	} else {
+		algo, compressedBuf = (snappyCompressor{}).Compress(dst, src)
+	}
+	// Every call, including the zstd attempt itself, counts against the wait.
+	a.timeTilTry--
+	return algo, compressedBuf
+}
+
+// Close resets the backoff state and returns a to adaptiveCompressorPool.
+func (a *adaptiveCompressor) Close() {
+	a.timeTilTry = 0
+	a.zstdBackoffStep = 1
+	adaptiveCompressorPool.Put(a)
+}
+
+// increaseBackoff doubles zstdBackoffStep, up to maxZstdBackoffStep, and adds
+// the resulting step to timeTilTry.
+func (a *adaptiveCompressor) increaseBackoff() {
+	if a.zstdBackoffStep < maxZstdBackoffStep {
+		a.zstdBackoffStep *= 2
+	}
+	a.timeTilTry += a.zstdBackoffStep
+}
+
+// resetBackoff restores the initial step. timeTilTry is set to 1 because
+// Compress decrements it before returning, which leaves it at 0 so that the
+// next call uses zstd again.
+func (a *adaptiveCompressor) resetBackoff() {
+	a.zstdBackoffStep = 1
+	a.timeTilTry = 1
+}
+
 type Decompressor interface {
 	// DecompressInto decompresses compressed into buf. The buf slice must have the
 	// exact size as the decompressed value.
Callers may use DecompressedLen to diff --git a/table_stats.go b/table_stats.go index c60dd6c311..2b8c63830b 100644 --- a/table_stats.go +++ b/table_stats.go @@ -1042,7 +1042,7 @@ var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{ type compressionTypeAggregator struct{} type compressionTypes struct { - snappy, zstd, minlz, none, unknown uint64 + snappy, zstd, minlz, adaptive, none, unknown uint64 } func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes { @@ -1063,6 +1063,8 @@ func (a compressionTypeAggregator) Accumulate( dst.zstd++ case MinlzCompression: dst.minlz++ + case AdaptiveCompression: + dst.adaptive++ case NoCompression: dst.none++ default: @@ -1077,6 +1079,7 @@ func (a compressionTypeAggregator) Merge( dst.snappy += src.snappy dst.zstd += src.zstd dst.minlz += src.minlz + dst.adaptive += src.adaptive dst.none += src.none dst.unknown += src.unknown return dst