@@ -3,12 +3,14 @@ package push
33import (
44 "bytes"
55 "context"
6+ "fmt"
67 "net/http"
78 "net/http/httptest"
89 "testing"
910 "time"
1011
1112 "github.com/golang/snappy"
13+ "github.com/prometheus/prometheus/model/histogram"
1214 "github.com/prometheus/prometheus/prompb"
1315 writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1416 "github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -19,6 +21,148 @@ import (
1921 "github.com/cortexproject/cortex/pkg/cortexpb"
2022)
2123
24+ var (
25+ testHistogram = histogram.Histogram {
26+ Schema : 2 ,
27+ ZeroThreshold : 1e-128 ,
28+ ZeroCount : 0 ,
29+ Count : 3 ,
30+ Sum : 20 ,
31+ PositiveSpans : []histogram.Span {{Offset : 0 , Length : 1 }},
32+ PositiveBuckets : []int64 {1 },
33+ NegativeSpans : []histogram.Span {{Offset : 0 , Length : 1 }},
34+ NegativeBuckets : []int64 {2 },
35+ }
36+ )
37+
38+ func makeV2ReqWithSeries (num int ) * writev2.Request {
39+ ts := make ([]writev2.TimeSeries , 0 , num )
40+ symbols := []string {"" , "__name__" , "test_metric1" , "b" , "c" , "baz" , "qux" , "d" , "e" , "foo" , "bar" , "f" , "g" , "h" , "i" , "Test gauge for test purposes" , "Maybe op/sec who knows (:" , "Test counter for test purposes" }
41+ for i := 0 ; i < num ; i ++ {
42+ ts = append (ts , writev2.TimeSeries {
43+ LabelsRefs : []uint32 {1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 },
44+ Metadata : writev2.Metadata {
45+ Type : writev2 .Metadata_METRIC_TYPE_GAUGE ,
46+
47+ HelpRef : 15 ,
48+ UnitRef : 16 ,
49+ },
50+ Samples : []writev2.Sample {{Value : 1 , Timestamp : 10 }},
51+ Exemplars : []writev2.Exemplar {{LabelsRefs : []uint32 {11 , 12 }, Value : 1 , Timestamp : 10 }},
52+ Histograms : []writev2.Histogram {
53+ writev2 .FromIntHistogram (10 , & testHistogram ),
54+ writev2 .FromFloatHistogram (20 , testHistogram .ToFloat (nil )),
55+ },
56+ })
57+ }
58+
59+ return & writev2.Request {
60+ Symbols : symbols ,
61+ Timeseries : ts ,
62+ }
63+ }
64+
65+ func createPRW1HTTPRequest (seriesNum int ) (* http.Request , error ) {
66+ series := makeV2ReqWithSeries (seriesNum )
67+ v1Req , err := convertV2RequestToV1 (series )
68+ if err != nil {
69+ return nil , err
70+ }
71+ protobuf , err := v1Req .Marshal ()
72+ if err != nil {
73+ return nil , err
74+ }
75+
76+ body := snappy .Encode (nil , protobuf )
77+ req , err := http .NewRequest ("POST" , "http://localhost/" , newResetReader (body ))
78+ if err != nil {
79+ return nil , err
80+ }
81+
82+ req .Header .Add ("Content-Encoding" , "snappy" )
83+ req .Header .Set ("Content-Type" , appProtoContentType )
84+ req .Header .Set ("X-Prometheus-Remote-Write-Version" , remoteWriteVersion1HeaderValue )
85+ req .ContentLength = int64 (len (body ))
86+ return req , nil
87+ }
88+
89+ func createPRW2HTTPRequest (seriesNum int ) (* http.Request , error ) {
90+ series := makeV2ReqWithSeries (seriesNum )
91+ protobuf , err := series .Marshal ()
92+ if err != nil {
93+ return nil , err
94+ }
95+
96+ body := snappy .Encode (nil , protobuf )
97+ req , err := http .NewRequest ("POST" , "http://localhost/" , newResetReader (body ))
98+ if err != nil {
99+ return nil , err
100+ }
101+
102+ req .Header .Add ("Content-Encoding" , "snappy" )
103+ req .Header .Set ("Content-Type" , appProtoV2ContentType )
104+ req .Header .Set ("X-Prometheus-Remote-Write-Version" , remoteWriteVersion20HeaderValue )
105+ req .ContentLength = int64 (len (body ))
106+ return req , nil
107+ }
108+
109+ func Benchmark_Handler (b * testing.B ) {
110+ mockHandler := func (context.Context , * cortexpb.WriteRequest ) (* cortexpb.WriteResponse , error ) {
111+ // Nothing to do.
112+ return & cortexpb.WriteResponse {}, nil
113+ }
114+ testSeriesNums := []int {10 , 100 , 500 , 1000 }
115+ for _ , seriesNum := range testSeriesNums {
116+ b .Run (fmt .Sprintf ("PRW1 with %d series" , seriesNum ), func (b * testing.B ) {
117+ handler := Handler (true , 1000000 , nil , mockHandler )
118+ req , err := createPRW1HTTPRequest (seriesNum )
119+ require .NoError (b , err )
120+
121+ b .ResetTimer ()
122+ b .ReportAllocs ()
123+
124+ for i := 0 ; i < b .N ; i ++ {
125+ resp := httptest .NewRecorder ()
126+ handler .ServeHTTP (resp , req )
127+ assert .Equal (b , http .StatusOK , resp .Code )
128+ req .Body .(* resetReader ).Reset ()
129+ }
130+ })
131+ b .Run (fmt .Sprintf ("PRW2 with %d series" , seriesNum ), func (b * testing.B ) {
132+ handler := Handler (true , 1000000 , nil , mockHandler )
133+ req , err := createPRW2HTTPRequest (seriesNum )
134+ require .NoError (b , err )
135+
136+ b .ResetTimer ()
137+ b .ReportAllocs ()
138+
139+ for i := 0 ; i < b .N ; i ++ {
140+ resp := httptest .NewRecorder ()
141+ handler .ServeHTTP (resp , req )
142+ assert .Equal (b , http .StatusOK , resp .Code )
143+ req .Body .(* resetReader ).Reset ()
144+ }
145+ })
146+ }
147+ }
148+
149+ func Benchmark_convertV2RequestToV1 (b * testing.B ) {
150+ testSeriesNums := []int {100 , 500 , 1000 }
151+
152+ for _ , seriesNum := range testSeriesNums {
153+ b .Run (fmt .Sprintf ("%d series" , seriesNum ), func (b * testing.B ) {
154+ series := makeV2ReqWithSeries (seriesNum )
155+
156+ b .ResetTimer ()
157+ b .ReportAllocs ()
158+ for i := 0 ; i < b .N ; i ++ {
159+ _ , err := convertV2RequestToV1 (series )
160+ require .NoError (b , err )
161+ }
162+ })
163+ }
164+ }
165+
22166func Test_convertV2RequestToV1 (t * testing.T ) {
23167 var v2Req writev2.Request
24168
0 commit comments