Skip to content

Commit b33ae45

Browse files
Improve decompression within ParseProtoReader. (#3682)
* Improve decompression within ParseProtoReader. This uses the latest weaveworks/common, which includes a fix so that the request content length is set correctly; this allows buffers to be allocated at the right size up front instead of letting them grow naturally. I've also implemented an alternative decompression path from a `bytes.Buffer`, which avoids allocating and reading into a new buffer when using httpgrpc, since the underlying reader is already a `bytes.Buffer`. I took the liberty of also improving the validation when using RawSnappy: I'm checking the length using the snappy header. This avoids wasting CPU when the decompressed size is too big, since we will know in advance. I've also added missing tests that cover the old and new code. Signed-off-by: Cyril Tovena <[email protected]> * missing go.sum update. Signed-off-by: Cyril Tovena <[email protected]> * Update changelog. Signed-off-by: Cyril Tovena <[email protected]> * Removes FramedSnappy encoding support. Signed-off-by: Cyril Tovena <[email protected]> * update go.mod Signed-off-by: Cyril Tovena <[email protected]> * Simplify implementation. From review feedback from @pracucci Signed-off-by: Cyril Tovena <[email protected]> * Nits & limitReader in both cases when reading from a buffer. Signed-off-by: Cyril Tovena <[email protected]> * Moves comment. Signed-off-by: Cyril Tovena <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent fdc1bb2 commit b33ae45

File tree

19 files changed

+195
-1227
lines changed

19 files changed

+195
-1227
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44

5+
* [CHANGE] FramedSnappy encoding support has been removed from Push and Remote Read APIs. This means Prometheus 1.6 support has been removed and the oldest Prometheus version supported in the remote write is 1.7. #3682
56
* [CHANGE] Ruler: removed the flag `-ruler.evaluation-delay-duration-deprecated` which was deprecated in 1.4.0. Please use the `ruler_evaluation_delay_duration` per-tenant limit instead. #3693
67
* [CHANGE] Removed the flags `-<prefix>.grpc-use-gzip-compression` which were deprecated in 1.3.0: #3693
78
* `-query-scheduler.grpc-client-config.grpc-use-gzip-compression`: use `-query-scheduler.grpc-client-config.grpc-compression` instead
@@ -23,6 +24,7 @@
2324
* `-cluster.peer-timeout` in favor of `-alertmanager.cluster.peer-timeout`
2425
* [CHANGE] Blocks storage: the default value of `-blocks-storage.bucket-store.sync-interval` has been changed from `5m` to `15m`. #3724
2526
* [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250
27+
* [ENHANCEMENT] Allow specifying JAEGER_ENDPOINT instead of sampling server or local agent port. #3682
2628
* [FEATURE] Alertmanager: introduced the experimental option `-alertmanager.sharding-enabled` to shard tenants across multiple Alertmanager instances. This feature is still under heavy development and its usage is discouraged. The following new metrics are exported by the Alertmanager: #3664
2729
* `cortex_alertmanager_ring_check_errors_total`
2830
* `cortex_alertmanager_sync_configs_total`

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ require (
1313
github.com/alicebob/miniredis v2.5.0+incompatible
1414
github.com/armon/go-metrics v0.3.3
1515
github.com/aws/aws-sdk-go v1.35.31
16-
github.com/blang/semver v3.5.0+incompatible
1716
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
1817
github.com/cespare/xxhash v1.1.0
1918
github.com/dustin/go-humanize v1.0.0
@@ -56,7 +55,7 @@ require (
5655
github.com/stretchr/testify v1.6.1
5756
github.com/thanos-io/thanos v0.13.1-0.20210108102609-f85e4003ba51
5857
github.com/uber/jaeger-client-go v2.25.0+incompatible
59-
github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec
58+
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
6059
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
6160
go.etcd.io/etcd v0.5.0-alpha.5.0.20200520232829-54ba9589114f
6261
go.uber.org/atomic v1.7.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,8 +1179,8 @@ github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAq
11791179
github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY=
11801180
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099 h1:MS5M2antM8wzMUqVxIfAi+yb6yjXvDINRFvLnmNXeIw=
11811181
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099/go.mod h1:hz10LOsAdzC3K/iXaKoFxOKTDRgxJl+BTGX1GY+TzO4=
1182-
github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec h1:5JmevdpzK10Z2ua0VDToj7Kg2+/t0FzdYBjsurYRE8k=
1183-
github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec/go.mod h1:ykzWac1LtVfOxdCK+jD754at1Ws9dKCwFeUzkFBffPs=
1182+
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120 h1:zQtcwREXYNvW116ipgc0bRDg1avD2b6QP0RGPLlPWkc=
1183+
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120/go.mod h1:ykzWac1LtVfOxdCK+jD754at1Ws9dKCwFeUzkFBffPs=
11841184
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
11851185
github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA=
11861186
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=

pkg/querier/remote_read.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@ const maxRemoteReadQuerySize = 1024 * 1024
1616
// RemoteReadHandler handles Prometheus remote read requests.
1717
func RemoteReadHandler(q storage.Queryable) http.Handler {
1818
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
19-
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Read-Version"))
20-
2119
ctx := r.Context()
2220
var req client.ReadRequest
2321
logger := util.WithContext(r.Context(), util.Logger)
24-
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, compressionType); err != nil {
25-
level.Error(logger).Log("err", err.Error())
22+
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
23+
level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error())
2624
http.Error(w, err.Error(), http.StatusBadRequest)
2725
return
2826
}
@@ -68,7 +66,7 @@ func RemoteReadHandler(q storage.Queryable) http.Handler {
6866
return
6967
}
7068
w.Header().Add("Content-Type", "application/x-protobuf")
71-
if err := util.SerializeProtoResponse(w, &resp, compressionType); err != nil {
69+
if err := util.SerializeProtoResponse(w, &resp, util.RawSnappy); err != nil {
7270
level.Error(logger).Log("msg", "error sending remote read response", "err", err)
7371
}
7472
})

pkg/util/http.go

Lines changed: 87 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
"net/http"
1111
"strings"
1212

13-
"github.com/blang/semver"
1413
"github.com/gogo/protobuf/proto"
1514
"github.com/golang/snappy"
1615
"github.com/opentracing/opentracing-go"
1716
otlog "github.com/opentracing/opentracing-go/log"
1817
"gopkg.in/yaml.v2"
1918
)
2019

20+
const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)"
21+
2122
// WriteJSONResponse writes some JSON as a HTTP response.
2223
func WriteJSONResponse(w http.ResponseWriter, v interface{}) {
2324
w.Header().Set("Content-Type", "application/json")
@@ -81,71 +82,22 @@ type CompressionType int
8182
// Values for CompressionType
8283
const (
8384
NoCompression CompressionType = iota
84-
FramedSnappy
8585
RawSnappy
8686
)
8787

88-
var rawSnappyFromVersion = semver.MustParse("0.1.0")
89-
90-
// CompressionTypeFor a given version of the Prometheus remote storage protocol.
91-
// See https://github.com/prometheus/prometheus/issues/2692.
92-
func CompressionTypeFor(version string) CompressionType {
93-
ver, err := semver.Make(version)
94-
if err != nil {
95-
return FramedSnappy
96-
}
97-
98-
if ver.GTE(rawSnappyFromVersion) {
99-
return RawSnappy
100-
}
101-
return FramedSnappy
102-
}
103-
10488
// ParseProtoReader parses a compressed proto from an io.Reader.
10589
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
106-
var body []byte
107-
var err error
10890
sp := opentracing.SpanFromContext(ctx)
10991
if sp != nil {
11092
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
11193
}
112-
var buf bytes.Buffer
113-
if expectedSize > 0 {
114-
if expectedSize > maxSize {
115-
return fmt.Errorf("message expected size larger than max (%d vs %d)", expectedSize, maxSize)
116-
}
117-
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
118-
}
119-
switch compression {
120-
case NoCompression:
121-
// Read from LimitReader with limit max+1. So if the underlying
122-
// reader is over limit, the result will be bigger than max.
123-
_, err = buf.ReadFrom(io.LimitReader(reader, int64(maxSize)+1))
124-
body = buf.Bytes()
125-
case FramedSnappy:
126-
_, err = buf.ReadFrom(io.LimitReader(snappy.NewReader(reader), int64(maxSize)+1))
127-
body = buf.Bytes()
128-
case RawSnappy:
129-
_, err = buf.ReadFrom(reader)
130-
body = buf.Bytes()
131-
if sp != nil {
132-
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"),
133-
otlog.Int("size", len(body)))
134-
}
135-
if err == nil && len(body) <= maxSize {
136-
body, err = snappy.Decode(nil, body)
137-
}
138-
}
94+
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp)
13995
if err != nil {
14096
return err
14197
}
142-
if len(body) > maxSize {
143-
return fmt.Errorf("received message larger than max (%d vs %d)", len(body), maxSize)
144-
}
14598

14699
if sp != nil {
147-
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"),
148-
otlog.Int("size", len(body)))
100+
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"), otlog.Int("size", len(body)))
149101
}
150102

151103
// We re-implement proto.Unmarshal here as it calls XXX_Unmarshal first,
@@ -163,6 +115,89 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi
163115
return nil
164116
}
165117

118+
func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) {
119+
defer func() {
120+
if err != nil && len(body) > maxSize {
121+
err = fmt.Errorf(messageSizeLargerErrFmt, len(body), maxSize)
122+
}
123+
}()
124+
if expectedSize > maxSize {
125+
return nil, fmt.Errorf(messageSizeLargerErrFmt, expectedSize, maxSize)
126+
}
127+
buffer, ok := tryBufferFromReader(reader)
128+
if ok {
129+
body, err = decompressFromBuffer(buffer, maxSize, compression, sp)
130+
return
131+
}
132+
body, err = decompressFromReader(reader, expectedSize, maxSize, compression, sp)
133+
return
134+
}
135+
136+
func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) ([]byte, error) {
137+
var (
138+
buf bytes.Buffer
139+
body []byte
140+
err error
141+
)
142+
if expectedSize > 0 {
143+
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
144+
}
145+
// Read from LimitReader with limit max+1. So if the underlying
146+
// reader is over limit, the result will be bigger than max.
147+
reader = io.LimitReader(reader, int64(maxSize)+1)
148+
switch compression {
149+
case NoCompression:
150+
_, err = buf.ReadFrom(reader)
151+
body = buf.Bytes()
152+
case RawSnappy:
153+
_, err = buf.ReadFrom(reader)
154+
if err != nil {
155+
return nil, err
156+
}
157+
body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp)
158+
}
159+
return body, err
160+
}
161+
162+
func decompressFromBuffer(buffer *bytes.Buffer, maxSize int, compression CompressionType, sp opentracing.Span) ([]byte, error) {
163+
if len(buffer.Bytes()) > maxSize {
164+
return nil, fmt.Errorf(messageSizeLargerErrFmt, len(buffer.Bytes()), maxSize)
165+
}
166+
switch compression {
167+
case NoCompression:
168+
return buffer.Bytes(), nil
169+
case RawSnappy:
170+
if sp != nil {
171+
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"),
172+
otlog.Int("size", len(buffer.Bytes())))
173+
}
174+
size, err := snappy.DecodedLen(buffer.Bytes())
175+
if err != nil {
176+
return nil, err
177+
}
178+
if size > maxSize {
179+
return nil, fmt.Errorf(messageSizeLargerErrFmt, size, maxSize)
180+
}
181+
body, err := snappy.Decode(nil, buffer.Bytes())
182+
if err != nil {
183+
return nil, err
184+
}
185+
return body, nil
186+
}
187+
return nil, nil
188+
}
189+
190+
// tryBufferFromReader returns the *bytes.Buffer backing reader when reader
// exposes one through a BytesBuffer() method; this is possible when using
// httpgrpc. It returns nil and false when reader has no such method, or when
// the method hands back a nil buffer, so that callers can fall back to
// reading from reader instead of dereferencing a nil buffer.
func tryBufferFromReader(reader io.Reader) (*bytes.Buffer, bool) {
	bufReader, ok := reader.(interface {
		BytesBuffer() *bytes.Buffer
	})
	if !ok {
		return nil, false
	}
	// A successful type assertion always yields a non-nil interface value,
	// so only the buffer it returns needs a nil check.
	if buffer := bufReader.BytesBuffer(); buffer != nil {
		return buffer, true
	}
	return nil, false
}
200+
166201
// SerializeProtoResponse serializes a protobuf response into an HTTP response.
167202
func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error {
168203
data, err := proto.Marshal(resp)
@@ -173,14 +208,6 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi
173208

174209
switch compression {
175210
case NoCompression:
176-
case FramedSnappy:
177-
buf := bytes.Buffer{}
178-
writer := snappy.NewBufferedWriter(&buf)
179-
if _, err := writer.Write(data); err != nil {
180-
return err
181-
}
182-
writer.Close()
183-
data = buf.Bytes()
184211
case RawSnappy:
185212
data = snappy.Encode(nil, data)
186213
}

pkg/util/http_test.go

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1-
package util
1+
package util_test
22

33
import (
4+
"bytes"
5+
"context"
46
"html/template"
57
"net/http/httptest"
68
"testing"
79

810
"github.com/stretchr/testify/assert"
11+
12+
"github.com/cortexproject/cortex/pkg/ingester/client"
13+
"github.com/cortexproject/cortex/pkg/util"
914
)
1015

1116
func TestRenderHTTPResponse(t *testing.T) {
@@ -58,7 +63,7 @@ func TestRenderHTTPResponse(t *testing.T) {
5863
request.Header.Add(k, v)
5964
}
6065

61-
RenderHTTPResponse(writer, tt.value, tmpl, request)
66+
util.RenderHTTPResponse(writer, tt.value, tmpl, request)
6267

6368
assert.Equal(t, tt.expectedContentType, writer.Header().Get("Content-Type"))
6469
assert.Equal(t, 200, writer.Code)
@@ -70,9 +75,85 @@ func TestRenderHTTPResponse(t *testing.T) {
7075
func TestWriteTextResponse(t *testing.T) {
7176
w := httptest.NewRecorder()
7277

73-
WriteTextResponse(w, "hello world")
78+
util.WriteTextResponse(w, "hello world")
7479

7580
assert.Equal(t, 200, w.Code)
7681
assert.Equal(t, "hello world", w.Body.String())
7782
assert.Equal(t, "text/plain", w.Header().Get("Content-Type"))
7883
}
84+
85+
func TestParseProtoReader(t *testing.T) {
86+
// 47 bytes compressed and 53 uncompressed
87+
req := &client.PreallocWriteRequest{
88+
WriteRequest: client.WriteRequest{
89+
Timeseries: []client.PreallocTimeseries{
90+
{
91+
TimeSeries: &client.TimeSeries{
92+
Labels: []client.LabelAdapter{
93+
{Name: "foo", Value: "bar"},
94+
},
95+
Samples: []client.Sample{
96+
{Value: 10, TimestampMs: 1},
97+
{Value: 20, TimestampMs: 2},
98+
{Value: 30, TimestampMs: 3},
99+
},
100+
},
101+
},
102+
},
103+
},
104+
}
105+
106+
for _, tt := range []struct {
107+
name string
108+
compression util.CompressionType
109+
maxSize int
110+
expectErr bool
111+
useBytesBuffer bool
112+
}{
113+
{"rawSnappy", util.RawSnappy, 53, false, false},
114+
{"noCompression", util.NoCompression, 53, false, false},
115+
{"too big rawSnappy", util.RawSnappy, 10, true, false},
116+
{"too big decoded rawSnappy", util.RawSnappy, 50, true, false},
117+
{"too big noCompression", util.NoCompression, 10, true, false},
118+
119+
{"bytesbuffer rawSnappy", util.RawSnappy, 53, false, true},
120+
{"bytesbuffer noCompression", util.NoCompression, 53, false, true},
121+
{"bytesbuffer too big rawSnappy", util.RawSnappy, 10, true, true},
122+
{"bytesbuffer too big decoded rawSnappy", util.RawSnappy, 50, true, true},
123+
{"bytesbuffer too big noCompression", util.NoCompression, 10, true, true},
124+
} {
125+
t.Run(tt.name, func(t *testing.T) {
126+
w := httptest.NewRecorder()
127+
assert.Nil(t, util.SerializeProtoResponse(w, req, tt.compression))
128+
var fromWire client.PreallocWriteRequest
129+
130+
reader := w.Result().Body
131+
if tt.useBytesBuffer {
132+
buf := bytes.Buffer{}
133+
_, err := buf.ReadFrom(reader)
134+
assert.Nil(t, err)
135+
reader = bytesBuffered{Buffer: &buf}
136+
}
137+
138+
err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
139+
if tt.expectErr {
140+
assert.NotNil(t, err)
141+
return
142+
}
143+
assert.Nil(t, err)
144+
assert.Equal(t, req, &fromWire)
145+
})
146+
}
147+
}
148+
149+
// bytesBuffered is a test reader that wraps a *bytes.Buffer and exposes it
// through BytesBuffer, in addition to the Read method promoted from the
// embedded buffer and a no-op Close.
type bytesBuffered struct {
	*bytes.Buffer
}

// Close does nothing and always returns nil.
func (bytesBuffered) Close() error { return nil }

// BytesBuffer returns the wrapped buffer.
func (bb bytesBuffered) BytesBuffer() *bytes.Buffer { return bb.Buffer }

pkg/util/push/push.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ func Handler(cfg distributor.Config, sourceIPs *middleware.SourceIPExtractor, pu
2525
logger = util.WithSourceIPs(source, logger)
2626
}
2727
}
28-
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
2928
var req client.PreallocWriteRequest
30-
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)
29+
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, util.RawSnappy)
3130
if err != nil {
3231
level.Error(logger).Log("err", err.Error())
3332
http.Error(w, err.Error(), http.StatusBadRequest)

0 commit comments

Comments
 (0)