Skip to content

Commit 07e07cb

Browse files
authored
feat(compress): add snappy (openGemini#808)
Signed-off-by: xkx <triumph_9431@qq.com>
1 parent 9618070 commit 07e07cb

File tree

8 files changed

+323
-144
lines changed

8 files changed

+323
-144
lines changed

lib/compress/compression_pool.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2024 openGemini Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package compress
16+
17+
import (
18+
"compress/gzip"
19+
"io"
20+
"sync"
21+
22+
"github.com/golang/snappy"
23+
"github.com/klauspost/compress/zstd"
24+
"github.com/openGemini/openGemini/lib/cpu"
25+
"github.com/openGemini/openGemini/lib/pool"
26+
)
27+
28+
// #region Gzip Writer Pool
29+
var gzipWriterPool *pool.FixedPoolV2[*gzip.Writer] = pool.NewFixedPoolV2[*gzip.Writer](func() *gzip.Writer {
30+
return gzip.NewWriter(nil)
31+
}, cpu.GetCpuNum()*2)
32+
33+
func GetGzipWriter(w io.Writer) *gzip.Writer {
34+
gz := gzipWriterPool.Get()
35+
gz.Reset(w)
36+
return gz
37+
}
38+
39+
func PutGzipWriter(gz *gzip.Writer) {
40+
gz.Close()
41+
gzipWriterPool.Put(gz)
42+
}
43+
44+
// #endregion
45+
46+
// #region Gzip Reader Pool
47+
var gzipReaderPool sync.Pool
48+
49+
func GetGzipReader(r io.Reader) (*gzip.Reader, error) {
50+
v := gzipReaderPool.Get()
51+
if v == nil {
52+
return gzip.NewReader(r)
53+
}
54+
zr := v.(*gzip.Reader)
55+
if err := zr.Reset(r); err != nil {
56+
return nil, err
57+
}
58+
return zr, nil
59+
}
60+
61+
// PutGzipReader returns back gzip reader obtained via GetGzipReader.
62+
func PutGzipReader(zr *gzip.Reader) {
63+
_ = zr.Close()
64+
gzipReaderPool.Put(zr)
65+
}
66+
67+
// #endregion
68+
69+
// #region Zstd Writer Pool
70+
var zstdWriterPool *pool.FixedPoolV2[*zstd.Encoder] = pool.NewFixedPoolV2[*zstd.Encoder](func() *zstd.Encoder {
71+
encoder, _ := zstd.NewWriter(nil)
72+
return encoder
73+
}, cpu.GetCpuNum()*2)
74+
75+
func GetZstdWriter(w io.Writer) *zstd.Encoder {
76+
zstdEncoder := zstdWriterPool.Get()
77+
zstdEncoder.Reset(w)
78+
return zstdEncoder
79+
}
80+
81+
func PutZstdWriter(zstdEncoder *zstd.Encoder) {
82+
zstdEncoder.Close()
83+
zstdWriterPool.Put(zstdEncoder)
84+
}
85+
86+
// #endregion
87+
88+
// #region Zstd Reader Pool
89+
var snappyWriterPool = pool.NewFixedPoolV2[*snappy.Writer](func() *snappy.Writer {
90+
return snappy.NewBufferedWriter(nil)
91+
}, cpu.GetCpuNum()*2)
92+
93+
func GetSnappyWriter(w io.Writer) *snappy.Writer {
94+
snappyWriter := snappyWriterPool.Get()
95+
snappyWriter.Reset(w)
96+
return snappyWriter
97+
}
98+
99+
func PutSnappyWriter(snappyWriter *snappy.Writer) {
100+
snappyWriter.Close()
101+
snappyWriterPool.Put(snappyWriter)
102+
}
103+
104+
// #endregion
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2024 openGemini Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package compress
16+
17+
import (
18+
"bytes"
19+
"compress/gzip"
20+
"io"
21+
"testing"
22+
23+
"github.com/golang/snappy"
24+
"github.com/klauspost/compress/zstd"
25+
"github.com/stretchr/testify/assert"
26+
)
27+
28+
func TestGzipWriterPool(t *testing.T) {
29+
var buf bytes.Buffer
30+
writer := GetGzipWriter(&buf)
31+
_, err := writer.Write([]byte("test data"))
32+
assert.NoError(t, err)
33+
PutGzipWriter(writer)
34+
35+
reader, err := gzip.NewReader(&buf)
36+
assert.NoError(t, err)
37+
defer reader.Close()
38+
39+
result := new(bytes.Buffer)
40+
_, err = io.Copy(result, reader)
41+
assert.NoError(t, err)
42+
assert.Equal(t, "test data", result.String())
43+
}
44+
45+
func TestZstdWriterPool(t *testing.T) {
46+
var buf bytes.Buffer
47+
writer := GetZstdWriter(&buf)
48+
_, err := writer.Write([]byte("test data"))
49+
assert.NoError(t, err)
50+
PutZstdWriter(writer)
51+
52+
reader, err := zstd.NewReader(&buf)
53+
assert.NoError(t, err)
54+
defer reader.Close()
55+
56+
result := new(bytes.Buffer)
57+
_, err = io.Copy(result, reader)
58+
assert.NoError(t, err)
59+
assert.Equal(t, "test data", result.String())
60+
}
61+
62+
func TestSnappyWriterPool(t *testing.T) {
63+
var buf bytes.Buffer
64+
writer := GetSnappyWriter(&buf)
65+
_, err := writer.Write([]byte("test data"))
66+
assert.NoError(t, err)
67+
PutSnappyWriter(writer)
68+
69+
reader := snappy.NewReader(&buf)
70+
result := new(bytes.Buffer)
71+
_, err = io.Copy(result, reader)
72+
assert.NoError(t, err)
73+
assert.Equal(t, "test data", result.String())
74+
}

lib/pool/pool.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,44 @@ func (p *FixedCachePool) Put(v interface{}) {
191191
}
192192
}
193193

194+
// FixedPoolV2 is a fixed size pool with object generic type information
195+
type FixedPoolV2[T any] struct {
196+
pool chan T
197+
new func() T
198+
}
199+
200+
func NewFixedPoolV2[T any](new func() T, size int) *FixedPoolV2[T] {
201+
return &FixedPoolV2[T]{
202+
pool: make(chan T, size),
203+
new: new,
204+
}
205+
}
206+
207+
func (p *FixedPoolV2[T]) Get() T {
208+
select {
209+
case item := <-p.pool:
210+
return item
211+
default:
212+
return p.new()
213+
}
214+
}
215+
216+
func (p *FixedPoolV2[T]) Put(item T) {
217+
select {
218+
case p.pool <- item:
219+
default:
220+
}
221+
}
222+
223+
func (p *FixedPoolV2[T]) Reset(size int, new func() T) {
224+
p.pool = make(chan T, size)
225+
p.new = new
226+
}
227+
228+
func (p *FixedPoolV2[T]) Len() int {
229+
return len(p.pool)
230+
}
231+
194232
var intSlicePool sync.Pool
195233

196234
func GetIntSlice(size int) []int {

lib/pool/pool_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,57 @@ func TestChunkMetaBuffer(t *testing.T) {
7373
buf = pool.GetChunkMetaBuffer()
7474
require.Equal(t, 10, len(buf.B))
7575
}
76+
77+
func TestNewFixedPoolV2(t *testing.T) {
78+
newFunc := func() int {
79+
return 42
80+
}
81+
testPool := pool.NewFixedPoolV2(newFunc, 2)
82+
83+
require.NotNil(t, testPool)
84+
}
85+
86+
func TestFixedPoolV2_Get(t *testing.T) {
87+
newFunc := func() int {
88+
return 42
89+
}
90+
pool := pool.NewFixedPoolV2(newFunc, 2)
91+
92+
// Test getting a new item when pool is empty
93+
item := pool.Get()
94+
require.Equal(t, 42, item)
95+
96+
// Test getting an item from the pool
97+
pool.Put(100)
98+
item = pool.Get()
99+
require.Equal(t, 100, item)
100+
}
101+
102+
func TestFixedPoolV2_Put(t *testing.T) {
103+
newFunc := func() int {
104+
return 42
105+
}
106+
pool := pool.NewFixedPoolV2(newFunc, 2)
107+
108+
// Test putting an item into the pool
109+
pool.Put(100)
110+
require.Equal(t, 1, pool.Len())
111+
112+
// Test putting an item into a full pool
113+
pool.Put(200)
114+
pool.Put(300)
115+
require.Equal(t, 2, pool.Len())
116+
}
117+
118+
func TestFixedPoolV2_Reset(t *testing.T) {
119+
newFunc := func() int {
120+
return 42
121+
}
122+
pool := pool.NewFixedPoolV2(newFunc, 2)
123+
124+
// Test resetting the pool
125+
pool.Put(100)
126+
pool.Put(200)
127+
pool.Reset(2, newFunc)
128+
require.Equal(t, 0, pool.Len())
129+
}

lib/util/lifted/influx/httpd/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
jsoniter "github.com/json-iterator/go"
2828
"github.com/openGemini/openGemini/app"
2929
"github.com/openGemini/openGemini/engine/hybridqp"
30+
compression "github.com/openGemini/openGemini/lib/compress"
3031
config2 "github.com/openGemini/openGemini/lib/config"
3132
"github.com/openGemini/openGemini/lib/cpu"
3233
"github.com/openGemini/openGemini/lib/errno"
@@ -1318,15 +1319,15 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta2.
13181319

13191320
// Handle gzip decoding of the body
13201321
if r.Header.Get("Content-Encoding") == "gzip" {
1321-
b, err := GetGzipReader(r.Body)
1322+
b, err := compression.GetGzipReader(r.Body)
13221323
if err != nil {
13231324
h.httpError(w, err.Error(), http.StatusBadRequest)
13241325
error := errno.NewError(errno.HttpBadRequest)
13251326
h.Logger.Error("write error:Handle gzip decoding of the body err", zap.Error(error), zap.String("db", database))
13261327
atomic.AddInt64(&statistics.HandlerStat.Write400ErrRequests, 1)
13271328
return
13281329
}
1329-
defer PutGzipReader(b)
1330+
defer compression.PutGzipReader(b)
13301331
body = b
13311332
}
13321333

0 commit comments

Comments
 (0)