Skip to content
This repository was archived by the owner on Oct 30, 2025. It is now read-only.
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 113 additions & 9 deletions groot/internal/rcompress/rcompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"io"
"sync"

"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/zlib"
Expand Down Expand Up @@ -206,17 +207,17 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) {
hdr[0] = 'Z'
hdr[1] = 'L'
hdr[2] = 8 // zlib deflated
w, err := zlib.NewWriterLevel(buf, lvl)
w, err := zlibGetWriterLevel(buf, lvl)
if err != nil {
return 0, fmt.Errorf("rcompress: could not create ZLIB compressor: %w", err)
}

_, err = w.Write(src)
if err != nil {
_ = w.Close()
_ = zlibPutWriterLevel(w, lvl)
return 0, fmt.Errorf("rcompress: could not write ZLIB compressed bytes: %w", err)
}
err = w.Close()
err = zlibPutWriterLevel(w, lvl)
switch {
case err == nil:
// ok.
Expand Down Expand Up @@ -268,7 +269,7 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) {

const chksum = 8
var room = int(float64(srcsz) * 2e-4) // lz4 needs some extra scratch space
dst := make([]byte, HeaderSize+chksum+len(src)+room)
dst := lz4GetBuffer(HeaderSize + chksum + len(src) + room)
wrk := dst[HeaderSize:]
var n int
switch {
Expand All @@ -283,18 +284,21 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) {
n, err = lz4.CompressBlock(src, wrk[chksum:], ht)
}
if err != nil {
lz4PutBuffer(dst)
return 0, fmt.Errorf("rcompress: could not compress with LZ4: %w", err)
}

if n == 0 {
// not compressible.
lz4PutBuffer(dst)
return len(src), errNoCompression
}

wrk = wrk[:n+chksum]
binary.BigEndian.PutUint64(wrk[:chksum], xxHash64.Checksum(wrk[chksum:], 0))
dstsz = int32(n + chksum)
n = copy(buf.p, wrk)
lz4PutBuffer(dst)
buf.c += n

case ZSTD:
Expand Down Expand Up @@ -371,21 +375,21 @@ func Decompress(dst []byte, src io.Reader) error {
lr := &io.LimitedReader{R: src, N: srcsz}
switch kindOf(hdr) {
case ZLIB:
rc, err := zlib.NewReader(lr)
rc, err := zlibGetReader(lr)
if err != nil {
return fmt.Errorf("rcompress: could not create ZLIB reader: %w", err)
}
defer rc.Close()

_, err = io.ReadFull(rc, dst[beg:end])
zlibPutReader(rc)
if err != nil {
return fmt.Errorf("rcompress: could not decompress ZLIB buffer: %w", err)
}

case LZ4:
src := make([]byte, srcsz)
src := lz4GetBuffer(int(srcsz))
_, err = io.ReadFull(lr, src)
if err != nil {
lz4PutBuffer(src)
return fmt.Errorf("rcompress: could not read LZ4 block: %w", err)
}
const chksum = 8
Expand All @@ -396,7 +400,9 @@ func Decompress(dst []byte, src io.Reader) error {
case srcsz > tgtsz:
// no compression
copy(dst[beg:end], src[chksum:])
lz4PutBuffer(src)
default:
lz4PutBuffer(src)
return fmt.Errorf("rcompress: could not decompress LZ4 block: %w", err)
}
}
Expand All @@ -419,11 +425,12 @@ func Decompress(dst []byte, src io.Reader) error {
}

case ZSTD:
rc, err := zstd.NewReader(lr)
rc, err := zstdGetReader(lr)
if err != nil {
return fmt.Errorf("rcompress: could not create ZSTD reader: %w", err)
}
_, err = io.ReadFull(rc, dst[beg:end])
zstdPutReader(rc)
if err != nil {
return fmt.Errorf("rcompress: could not decompress ZSTD block: %w", err)
}
Expand Down Expand Up @@ -457,3 +464,100 @@ func (w *wbuff) Write(p []byte) (int, error) {
var (
_ io.Writer = (*wbuff)(nil)
)

var (
lz4BufferPool sync.Pool
zlibReaderPool sync.Pool
zstdReaderPool sync.Pool
zlibWriterPools sync.Map // map[lvl]*pool
zstdWriterPools sync.Map // map[lvl]*pool
)

func lz4GetBuffer(size int) []byte {
var b []byte
if bi := lz4BufferPool.Get(); bi != nil {
b = bi.([]byte)
}
if cap(b) >= size {
return b[:size]
}
return make([]byte, size)
}

func lz4PutBuffer(b []byte) {
lz4BufferPool.Put(b)
}

func zlibGetReader(r io.Reader) (io.ReadCloser, error) {
if ri := zlibReaderPool.Get(); ri != nil {
ri.(zlib.Resetter).Reset(r, nil)
return ri.(io.ReadCloser), nil
}
return zlib.NewReader(r)
}

func zlibPutReader(r io.ReadCloser) error {
// Note that zlib readers should be closed (but not reset)
err := r.Close()
zlibReaderPool.Put(r)
return err
}

func zstdGetReader(r io.Reader) (*zstd.Decoder, error) {
if ri := zstdReaderPool.Get(); ri != nil {
rd := ri.(*zstd.Decoder)
rd.Reset(r)
return rd, nil
}
return zstd.NewReader(r)
}

func zstdPutReader(r *zstd.Decoder) {
// Note that zstd decoders should be reset (but not closed)
r.Reset(nil)
zstdReaderPool.Put(r)
}

func zlibGetWriterLevel(w io.Writer, lvl int) (*zlib.Writer, error) {
if pi, ok := zlibWriterPools.Load(lvl); ok {
if wi := pi.(*sync.Pool).Get(); wi != nil {
z := wi.(*zlib.Writer)
z.Reset(w)
return z, nil
}
}
return zlib.NewWriterLevel(w, lvl)
}

func zlibPutWriterLevel(w *zlib.Writer, lvl int) error {
err := w.Close()
if pi, ok := zlibWriterPools.Load(lvl); ok {
pi.(*sync.Pool).Put(w)
} else {
pi, _ = zlibWriterPools.LoadOrStore(lvl, new(sync.Pool))
pi.(*sync.Pool).Put(w)
}
return err
}

func zstdGetWriterLevel(w io.Writer, lvl int) (*zstd.Encoder, error) {
if pi, ok := zstdWriterPools.Load(lvl); ok {
if wi := pi.(*sync.Pool).Get(); wi != nil {
z := wi.(*zstd.Encoder)
z.Reset(w)
return z, nil
}
}
return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevel(lvl)))
}

func zstdPutWriterLevel(w *zstd.Encoder, lvl int) error {
err := w.Close()
if pi, ok := zstdWriterPools.Load(lvl); ok {
pi.(*sync.Pool).Put(w)
} else {
pi, _ = zstdWriterPools.LoadOrStore(lvl, new(sync.Pool))
pi.(*sync.Pool).Put(w)
}
return err
}