@@ -2,6 +2,7 @@ package block
22
33import (
44 "encoding/binary"
5+ "sync"
56
67 "github.com/cockroachdb/errors"
78 "github.com/cockroachdb/pebble/internal/base"
@@ -21,9 +22,28 @@ type noopCompressor struct{}
2122type snappyCompressor struct {}
2223type minlzCompressor struct {}
2324
25+ // adaptiveCompressor dynamically switches between snappy and zstd
26+ // compression. It prefers using zstd because of its potential for
27+ // high compression ratios. However, if zstd does not achieve a good
28+ // compression ratio, we apply exponential backoff before trying zstd again.
29+ // If the compression ratio is high (50% or better), we continue using zstd.
30+ type adaptiveCompressor struct {
31+ // timeTilTry is the number of operations to wait before
32+ // attempting zstd compression again after a poor result.
33+ timeTilTry int
34+
35+ // zstdBackoffStep is how much we increase timeTilTry
36+ // each time zstd compression fails to achieve at least
37+ // a 50% compression ratio.
38+ zstdBackoffStep int
39+
40+ zstdCompressor Compressor
41+ }
42+
2443var _ Compressor = noopCompressor {}
2544var _ Compressor = snappyCompressor {}
2645var _ Compressor = minlzCompressor {}
46+ var _ Compressor = (* adaptiveCompressor )(nil )
2747
2848func (noopCompressor ) Compress (dst , src []byte ) (CompressionIndicator , []byte ) {
2949 panic ("NoCompressionCompressor.Compress() should not be called." )
@@ -62,11 +82,60 @@ func GetCompressor(c Compression) Compressor {
6282 return getZstdCompressor ()
6383 case MinlzCompression :
6484 return minlzCompressor {}
85+ case AdaptiveCompression :
86+ return adaptiveCompressorPool .Get ().(* adaptiveCompressor )
6587 default :
6688 panic ("Invalid compression type." )
6789 }
6890}
6991
92+ var adaptiveCompressorPool = sync.Pool {
93+ New : func () any {
94+ return & adaptiveCompressor {zstdBackoffStep : 1 , zstdCompressor : getZstdCompressor ()}
95+ },
96+ }
97+
98+ func (a * adaptiveCompressor ) Compress (dst , src []byte ) (CompressionIndicator , []byte ) {
99+ var algo CompressionIndicator
100+ var compressedBuf []byte
101+ if a .timeTilTry == 0 {
102+ z := a .zstdCompressor
103+ algo , compressedBuf = z .Compress (dst , src )
104+ // Perform a backoff if zstd compression ratio wasn't better than 50%.
105+ if 10 * len (compressedBuf ) >= 5 * len (src ) {
106+ a .increaseBackoff ()
107+ } else {
108+ a .resetBackoff ()
109+ }
110+ } else {
111+ // Use Snappy
112+ algo , compressedBuf = (snappyCompressor {}).Compress (dst , src )
113+ }
114+ a .timeTilTry --
115+ return algo , compressedBuf
116+ }
117+
118+ func (a * adaptiveCompressor ) Close () {
119+ a .timeTilTry = 0
120+ a .zstdBackoffStep = 1
121+ adaptiveCompressorPool .Put (a )
122+ }
123+
124+ // Exponential backoff for zstd
125+ func (a * adaptiveCompressor ) increaseBackoff () {
126+ a .zstdBackoffStep *= 2
127+ if a .timeTilTry == 0 {
128+ a .timeTilTry = a .zstdBackoffStep
129+ } else {
130+ a .timeTilTry += a .zstdBackoffStep
131+ }
132+ }
133+
134+ func (a * adaptiveCompressor ) resetBackoff () {
135+ a .zstdBackoffStep = 1
136+ a .timeTilTry = 1
137+ }
138+
70139type Decompressor interface {
71140 // DecompressInto decompresses compressed into buf. The buf slice must have the
72141 // exact size as the decompressed value. Callers may use DecompressedLen to
0 commit comments