Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 163 additions & 56 deletions pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,133 +220,240 @@ func TestOTLPConvertToPromTS(t *testing.T) {
}
}

// for testing
type resetReader struct {
*bytes.Reader
body []byte
}

func newResetReader(body []byte) *resetReader {
return &resetReader{
Reader: bytes.NewReader(body),
body: body,
}
}

func (r *resetReader) Reset() {
r.Reader.Reset(r.body)
}

func (r *resetReader) Close() error {
return nil
}

func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, encodingType string) (*http.Request, error) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")

var body []byte
var err error
switch contentType {
case jsonContentType:
body, err = otlpRequest.MarshalJSON()
if err != nil {
return nil, err
}
case pbContentType:
body, err = otlpRequest.MarshalProto()
if err != nil {
return nil, err
}
}

if encodingType == "gzip" {
var gzipBody bytes.Buffer
gz := gzip.NewWriter(&gzipBody)
_, err = gz.Write(body)
if err != nil {
return nil, err
}
if err = gz.Close(); err != nil {
return nil, err
}
body = gzipBody.Bytes()
}

req, err := http.NewRequestWithContext(ctx, "", "", newResetReader(body))
if err != nil {
return nil, err
}

switch contentType {
case jsonContentType:
req.Header.Set("Content-Type", jsonContentType)
case pbContentType:
req.Header.Set("Content-Type", pbContentType)
}

if encodingType != "" {
req.Header.Set("Content-Encoding", encodingType)
}
req.ContentLength = int64(len(body))

return req, nil
}

func BenchmarkOTLPWriteHandler(b *testing.B) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
}
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
require.NoError(b, err)

exportRequest := generateOTLPWriteRequest()
mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}
handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc)

b.Run("json with no compression", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
b.Run("json with gzip", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "gzip")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
b.Run("proto with no compression", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
b.Run("proto with gzip", func(b *testing.B) {
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
}

func TestOTLPWriteHandler(t *testing.T) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
}

exportRequest := generateOTLPWriteRequest(t)
exportRequest := generateOTLPWriteRequest()

tests := []struct {
description string
maxRecvMsgSize int
format string
contentType string
expectedStatusCode int
expectedErrMsg string
gzipCompression bool
encodingType string
}{
{
description: "Test proto format write with no compression",
maxRecvMsgSize: 10000,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusOK,
},
{
description: "Test proto format write with gzip",
maxRecvMsgSize: 10000,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
gzipCompression: true,
},
{
description: "Test json format write with no compression",
maxRecvMsgSize: 10000,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusOK,
},
{
description: "Test json format write with gzip",
maxRecvMsgSize: 10000,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusOK,
encodingType: "gzip",
gzipCompression: true,
},
{
description: "request too big than maxRecvMsgSize (proto) with no compression",
maxRecvMsgSize: 10,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
},
{
description: "request too big than maxRecvMsgSize (proto) with gzip",
maxRecvMsgSize: 10,
format: pbContentType,
contentType: pbContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
encodingType: "gzip",
gzipCompression: true,
},
{
description: "request too big than maxRecvMsgSize (json) with no compression",
maxRecvMsgSize: 10,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
},
{
description: "request too big than maxRecvMsgSize (json) with gzip",
maxRecvMsgSize: 10,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "received message larger than max",
encodingType: "gzip",
gzipCompression: true,
},
{
description: "invalid encoding type: snappy",
maxRecvMsgSize: 10000,
format: jsonContentType,
contentType: jsonContentType,
expectedStatusCode: http.StatusBadRequest,
encodingType: "snappy",
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
var req *http.Request

compressionFunc := func(t *testing.T, body []byte) []byte {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
_, err := gz.Write(body)
require.NoError(t, err)
require.NoError(t, gz.Close())

return b.Bytes()
}

if test.format == pbContentType {
buf, err := exportRequest.MarshalProto()
require.NoError(t, err)

if test.gzipCompression {
buf = compressionFunc(t, buf)
}

req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf))
require.NoError(t, err)
req.Header.Set("Content-Type", pbContentType)
req.Header.Set("Content-Encoding", test.encodingType)
} else {
buf, err := exportRequest.MarshalJSON()
require.NoError(t, err)

if test.gzipCompression {
buf = compressionFunc(t, buf)
}

req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf))
require.NoError(t, err)
req.Header.Set("Content-Type", jsonContentType)
req.Header.Set("Content-Encoding", test.encodingType)
}
req, err := getOTLPHttpRequest(&exportRequest, test.contentType, test.encodingType)
require.NoError(t, err)

push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
Expand All @@ -368,7 +475,7 @@ func TestOTLPWriteHandler(t *testing.T) {
}
}

func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {
func generateOTLPWriteRequest() pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
Expand Down
Loading