Skip to content

Commit 64f7c58

Browse files
committed
Support remote write v2 by converting request
Signed-off-by: SungJin1212 <[email protected]>
1 parent a8d1fbf commit 64f7c58

File tree

8 files changed

+178
-42
lines changed

8 files changed

+178
-42
lines changed

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3135,6 +3135,11 @@ ha_tracker:
31353135
# CLI flag: -distributor.sign-write-requests
31363136
[sign_write_requests: <boolean> | default = false]
31373137
3138+
# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
3139+
# request.
3140+
# CLI flag: -distributor.remote-write2-enabled
3141+
[remote_write2_enabled: <boolean> | default = false]
3142+
31383143
# EXPERIMENTAL: If enabled, distributor would use stream connection to send
31393144
# requests to ingesters.
31403145
# CLI flag: -distributor.use-stream-push

integration/remote_write_v2_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525
)
2626

2727
func TestIngesterRollingUpdate(t *testing.T) {
28-
// Test ingester rolling update situation: when -distributor.remote-writev2-enabled is true, and ingester uses the v1.19.0 image.
29-
// Expected: remote write 2.0 push success
28+
// Test ingester rolling update situation: when -distributor.remote-write2-enabled is true, and ingester uses the v1.19.0 image.
29+
// Expected: remote write 2.0 push success, but response header values are set to "0".
3030
const blockRangePeriod = 5 * time.Second
3131
ingesterImage := "quay.io/cortexproject/cortex:v1.19.0"
3232

pkg/cortexpb/cortex.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ message WriteResponse {
4343
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
4444
int64 Exemplars = 5;
4545
}
46+
message WriteResponse {
47+
// Samples represents X-Prometheus-Remote-Write-Written-Samples
48+
int64 Samples = 1;
49+
// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
50+
int64 Histograms = 2;
51+
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
52+
int64 Exemplars = 3;
53+
}
4654

4755
message TimeSeries {
4856
repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"];

pkg/distributor/write_stats.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package distributor
2+
3+
import (
4+
"go.uber.org/atomic"
5+
)
6+
7+
type WriteStats struct {
8+
// Samples represents X-Prometheus-Remote-Write-Written-Samples
9+
Samples atomic.Int64
10+
// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
11+
Histograms atomic.Int64
12+
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
13+
Exemplars atomic.Int64
14+
}
15+
16+
func (w *WriteStats) SetSamples(samples int64) {
17+
if w == nil {
18+
return
19+
}
20+
21+
w.Samples.Store(samples)
22+
}
23+
24+
func (w *WriteStats) SetHistograms(histograms int64) {
25+
if w == nil {
26+
return
27+
}
28+
29+
w.Histograms.Store(histograms)
30+
}
31+
32+
func (w *WriteStats) SetExemplars(exemplars int64) {
33+
if w == nil {
34+
return
35+
}
36+
37+
w.Exemplars.Store(exemplars)
38+
}
39+
40+
func (w *WriteStats) LoadSamples() int64 {
41+
if w == nil {
42+
return 0
43+
}
44+
45+
return w.Samples.Load()
46+
}
47+
48+
func (w *WriteStats) LoadHistogram() int64 {
49+
if w == nil {
50+
return 0
51+
}
52+
53+
return w.Histograms.Load()
54+
}
55+
56+
func (w *WriteStats) LoadExemplars() int64 {
57+
if w == nil {
58+
return 0
59+
}
60+
61+
return w.Exemplars.Load()
62+
}

pkg/distributor/write_stats_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package distributor
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func Test_SetAndLoad(t *testing.T) {
10+
ws := &WriteStats{}
11+
12+
t.Run("Samples", func(t *testing.T) {
13+
ws.SetSamples(3)
14+
assert.Equal(t, int64(3), ws.LoadSamples())
15+
})
16+
t.Run("Histograms", func(t *testing.T) {
17+
ws.SetHistograms(10)
18+
assert.Equal(t, int64(10), ws.LoadHistogram())
19+
})
20+
t.Run("Exemplars", func(t *testing.T) {
21+
ws.SetExemplars(2)
22+
assert.Equal(t, int64(2), ws.LoadExemplars())
23+
})
24+
}
25+
26+
func Test_NilReceiver(t *testing.T) {
27+
var ws *WriteStats
28+
29+
t.Run("Samples", func(t *testing.T) {
30+
ws.SetSamples(3)
31+
assert.Equal(t, int64(0), ws.LoadSamples())
32+
})
33+
t.Run("Histograms", func(t *testing.T) {
34+
ws.SetHistograms(10)
35+
assert.Equal(t, int64(0), ws.LoadHistogram())
36+
})
37+
t.Run("Exemplars", func(t *testing.T) {
38+
ws.SetExemplars(2)
39+
assert.Equal(t, int64(0), ws.LoadExemplars())
40+
})
41+
}

pkg/ingester/ingester.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1569,7 +1569,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
15691569
return &cortexpb.WriteResponse{}, httpgrpc.Errorf(code, "%s", wrapWithUser(firstPartialErr, userID).Error())
15701570
}
15711571

1572-
return &cortexpb.WriteResponse{}, nil
1572+
writeResponse := &cortexpb.WriteResponse{
1573+
Samples: int64(succeededSamplesCount),
1574+
Histograms: int64(succeededHistogramsCount),
1575+
Exemplars: int64(succeededExemplarsCount),
1576+
}
1577+
1578+
return writeResponse, nil
15731579
}
15741580

15751581
func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error {

pkg/util/push/push.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
3333
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
3434
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
35+
36+
errMsgNotEnabledPRW2 = "Not enabled prometheus remote write v2 push request"
3537
)
3638

3739
// Func defines the type of the push. It is similar to http.HandlerFunc.

pkg/util/push/push_test.go

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -35,30 +35,34 @@ var (
3535
}
3636
)
3737

38-
func makeV2ReqWithSeries(num int) *writev2.Request {
39-
ts := make([]writev2.TimeSeries, 0, num)
38+
func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
39+
ts := make([]cortexpb.PreallocTimeseriesV2, 0, num)
4040
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
4141
for i := 0; i < num; i++ {
42-
ts = append(ts, writev2.TimeSeries{
43-
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
44-
Metadata: writev2.Metadata{
45-
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
46-
47-
HelpRef: 15,
48-
UnitRef: 16,
49-
},
50-
Samples: []writev2.Sample{{Value: 1, Timestamp: 10}},
51-
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}},
52-
Histograms: []writev2.Histogram{
53-
writev2.FromIntHistogram(10, &testHistogram),
54-
writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil)),
42+
ts = append(ts, cortexpb.PreallocTimeseriesV2{
43+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
44+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
45+
Metadata: cortexpb.MetadataV2{
46+
Type: cortexpb.METRIC_TYPE_GAUGE,
47+
48+
HelpRef: 15,
49+
UnitRef: 16,
50+
},
51+
Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 10}},
52+
Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}},
53+
Histograms: []cortexpb.Histogram{
54+
cortexpb.HistogramToHistogramProto(10, &testHistogram),
55+
cortexpb.FloatHistogramToHistogramProto(20, testHistogram.ToFloat(nil)),
56+
},
5557
},
5658
})
5759
}
5860

59-
return &writev2.Request{
60-
Symbols: symbols,
61-
Timeseries: ts,
61+
return &cortexpb.PreallocWriteRequestV2{
62+
WriteRequestV2: cortexpb.WriteRequestV2{
63+
Symbols: symbols,
64+
Timeseries: ts,
65+
},
6266
}
6367
}
6468

@@ -164,36 +168,44 @@ func Benchmark_convertV2RequestToV1(b *testing.B) {
164168
}
165169

166170
func Test_convertV2RequestToV1(t *testing.T) {
167-
var v2Req writev2.Request
171+
var v2Req cortexpb.PreallocWriteRequestV2
168172

169173
fh := tsdbutil.GenerateTestFloatHistogram(1)
170-
ph := writev2.FromFloatHistogram(4, fh)
174+
ph := cortexpb.FloatHistogramToHistogramProto(4, fh)
171175

172176
symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
173-
timeseries := []writev2.TimeSeries{
177+
timeseries := []cortexpb.PreallocTimeseriesV2{
174178
{
175-
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
176-
Metadata: writev2.Metadata{
177-
Type: writev2.Metadata_METRIC_TYPE_COUNTER,
179+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
180+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
181+
Metadata: cortexpb.MetadataV2{
182+
Type: cortexpb.METRIC_TYPE_COUNTER,
178183

179-
HelpRef: 15,
180-
UnitRef: 16,
184+
HelpRef: 15,
185+
UnitRef: 16,
186+
},
187+
Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1}},
188+
Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
181189
},
182-
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
183-
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
184190
},
185191
{
186-
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
187-
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
192+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
193+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
194+
Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 2}},
195+
},
188196
},
189197
{
190-
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
191-
Samples: []writev2.Sample{{Value: 3, Timestamp: 3}},
198+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
199+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
200+
Samples: []cortexpb.Sample{{Value: 3, TimestampMs: 3}},
201+
},
192202
},
193203
{
194-
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
195-
Histograms: []writev2.Histogram{ph, ph},
196-
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
204+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
205+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
206+
Histograms: []cortexpb.Histogram{ph, ph},
207+
Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
208+
},
197209
},
198210
}
199211

@@ -394,7 +406,7 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) {
394406
}
395407
}
396408

397-
func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequest_SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
409+
func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
398410
t.Helper()
399411
return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
400412
assert.Len(t, request.Timeseries, 1)
@@ -444,7 +456,7 @@ func createRequest(t *testing.T, protobuf []byte, isV2 bool) *http.Request {
444456
return req
445457
}
446458

447-
func createCortexRemoteWriteV2Protobuf(t *testing.T, skipLabelNameValidation bool, source cortexpb.WriteRequest_SourceEnum) []byte {
459+
func createCortexRemoteWriteV2Protobuf(t *testing.T, skipLabelNameValidation bool, source cortexpb.SourceEnum) []byte {
448460
t.Helper()
449461
input := writev2.Request{
450462
Symbols: []string{"", "__name__", "foo"},
@@ -500,7 +512,7 @@ func createPrometheusRemoteWriteProtobuf(t *testing.T) []byte {
500512
require.NoError(t, err)
501513
return inoutBytes
502514
}
503-
func createCortexWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool, source cortexpb.WriteRequest_SourceEnum) []byte {
515+
func createCortexWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool, source cortexpb.SourceEnum) []byte {
504516
t.Helper()
505517
ts := cortexpb.PreallocTimeseries{
506518
TimeSeries: &cortexpb.TimeSeries{
@@ -520,4 +532,4 @@ func createCortexWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool
520532
inoutBytes, err := input.Marshal()
521533
require.NoError(t, err)
522534
return inoutBytes
523-
}
535+
}

0 commit comments

Comments
 (0)