Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
}

// convert prompb to cortexpb TimeSeries
tsList := []cortexpb.PreallocTimeseries(nil)
tsList := make([]cortexpb.PreallocTimeseries, 0, len(promTsList))
for _, v := range promTsList {
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
Labels: makeLabels(v.Labels),
Expand Down
172 changes: 171 additions & 1 deletion pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,11 @@ func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, enc
return req, nil
}

func BenchmarkOTLPWriteHandler(b *testing.B) {
func BenchmarkOTLPHandlerWithNumOfSeries(b *testing.B) {

}

func BenchmarkOTLPWriteHandlerCompression(b *testing.B) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
Expand Down Expand Up @@ -695,6 +699,91 @@ func BenchmarkOTLPWriteHandler(b *testing.B) {
})
}

func BenchmarkOTLPWriteHandlerPush(b *testing.B) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
}
overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)

mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}
handler := OTLPHandler(1000000, overrides, cfg, nil, mockPushFunc)

tests := []struct {
description string
numSeries int
samplesPerSeries int
numHistograms int
}{
{
numSeries: 1,
samplesPerSeries: 10,
numHistograms: 1,
},
{
numSeries: 1,
samplesPerSeries: 100,
numHistograms: 1,
},
{
numSeries: 1,
samplesPerSeries: 1000,
numHistograms: 1,
},
{
numSeries: 1,
samplesPerSeries: 1,
numHistograms: 10,
},
{
numSeries: 1,
samplesPerSeries: 1,
numHistograms: 100,
},
{
numSeries: 1,
samplesPerSeries: 1,
numHistograms: 1000,
},
{
numSeries: 10,
samplesPerSeries: 1,
numHistograms: 1,
},
{
numSeries: 100,
samplesPerSeries: 1,
numHistograms: 1,
},
{
numSeries: 1000,
samplesPerSeries: 1,
numHistograms: 1,
},
}

for _, test := range tests {
b.Run(fmt.Sprintf("numSeries:%d, samplesPerSeries:%d, numHistograms:%d", test.numSeries, test.samplesPerSeries, test.numHistograms), func(b *testing.B) {
exportRequest := generateOTLPWriteRequestWithSeries(test.numSeries, test.samplesPerSeries, test.numHistograms)
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
}
}

func TestOTLPWriteHandler(t *testing.T) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
Expand Down Expand Up @@ -800,6 +889,87 @@ func TestOTLPWriteHandler(t *testing.T) {
}
}

func generateOTLPWriteRequestWithSeries(numSeries, samplesPerSeries, numHistogram int) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

attributes := pcommon.NewMap()
attributes.PutStr("label1", "value1")
attributes.PutStr("label2", "value2")
attributes.PutStr("label3", "value3")

for i := 0; i < numSeries; i++ {
metricName := fmt.Sprintf("series_%d", i)
metricUnit := fmt.Sprintf("unit_%d", i)
metricDescription := fmt.Sprintf("description_%d", i)

resourceMetric := d.ResourceMetrics().AppendEmpty()
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")

scopeMetric := resourceMetric.ScopeMetrics()
metric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()

// set metadata
metric.SetName(metricName)
metric.SetDescription(metricDescription)
metric.SetUnit(metricUnit)
metric.SetEmptyGauge()

for j := 0; j < samplesPerSeries; j++ {
v := float64(j + i)
ts := time.Now().Add(time.Second * 30 * time.Duration(samplesPerSeries-j+1))
dataPoint := metric.Gauge().DataPoints().AppendEmpty()
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dataPoint.SetDoubleValue(v)
attributes.CopyTo(dataPoint.Attributes())

// exemplar
exemplar := dataPoint.Exemplars().AppendEmpty()
exemplar.SetTimestamp(pcommon.NewTimestampFromTime(ts))
exemplar.SetDoubleValue(v)
exemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})
exemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
}

for j := 0; j < numHistogram; j++ {
ts := time.Now().Add(time.Second * 30 * time.Duration(numHistogram-j+1))
// Generate One Histogram
histogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
histogramMetric.SetName(fmt.Sprintf("test-histogram_%d", j))
histogramMetric.SetDescription(fmt.Sprintf("test-histogram-description_%d", j))
histogramMetric.SetEmptyHistogram()
histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty()
histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0})
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2})
histogramDataPoint.SetCount(10)
histogramDataPoint.SetSum(30.0)
attributes.CopyTo(histogramDataPoint.Attributes())

// Generate One Exponential-Histogram
exponentialHistogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
exponentialHistogramMetric.SetName(fmt.Sprintf("test-exponential-histogram_%d", j))
exponentialHistogramMetric.SetDescription(fmt.Sprintf("test-exponential-histogram-description_%d", j))
exponentialHistogramMetric.SetEmptyExponentialHistogram()
exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty()
exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
exponentialHistogramDataPoint.SetScale(2.0)
exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2})
exponentialHistogramDataPoint.SetZeroCount(2)
exponentialHistogramDataPoint.SetCount(10)
exponentialHistogramDataPoint.SetSum(30.0)
attributes.CopyTo(exponentialHistogramDataPoint.Attributes())
}
}

return pmetricotlp.NewExportRequestFromMetrics(d)
}

func generateOTLPWriteRequest() pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

Expand Down
Loading