Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2078,6 +2078,7 @@ func (d *DB) Metrics() *Metrics {
metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy)
metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd)
metrics.Table.CompressedCountMinlz += int64(compressionTypes.minlz)
metrics.Table.CompressedCountAdaptive += int64(compressionTypes.adaptive)
metrics.Table.CompressedCountNone += int64(compressionTypes.none)
}

Expand Down
5 changes: 5 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ type Metrics struct {
CompressedCountMinlz int64
// The number of sstables that are uncompressed.
CompressedCountNone int64
// The number of sstables that are compressed with adaptive.
CompressedCountAdaptive int64

// Local file sizes.
Local struct {
Expand Down Expand Up @@ -696,6 +698,9 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
if count := m.Table.CompressedCountMinlz; count > 0 {
w.Printf(" minlz: %d", redact.Safe(count))
}
if count := m.Table.CompressedCountAdaptive; count > 0 {
w.Printf(" adaptive: %d", redact.Safe(count))
}
if count := m.Table.CompressedCountNone; count > 0 {
w.Printf(" none: %d", redact.Safe(count))
}
Expand Down
13 changes: 8 additions & 5 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ type Compression = block.Compression

// Exported Compression constants.
const (
DefaultCompression = block.DefaultCompression
NoCompression = block.NoCompression
SnappyCompression = block.SnappyCompression
ZstdCompression = block.ZstdCompression
MinlzCompression = block.MinlzCompression
DefaultCompression = block.DefaultCompression
NoCompression = block.NoCompression
SnappyCompression = block.SnappyCompression
ZstdCompression = block.ZstdCompression
MinlzCompression = block.MinlzCompression
AdaptiveCompression = block.AdaptiveCompression
)

// FilterType exports the base.FilterType type.
Expand Down Expand Up @@ -1952,6 +1953,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
l.Compression = func() Compression { return ZstdCompression }
case "Minlz":
l.Compression = func() Compression { return MinlzCompression }
case "Adaptive":
l.Compression = func() Compression { return AdaptiveCompression }
default:
return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
}
Expand Down
8 changes: 8 additions & 0 deletions sstable/block/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
SnappyCompression
ZstdCompression
MinlzCompression
// AdaptiveCompression dynamically chooses between snappy and zstd
// based on recent compression effectiveness. See comment for adaptiveCompressor
// for more details.
AdaptiveCompression
NCompression
)

Expand All @@ -43,6 +47,8 @@ func (c Compression) String() string {
return "ZSTD"
case MinlzCompression:
return "Minlz"
case AdaptiveCompression:
return "Adaptive"
default:
return "Unknown"
}
Expand All @@ -62,6 +68,8 @@ func CompressionFromString(s string) Compression {
return ZstdCompression
case "Minlz":
return MinlzCompression
case "Adaptive":
return AdaptiveCompression
default:
return DefaultCompression
}
Expand Down
65 changes: 65 additions & 0 deletions sstable/block/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package block

import (
"encoding/binary"
"sync"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
Expand All @@ -21,9 +22,28 @@ type noopCompressor struct{}
type snappyCompressor struct{}
type minlzCompressor struct{}

// adaptiveCompressor dynamically switches between snappy and zstd
// compression. It prefers using zstd because of its potential for
// high compression ratios. However, if zstd does not achieve a good
// compression ratio (50% or better), snappy is used instead and an
// exponential backoff is applied before trying zstd again. While the
// ratio stays at 50% or better, zstd continues to be used.
//
// Instances are recycled through adaptiveCompressorPool; see
// GetCompressor and Close.
type adaptiveCompressor struct {
	// timeTilTry is the number of Compress calls to serve with snappy
	// before attempting zstd compression again after a poor result.
	timeTilTry int

	// zstdBackoffStep is how much timeTilTry is increased each time
	// zstd compression fails to achieve at least a 50% compression
	// ratio. It doubles on each such failure (increaseBackoff) and
	// resets to 1 on success (resetBackoff).
	zstdBackoffStep int

	// zstdCompressor is the underlying zstd Compressor; it is retained
	// across trips through adaptiveCompressorPool.
	zstdCompressor Compressor
}

// Compile-time checks that every compressor satisfies the Compressor
// interface.
var _ Compressor = noopCompressor{}
var _ Compressor = snappyCompressor{}
var _ Compressor = minlzCompressor{}
var _ Compressor = (*adaptiveCompressor)(nil)

func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
dst = append(dst[:0], src...)
Expand Down Expand Up @@ -63,11 +83,56 @@ func GetCompressor(c Compression) Compressor {
return getZstdCompressor()
case MinlzCompression:
return minlzCompressor{}
case AdaptiveCompression:
return adaptiveCompressorPool.Get().(*adaptiveCompressor)
default:
panic("Invalid compression type.")
}
}

// adaptiveCompressorPool recycles adaptiveCompressor instances
// (together with their underlying zstd compressors) across uses.
// Fresh instances start with timeTilTry == 0, so zstd is attempted on
// the very first block.
var adaptiveCompressorPool = sync.Pool{
	New: func() any {
		return &adaptiveCompressor{zstdBackoffStep: 1, zstdCompressor: getZstdCompressor()}
	},
}

// Compress compresses src into dst, choosing the algorithm adaptively:
// zstd is attempted whenever the backoff counter has expired, and the
// outcome of that attempt adjusts the backoff before the next block;
// otherwise snappy is used. It returns the indicator of the algorithm
// actually applied along with the compressed bytes.
func (a *adaptiveCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
	if a.timeTilTry != 0 {
		// Still backing off from a previous poor zstd result: use snappy.
		a.timeTilTry--
		return (snappyCompressor{}).Compress(dst, src)
	}

	algo, compressed := a.zstdCompressor.Compress(dst, src)
	if 2*len(compressed) >= len(src) {
		// zstd saved less than half the input size; back off
		// exponentially before trying it again.
		a.increaseBackoff()
	} else {
		// Good ratio (better than 50%): keep zstd in rotation.
		a.resetBackoff()
	}
	a.timeTilTry--
	return algo, compressed
}

// Close resets the adaptive state and returns the compressor to
// adaptiveCompressorPool for reuse. The underlying zstd compressor is
// kept so it can be reused by the next borrower.
func (a *adaptiveCompressor) Close() {
	*a = adaptiveCompressor{
		zstdBackoffStep: 1,
		zstdCompressor:  a.zstdCompressor,
	}
	adaptiveCompressorPool.Put(a)
}

// increaseBackoff doubles the zstd backoff step and pushes the next
// zstd attempt out by that many additional blocks. It is called by
// Compress when zstd fails to reach a 50% compression ratio.
func (a *adaptiveCompressor) increaseBackoff() {
	a.zstdBackoffStep *= 2
	a.timeTilTry += a.zstdBackoffStep
}

// resetBackoff restores the backoff state after a successful zstd
// compression. timeTilTry is set to 1 rather than 0 because the caller
// (Compress) decrements it once before returning, leaving it at 0 so
// zstd is attempted again on the very next block.
func (a *adaptiveCompressor) resetBackoff() {
	a.zstdBackoffStep = 1
	a.timeTilTry = 1
}

type Decompressor interface {
// DecompressInto decompresses compressed into buf. The buf slice must have the
// exact size as the decompressed value. Callers may use DecompressedLen to
Expand Down
5 changes: 4 additions & 1 deletion table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{
type compressionTypeAggregator struct{}

type compressionTypes struct {
snappy, zstd, minlz, none, unknown uint64
snappy, zstd, minlz, adaptive, none, unknown uint64
}

func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes {
Expand All @@ -1063,6 +1063,8 @@ func (a compressionTypeAggregator) Accumulate(
dst.zstd++
case MinlzCompression:
dst.minlz++
case AdaptiveCompression:
dst.adaptive++
case NoCompression:
dst.none++
default:
Expand All @@ -1077,6 +1079,7 @@ func (a compressionTypeAggregator) Merge(
dst.snappy += src.snappy
dst.zstd += src.zstd
dst.minlz += src.minlz
dst.adaptive += src.adaptive
dst.none += src.none
dst.unknown += src.unknown
return dst
Expand Down