Skip to content

Commit 66948fd

Browse files
authored
Cherry-pick fixes to release 1.15 for new RC (#5259)
* fix remote read error in query frontend (#5257) * fix remote read error in query frontend Signed-off-by: Ben Ye <[email protected]> * fix integration test Signed-off-by: Ben Ye <[email protected]> * add extra one query Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]> * bump RC version Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent af49d70 commit 66948fd

File tree

5 files changed

+108
-5
lines changed

5 files changed

+108
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* [BUGFIX] Fix S3 BucketWithRetries upload empty content issue #5217
6161
* [BUGFIX] Query Frontend: Disable `absent`, `absent_over_time` and `scalar` for vertical sharding. #5221
6262
* [BUGFIX] Catch context error in the s3 bucket client. #5240
63+
* [BUGFIX] Fix query frontend remote read empty body. #5257
6364

6465
## 1.14.0 2022-12-02
6566

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.15.0-rc.1
1+
1.15.0-rc.2

integration/e2ecortex/client.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ import (
1919
promapi "github.com/prometheus/client_golang/api"
2020
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
2121
"github.com/prometheus/common/model"
22+
"github.com/prometheus/prometheus/model/labels"
2223
"github.com/prometheus/prometheus/model/rulefmt"
2324
"github.com/prometheus/prometheus/prompb"
25+
"github.com/prometheus/prometheus/storage"
26+
"github.com/prometheus/prometheus/storage/remote"
2427
yaml "gopkg.in/yaml.v3"
2528

2629
"github.com/cortexproject/cortex/pkg/ruler"
@@ -153,6 +156,72 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
153156
return c.query(addr)
154157
}
155158

159+
// RemoteRead runs a remote read query.
160+
func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) {
161+
startMs := start.UnixMilli()
162+
endMs := end.UnixMilli()
163+
stepMs := step.Milliseconds()
164+
165+
q, err := remote.ToQuery(startMs, endMs, matchers, &storage.SelectHints{
166+
Step: stepMs,
167+
Start: startMs,
168+
End: endMs,
169+
})
170+
if err != nil {
171+
return nil, err
172+
}
173+
174+
req := &prompb.ReadRequest{
175+
Queries: []*prompb.Query{q},
176+
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
177+
}
178+
179+
data, err := proto.Marshal(req)
180+
if err != nil {
181+
return nil, err
182+
}
183+
compressed := snappy.Encode(nil, data)
184+
185+
// Call the remote read API endpoint with a timeout.
186+
httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
187+
defer cancel()
188+
189+
httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+c.querierAddress+"/prometheus/api/v1/read", bytes.NewReader(compressed))
190+
if err != nil {
191+
return nil, err
192+
}
193+
httpReq.Header.Set("X-Scope-OrgID", "user-1")
194+
httpReq.Header.Add("Content-Encoding", "snappy")
195+
httpReq.Header.Add("Accept-Encoding", "snappy")
196+
httpReq.Header.Set("Content-Type", "application/x-protobuf")
197+
httpReq.Header.Set("User-Agent", "Prometheus/1.8.2")
198+
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
199+
200+
httpResp, err := c.httpClient.Do(httpReq)
201+
if err != nil {
202+
return nil, err
203+
}
204+
if httpResp.StatusCode != http.StatusOK {
205+
return nil, fmt.Errorf("unexpected status code %d", httpResp.StatusCode)
206+
}
207+
208+
compressed, err = io.ReadAll(httpResp.Body)
209+
if err != nil {
210+
return nil, err
211+
}
212+
213+
uncompressed, err := snappy.Decode(nil, compressed)
214+
if err != nil {
215+
return nil, err
216+
}
217+
218+
var resp prompb.ReadResponse
219+
if err = proto.Unmarshal(uncompressed, &resp); err != nil {
220+
return nil, err
221+
}
222+
return &resp, nil
223+
}
224+
156225
func (c *Client) query(addr string) (*http.Response, []byte, error) {
157226
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
158227
defer cancel()

integration/query_frontend_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type queryFrontendTestConfig struct {
3131
testMissingMetricName bool
3232
querySchedulerEnabled bool
3333
queryStatsEnabled bool
34+
remoteReadEnabled bool
3435
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
3536
}
3637

@@ -194,6 +195,19 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
194195
})
195196
}
196197

198+
func TestQueryFrontendRemoteRead(t *testing.T) {
199+
runQueryFrontendTest(t, queryFrontendTestConfig{
200+
remoteReadEnabled: true,
201+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
202+
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
203+
204+
minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
205+
require.NoError(t, s.StartAndWaitReady(minio))
206+
return cortexConfigFile, flags
207+
},
208+
})
209+
}
210+
197211
func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
198212
const numUsers = 10
199213
const numQueriesPerUser = 10
@@ -307,6 +321,18 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
307321
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
308322
}
309323

324+
// No need to repeat the test on remote read for each user.
325+
if userID == 0 && cfg.remoteReadEnabled {
326+
start := now.Add(-1 * time.Hour)
327+
end := now.Add(1 * time.Hour)
328+
res, err := c.RemoteRead([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "series_1")}, start, end, time.Second)
329+
require.NoError(t, err)
330+
require.True(t, len(res.Results) > 0)
331+
require.True(t, len(res.Results[0].Timeseries) > 0)
332+
require.True(t, len(res.Results[0].Timeseries[0].Samples) > 0)
333+
require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0)
334+
}
335+
310336
// In this test we do ensure that the /series start/end time is ignored and Cortex
311337
// always returns series in ingesters memory. No need to repeat it for each user.
312338
if userID == 0 {
@@ -342,6 +368,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
342368
extra++
343369
}
344370

371+
if cfg.remoteReadEnabled {
372+
extra++
373+
}
374+
345375
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))
346376

347377
// The number of received request is greater than the query requests because include

pkg/frontend/transport/handler.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
139139
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
140140
// We parse form here so that we can use buf as body, in order to
141141
// prevent https://github.com/cortexproject/cortex/issues/5201.
142-
if err := r.ParseForm(); err != nil {
143-
writeError(w, err)
144-
return
142+
// Exclude remote read here as we don't have to buffer its body.
143+
if !strings.Contains(r.URL.Path, "api/v1/read") {
144+
if err := r.ParseForm(); err != nil {
145+
writeError(w, err)
146+
return
147+
}
148+
r.Body = io.NopCloser(&buf)
145149
}
146-
r.Body = io.NopCloser(&buf)
147150

148151
startTime := time.Now()
149152
resp, err := f.roundTripper.RoundTrip(r)

0 commit comments

Comments
 (0)