@@ -2,6 +2,7 @@ package block
22
33import (
44 "encoding/binary"
5+ "sync"
56
67 "github.com/cockroachdb/errors"
78 "github.com/cockroachdb/pebble/internal/base"
@@ -21,9 +22,28 @@ type noopCompressor struct{}
2122type snappyCompressor struct {}
2223type minlzCompressor struct {}
2324
25+ // adaptiveCompressor dynamically switches between snappy and zstd
26+ // compression. It prefers using zstd because of its potential for
27+ // high compression ratios. However, if zstd does not achieve a good
28+ // compression ratio, we apply exponential backoff before trying zstd again.
29+ // If the compression ratio is high (50% or better), we continue using zstd.
30+ type adaptiveCompressor struct {
31+ // timeTilTry is the number of operations to wait before
32+ // attempting zstd compression again after a poor result.
33+ timeTilTry int
34+
35+ // zstdBackoffStep is how much we increase timeTilTry
36+ // each time zstd compression fails to achieve at least
37+ // a 50% compression ratio.
38+ zstdBackoffStep int
39+
40+ zstdCompressor Compressor
41+ }
42+
2443var _ Compressor = noopCompressor {}
2544var _ Compressor = snappyCompressor {}
2645var _ Compressor = minlzCompressor {}
46+ var _ Compressor = (* adaptiveCompressor )(nil )
2747
2848func (noopCompressor ) Compress (dst , src []byte ) (CompressionIndicator , []byte ) {
2949 dst = append (dst [:0 ], src ... )
@@ -63,11 +83,56 @@ func GetCompressor(c Compression) Compressor {
6383 return getZstdCompressor ()
6484 case MinlzCompression :
6585 return minlzCompressor {}
86+ case AdaptiveCompression :
87+ return adaptiveCompressorPool .Get ().(* adaptiveCompressor )
6688 default :
6789 panic ("Invalid compression type." )
6890 }
6991}
7092
93+ var adaptiveCompressorPool = sync.Pool {
94+ New : func () any {
95+ return & adaptiveCompressor {zstdBackoffStep : 1 , zstdCompressor : getZstdCompressor ()}
96+ },
97+ }
98+
99+ func (a * adaptiveCompressor ) Compress (dst , src []byte ) (CompressionIndicator , []byte ) {
100+ var algo CompressionIndicator
101+ var compressedBuf []byte
102+ if a .timeTilTry == 0 {
103+ z := a .zstdCompressor
104+ algo , compressedBuf = z .Compress (dst , src )
105+ // Perform a backoff if zstd compression ratio wasn't better than 50%.
106+ if 10 * len (compressedBuf ) >= 5 * len (src ) {
107+ a .increaseBackoff ()
108+ } else {
109+ a .resetBackoff ()
110+ }
111+ } else {
112+ // Use Snappy
113+ algo , compressedBuf = (snappyCompressor {}).Compress (dst , src )
114+ }
115+ a .timeTilTry --
116+ return algo , compressedBuf
117+ }
118+
119+ func (a * adaptiveCompressor ) Close () {
120+ a .timeTilTry = 0
121+ a .zstdBackoffStep = 1
122+ adaptiveCompressorPool .Put (a )
123+ }
124+
125+ // Exponential backoff for zstd
126+ func (a * adaptiveCompressor ) increaseBackoff () {
127+ a .zstdBackoffStep *= 2
128+ a .timeTilTry += a .zstdBackoffStep
129+ }
130+
131+ func (a * adaptiveCompressor ) resetBackoff () {
132+ a .zstdBackoffStep = 1
133+ a .timeTilTry = 1
134+ }
135+
71136type Decompressor interface {
72137 // DecompressInto decompresses compressed into buf. The buf slice must have the
73138 // exact size as the decompressed value. Callers may use DecompressedLen to
0 commit comments