@@ -220,133 +220,240 @@ func TestOTLPConvertToPromTS(t *testing.T) {
220220 }
221221}
222222
223+ // for testing
224+ type resetReader struct {
225+ * bytes.Reader
226+ body []byte
227+ }
228+
229+ func newResetReader (body []byte ) * resetReader {
230+ return & resetReader {
231+ Reader : bytes .NewReader (body ),
232+ body : body ,
233+ }
234+ }
235+
236+ func (r * resetReader ) Reset () {
237+ r .Reader .Reset (r .body )
238+ }
239+
240+ func (r * resetReader ) Close () error {
241+ return nil
242+ }
243+
244+ func getOTLPHttpRequest (otlpRequest * pmetricotlp.ExportRequest , contentType , encodingType string ) (* http.Request , error ) {
245+ ctx := context .Background ()
246+ ctx = user .InjectOrgID (ctx , "user-1" )
247+
248+ var body []byte
249+ var err error
250+ switch contentType {
251+ case jsonContentType :
252+ body , err = otlpRequest .MarshalJSON ()
253+ if err != nil {
254+ return nil , err
255+ }
256+ case pbContentType :
257+ body , err = otlpRequest .MarshalProto ()
258+ if err != nil {
259+ return nil , err
260+ }
261+ }
262+
263+ if encodingType == "gzip" {
264+ var gzipBody bytes.Buffer
265+ gz := gzip .NewWriter (& gzipBody )
266+ _ , err = gz .Write (body )
267+ if err != nil {
268+ return nil , err
269+ }
270+ if err = gz .Close (); err != nil {
271+ return nil , err
272+ }
273+ body = gzipBody .Bytes ()
274+ }
275+
276+ req , err := http .NewRequestWithContext (ctx , "" , "" , newResetReader (body ))
277+ if err != nil {
278+ return nil , err
279+ }
280+
281+ switch contentType {
282+ case jsonContentType :
283+ req .Header .Set ("Content-Type" , jsonContentType )
284+ case pbContentType :
285+ req .Header .Set ("Content-Type" , pbContentType )
286+ }
287+
288+ if encodingType != "" {
289+ req .Header .Set ("Content-Encoding" , encodingType )
290+ }
291+ req .ContentLength = int64 (len (body ))
292+
293+ return req , nil
294+ }
295+
296+ func BenchmarkOTLPWriteHandler (b * testing.B ) {
297+ cfg := distributor.OTLPConfig {
298+ ConvertAllAttributes : false ,
299+ DisableTargetInfo : false ,
300+ }
301+ overrides , err := validation .NewOverrides (querier .DefaultLimitsConfig (), nil )
302+ require .NoError (b , err )
303+
304+ exportRequest := generateOTLPWriteRequest ()
305+ mockPushFunc := func (context.Context , * cortexpb.WriteRequest ) (* cortexpb.WriteResponse , error ) {
306+ return & cortexpb.WriteResponse {}, nil
307+ }
308+ handler := OTLPHandler (10000 , overrides , cfg , nil , mockPushFunc )
309+
310+ b .Run ("json with no compression" , func (b * testing.B ) {
311+ req , err := getOTLPHttpRequest (& exportRequest , jsonContentType , "" )
312+ require .NoError (b , err )
313+
314+ b .ResetTimer ()
315+ b .ReportAllocs ()
316+ for i := 0 ; i < b .N ; i ++ {
317+ recorder := httptest .NewRecorder ()
318+ handler .ServeHTTP (recorder , req )
319+
320+ resp := recorder .Result ()
321+ require .Equal (b , http .StatusOK , resp .StatusCode )
322+ req .Body .(* resetReader ).Reset ()
323+ }
324+ })
325+ b .Run ("json with gzip" , func (b * testing.B ) {
326+ req , err := getOTLPHttpRequest (& exportRequest , jsonContentType , "gzip" )
327+ require .NoError (b , err )
328+
329+ b .ResetTimer ()
330+ b .ReportAllocs ()
331+ for i := 0 ; i < b .N ; i ++ {
332+ recorder := httptest .NewRecorder ()
333+ handler .ServeHTTP (recorder , req )
334+
335+ resp := recorder .Result ()
336+ require .Equal (b , http .StatusOK , resp .StatusCode )
337+ req .Body .(* resetReader ).Reset ()
338+ }
339+ })
340+ b .Run ("proto with no compression" , func (b * testing.B ) {
341+ req , err := getOTLPHttpRequest (& exportRequest , pbContentType , "" )
342+ require .NoError (b , err )
343+
344+ b .ResetTimer ()
345+ b .ReportAllocs ()
346+ for i := 0 ; i < b .N ; i ++ {
347+ recorder := httptest .NewRecorder ()
348+ handler .ServeHTTP (recorder , req )
349+
350+ resp := recorder .Result ()
351+ require .Equal (b , http .StatusOK , resp .StatusCode )
352+ req .Body .(* resetReader ).Reset ()
353+ }
354+ })
355+ b .Run ("proto with gzip" , func (b * testing.B ) {
356+ req , err := getOTLPHttpRequest (& exportRequest , pbContentType , "gzip" )
357+ require .NoError (b , err )
358+
359+ b .ResetTimer ()
360+ b .ReportAllocs ()
361+ for i := 0 ; i < b .N ; i ++ {
362+ recorder := httptest .NewRecorder ()
363+ handler .ServeHTTP (recorder , req )
364+
365+ resp := recorder .Result ()
366+ require .Equal (b , http .StatusOK , resp .StatusCode )
367+ req .Body .(* resetReader ).Reset ()
368+ }
369+ })
370+ }
371+
223372func TestOTLPWriteHandler (t * testing.T ) {
224373 cfg := distributor.OTLPConfig {
225374 ConvertAllAttributes : false ,
226375 DisableTargetInfo : false ,
227376 }
228377
229- exportRequest := generateOTLPWriteRequest (t )
378+ exportRequest := generateOTLPWriteRequest ()
230379
231380 tests := []struct {
232381 description string
233382 maxRecvMsgSize int
234- format string
383+ contentType string
235384 expectedStatusCode int
236385 expectedErrMsg string
237- gzipCompression bool
238386 encodingType string
239387 }{
240388 {
241389 description : "Test proto format write with no compression" ,
242390 maxRecvMsgSize : 10000 ,
243- format : pbContentType ,
391+ contentType : pbContentType ,
244392 expectedStatusCode : http .StatusOK ,
245393 },
246394 {
247395 description : "Test proto format write with gzip" ,
248396 maxRecvMsgSize : 10000 ,
249- format : pbContentType ,
397+ contentType : pbContentType ,
250398 expectedStatusCode : http .StatusOK ,
251399 encodingType : "gzip" ,
252- gzipCompression : true ,
253400 },
254401 {
255402 description : "Test json format write with no compression" ,
256403 maxRecvMsgSize : 10000 ,
257- format : jsonContentType ,
404+ contentType : jsonContentType ,
258405 expectedStatusCode : http .StatusOK ,
259406 },
260407 {
261408 description : "Test json format write with gzip" ,
262409 maxRecvMsgSize : 10000 ,
263- format : jsonContentType ,
410+ contentType : jsonContentType ,
264411 expectedStatusCode : http .StatusOK ,
265412 encodingType : "gzip" ,
266- gzipCompression : true ,
267413 },
268414 {
269415 description : "request too big than maxRecvMsgSize (proto) with no compression" ,
270416 maxRecvMsgSize : 10 ,
271- format : pbContentType ,
417+ contentType : pbContentType ,
272418 expectedStatusCode : http .StatusBadRequest ,
273419 expectedErrMsg : "received message larger than max" ,
274420 },
275421 {
276422 description : "request too big than maxRecvMsgSize (proto) with gzip" ,
277423 maxRecvMsgSize : 10 ,
278- format : pbContentType ,
424+ contentType : pbContentType ,
279425 expectedStatusCode : http .StatusBadRequest ,
280426 expectedErrMsg : "received message larger than max" ,
281427 encodingType : "gzip" ,
282- gzipCompression : true ,
283428 },
284429 {
285430 description : "request too big than maxRecvMsgSize (json) with no compression" ,
286431 maxRecvMsgSize : 10 ,
287- format : jsonContentType ,
432+ contentType : jsonContentType ,
288433 expectedStatusCode : http .StatusBadRequest ,
289434 expectedErrMsg : "received message larger than max" ,
290435 },
291436 {
292437 description : "request too big than maxRecvMsgSize (json) with gzip" ,
293438 maxRecvMsgSize : 10 ,
294- format : jsonContentType ,
439+ contentType : jsonContentType ,
295440 expectedStatusCode : http .StatusBadRequest ,
296441 expectedErrMsg : "received message larger than max" ,
297442 encodingType : "gzip" ,
298- gzipCompression : true ,
299443 },
300444 {
301445 description : "invalid encoding type: snappy" ,
302446 maxRecvMsgSize : 10000 ,
303- format : jsonContentType ,
447+ contentType : jsonContentType ,
304448 expectedStatusCode : http .StatusBadRequest ,
305449 encodingType : "snappy" ,
306450 },
307451 }
308452
309453 for _ , test := range tests {
310454 t .Run (test .description , func (t * testing.T ) {
311- ctx := context .Background ()
312- ctx = user .InjectOrgID (ctx , "user-1" )
313- var req * http.Request
314-
315- compressionFunc := func (t * testing.T , body []byte ) []byte {
316- var b bytes.Buffer
317- gz := gzip .NewWriter (& b )
318- _ , err := gz .Write (body )
319- require .NoError (t , err )
320- require .NoError (t , gz .Close ())
321-
322- return b .Bytes ()
323- }
324-
325- if test .format == pbContentType {
326- buf , err := exportRequest .MarshalProto ()
327- require .NoError (t , err )
328-
329- if test .gzipCompression {
330- buf = compressionFunc (t , buf )
331- }
332-
333- req , err = http .NewRequestWithContext (ctx , "" , "" , bytes .NewReader (buf ))
334- require .NoError (t , err )
335- req .Header .Set ("Content-Type" , pbContentType )
336- req .Header .Set ("Content-Encoding" , test .encodingType )
337- } else {
338- buf , err := exportRequest .MarshalJSON ()
339- require .NoError (t , err )
340-
341- if test .gzipCompression {
342- buf = compressionFunc (t , buf )
343- }
344-
345- req , err = http .NewRequestWithContext (ctx , "" , "" , bytes .NewReader (buf ))
346- require .NoError (t , err )
347- req .Header .Set ("Content-Type" , jsonContentType )
348- req .Header .Set ("Content-Encoding" , test .encodingType )
349- }
455+ req , err := getOTLPHttpRequest (& exportRequest , test .contentType , test .encodingType )
456+ require .NoError (t , err )
350457
351458 push := verifyOTLPWriteRequestHandler (t , cortexpb .API )
352459 overrides , err := validation .NewOverrides (querier .DefaultLimitsConfig (), nil )
@@ -368,7 +475,7 @@ func TestOTLPWriteHandler(t *testing.T) {
368475 }
369476}
370477
371- func generateOTLPWriteRequest (t * testing. T ) pmetricotlp.ExportRequest {
478+ func generateOTLPWriteRequest () pmetricotlp.ExportRequest {
372479 d := pmetric .NewMetrics ()
373480
374481 // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
0 commit comments