Skip to content

Commit f23d591

Browse files
authored
Implementing Snappy Block Encoding (#5215)
* implementing tests for zstd Signed-off-by: Alan Protasio <[email protected]> * implementing snappy-block encoding Signed-off-by: Alan Protasio <[email protected]> * lint Signed-off-by: Alan Protasio <[email protected]> * go imports Signed-off-by: Alan Protasio <[email protected]> * doc Signed-off-by: Alan Protasio <[email protected]> * reuse slice with max cap Signed-off-by: Alan Protasio <[email protected]> * reuse dst on decode as well Signed-off-by: Alan Protasio <[email protected]> * changelog Signed-off-by: Alan Protasio <[email protected]> --------- Signed-off-by: Alan Protasio <[email protected]>
1 parent 88eb7a2 commit f23d591

File tree

8 files changed

+335
-92
lines changed

8 files changed

+335
-92
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* [FEATURE] Querier/Store-Gateway: Added `-blocks-storage.bucket-store.ignore-blocks-within` allowing to filter out the recently created blocks from being synced by queriers and store-gateways. #5166
2424
* [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rulers.
2525
* [FEATURE] Alertmanager: Add support for time_intervals. #5102
26+
* [FEATURE] Added `snappy-block` as an option for grpc compression #5215
2627
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
2728
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
2829
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ query_scheduler:
198198
[max_send_msg_size: <int> | default = 16777216]
199199

200200
# Use compression when sending messages. Supported values are: 'gzip',
201-
# 'snappy', 'zstd' and '' (disable compression)
201+
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
202202
# CLI flag: -query-scheduler.grpc-client-config.grpc-compression
203203
[grpc_compression: <string> | default = ""]
204204

@@ -2381,7 +2381,7 @@ grpc_client_config:
23812381
[max_send_msg_size: <int> | default = 16777216]
23822382
23832383
# Use compression when sending messages. Supported values are: 'gzip',
2384-
# 'snappy', 'zstd' and '' (disable compression)
2384+
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
23852385
# CLI flag: -querier.frontend-client.grpc-compression
23862386
[grpc_compression: <string> | default = ""]
23872387
@@ -2642,7 +2642,7 @@ grpc_client_config:
26422642
[max_send_msg_size: <int> | default = 16777216]
26432643
26442644
# Use compression when sending messages. Supported values are: 'gzip',
2645-
# 'snappy', 'zstd' and '' (disable compression)
2645+
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
26462646
# CLI flag: -ingester.client.grpc-compression
26472647
[grpc_compression: <string> | default = ""]
26482648
@@ -3435,7 +3435,7 @@ grpc_client_config:
34353435
[max_send_msg_size: <int> | default = 16777216]
34363436
34373437
# Use compression when sending messages. Supported values are: 'gzip',
3438-
# 'snappy', 'zstd' and '' (disable compression)
3438+
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
34393439
# CLI flag: -frontend.grpc-client-config.grpc-compression
34403440
[grpc_compression: <string> | default = ""]
34413441
@@ -3651,7 +3651,7 @@ ruler_client:
36513651
[max_send_msg_size: <int> | default = 16777216]
36523652
36533653
# Use compression when sending messages. Supported values are: 'gzip',
3654-
# 'snappy', 'zstd' and '' (disable compression)
3654+
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
36553655
# CLI flag: -ruler.client.grpc-compression
36563656
[grpc_compression: <string> | default = ""]
36573657

pkg/util/grpcclient/grpcclient.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cortexproject/cortex/pkg/util/backoff"
1515
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy"
16+
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"
1617
"github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd"
1718
"github.com/cortexproject/cortex/pkg/util/tls"
1819
)
@@ -41,7 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4142
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
4243
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
4344
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
44-
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'zstd' and '' (disable compression)")
45+
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)")
4546
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
4647
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
4748
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")
@@ -54,7 +55,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
5455

5556
func (cfg *Config) Validate(log log.Logger) error {
5657
switch cfg.GRPCCompression {
57-
case gzip.Name, snappy.Name, zstd.Name, "":
58+
case gzip.Name, snappy.Name, zstd.Name, snappyblock.Name, "":
5859
// valid
5960
default:
6061
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression)
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package grpcencoding
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"io/ioutil"
7+
"strings"
8+
"testing"
9+
10+
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy"
11+
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"
12+
"github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"google.golang.org/grpc/encoding"
17+
)
18+
19+
func TestCompressors(t *testing.T) {
20+
testCases := []struct {
21+
name string
22+
}{
23+
{
24+
name: snappy.Name,
25+
},
26+
{
27+
name: snappyblock.Name,
28+
},
29+
{
30+
name: zstd.Name,
31+
},
32+
}
33+
34+
for _, tc := range testCases {
35+
t.Run(tc.name, func(t *testing.T) {
36+
testCompress(tc.name, t)
37+
})
38+
}
39+
}
40+
41+
func testCompress(name string, t *testing.T) {
42+
c := encoding.GetCompressor(name)
43+
assert.Equal(t, name, c.Name())
44+
45+
tests := []struct {
46+
test string
47+
input string
48+
}{
49+
{"empty", ""},
50+
{"short", "hello world"},
51+
{"long", strings.Repeat("123456789", 2024)},
52+
}
53+
for _, test := range tests {
54+
t.Run(test.test, func(t *testing.T) {
55+
buf := &bytes.Buffer{}
56+
// Compress
57+
w, err := c.Compress(buf)
58+
require.NoError(t, err)
59+
n, err := w.Write([]byte(test.input))
60+
require.NoError(t, err)
61+
assert.Len(t, test.input, n)
62+
err = w.Close()
63+
require.NoError(t, err)
64+
compressedBytes := buf.Bytes()
65+
buf = bytes.NewBuffer(compressedBytes)
66+
67+
// Decompress
68+
r, err := c.Decompress(buf)
69+
require.NoError(t, err)
70+
out, err := io.ReadAll(r)
71+
require.NoError(t, err)
72+
assert.Equal(t, test.input, string(out))
73+
74+
if sizer, ok := c.(interface {
75+
DecompressedSize(compressedBytes []byte) int
76+
}); ok {
77+
buf = bytes.NewBuffer(compressedBytes)
78+
r, err := c.Decompress(buf)
79+
require.NoError(t, err)
80+
out, err := io.ReadAll(r)
81+
require.NoError(t, err)
82+
assert.Equal(t, len(out), sizer.DecompressedSize(compressedBytes))
83+
}
84+
})
85+
}
86+
}
87+
88+
func BenchmarkCompress(b *testing.B) {
89+
data := []byte(strings.Repeat("123456789", 1024))
90+
91+
testCases := []struct {
92+
name string
93+
}{
94+
{
95+
name: snappy.Name,
96+
},
97+
{
98+
name: snappyblock.Name,
99+
},
100+
{
101+
name: zstd.Name,
102+
},
103+
}
104+
105+
for _, tc := range testCases {
106+
b.Run(tc.name, func(b *testing.B) {
107+
c := encoding.GetCompressor("snappy")
108+
b.ResetTimer()
109+
for i := 0; i < b.N; i++ {
110+
w, _ := c.Compress(io.Discard)
111+
_, _ = w.Write(data)
112+
_ = w.Close()
113+
}
114+
b.ReportAllocs()
115+
})
116+
}
117+
}
118+
119+
func BenchmarkDecompress(b *testing.B) {
120+
data := []byte(strings.Repeat("123456789", 1024))
121+
122+
testCases := []struct {
123+
name string
124+
}{
125+
{
126+
name: snappy.Name,
127+
},
128+
{
129+
name: snappyblock.Name,
130+
},
131+
{
132+
name: zstd.Name,
133+
},
134+
}
135+
136+
for _, tc := range testCases {
137+
b.Run(tc.name, func(b *testing.B) {
138+
c := encoding.GetCompressor(tc.name)
139+
var buf bytes.Buffer
140+
w, _ := c.Compress(&buf)
141+
_, _ = w.Write(data)
142+
w.Close()
143+
b.ResetTimer()
144+
for i := 0; i < b.N; i++ {
145+
_, _, err := decompress(c, buf.Bytes(), 10000)
146+
require.NoError(b, err)
147+
}
148+
b.ReportAllocs()
149+
})
150+
}
151+
}
152+
153+
// This function was copied from: https://github.com/grpc/grpc-go/blob/70c52915099a3b30848d0cb22e2f8951dd5aed7f/rpc_util.go#L765
154+
func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) {
155+
dcReader, err := compressor.Decompress(bytes.NewReader(d))
156+
if err != nil {
157+
return nil, 0, err
158+
}
159+
if sizer, ok := compressor.(interface {
160+
DecompressedSize(compressedBytes []byte) int
161+
}); ok {
162+
if size := sizer.DecompressedSize(d); size >= 0 {
163+
if size > maxReceiveMessageSize {
164+
return nil, size, nil
165+
}
166+
// size is used as an estimate to size the buffer, but we
167+
// will read more data if available.
168+
// +MinRead so ReadFrom will not reallocate if size is correct.
169+
buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
170+
bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
171+
return buf.Bytes(), int(bytesRead), err
172+
}
173+
}
174+
// Read from LimitReader with limit max+1. So if the underlying
175+
// reader is over limit, the result will be bigger than max.
176+
d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
177+
return d, len(d), err
178+
}

pkg/util/grpcencoding/snappy/snappy.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,6 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
5151
return reader{dr, &c.readersPool}, nil
5252
}
5353

54-
// If a Compressor implements DecompressedSize(compressedBytes []byte) int,
55-
// gRPC will call it to determine the size of the buffer allocated for the
56-
// result of decompression.
57-
// Return -1 to indicate unknown size.
58-
//
59-
// This is an EXPERIMENTAL feature of grpc-go.
60-
func (c *compressor) DecompressedSize(compressedBytes []byte) int {
61-
decompressedSize, err := snappy.DecodedLen(compressedBytes)
62-
if err != nil {
63-
return -1
64-
}
65-
return decompressedSize
66-
}
67-
6854
type writeCloser struct {
6955
writer *snappy.Writer
7056
pool *sync.Pool

pkg/util/grpcencoding/snappy/snappy_test.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

0 commit comments

Comments
 (0)