diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index cd0d3059ab2..cdf1259d122 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -73,7 +73,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri } // convert prompb to cortexpb TimeSeries - tsList := []cortexpb.PreallocTimeseries(nil) + tsList := make([]cortexpb.PreallocTimeseries, 0, len(promTsList)) for _, v := range promTsList { tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{ Labels: makeLabels(v.Labels), diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index c02b98d68ef..84029f76661 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -620,7 +620,7 @@ func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, enc return req, nil } -func BenchmarkOTLPWriteHandler(b *testing.B) { +func BenchmarkOTLPWriteHandlerCompression(b *testing.B) { cfg := distributor.OTLPConfig{ ConvertAllAttributes: false, DisableTargetInfo: false, @@ -695,6 +695,90 @@ func BenchmarkOTLPWriteHandler(b *testing.B) { }) } +func BenchmarkOTLPWriteHandlerPush(b *testing.B) { + cfg := distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: false, + } + overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) + + mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + return &cortexpb.WriteResponse{}, nil + } + handler := OTLPHandler(1000000, overrides, cfg, nil, mockPushFunc) + + tests := []struct { + description string + numSeries int + samplesPerSeries int + numHistograms int + }{ + { + numSeries: 1, + samplesPerSeries: 10, + numHistograms: 1, + }, + { + numSeries: 1, + samplesPerSeries: 100, + numHistograms: 1, + }, + { + numSeries: 1, + samplesPerSeries: 1000, + numHistograms: 1, + }, + { + numSeries: 1, + samplesPerSeries: 1, + numHistograms: 10, + }, + { + numSeries: 1, + samplesPerSeries: 1, + numHistograms: 100, + }, + { + numSeries: 1, + samplesPerSeries: 1, + numHistograms: 1000, + }, + { + numSeries: 10, + samplesPerSeries: 1, + numHistograms: 1, + }, + { + numSeries: 100, + samplesPerSeries: 1, + numHistograms: 1, + }, + { + numSeries: 1000, + samplesPerSeries: 1, + numHistograms: 1, + }, + } + + for _, test := range tests { + b.Run(fmt.Sprintf("numSeries:%d, samplesPerSeries:%d, numHistograms:%d", test.numSeries, test.samplesPerSeries, test.numHistograms), func(b *testing.B) { + exportRequest := generateOTLPWriteRequestWithSeries(test.numSeries, test.samplesPerSeries, test.numHistograms) + req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip") + require.NoError(b, err) + + b.ReportAllocs() + for b.Loop() { + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(b, http.StatusOK, resp.StatusCode) + req.Body.(*resetReader).Reset() + } + }) + } +} + func TestOTLPWriteHandler(t *testing.T) { cfg := distributor.OTLPConfig{ ConvertAllAttributes: false, @@ -800,6 +884,87 @@ func TestOTLPWriteHandler(t *testing.T) { } } +func generateOTLPWriteRequestWithSeries(numSeries, samplesPerSeries, numHistogram int) pmetricotlp.ExportRequest { + d := pmetric.NewMetrics() + + attributes := pcommon.NewMap() + attributes.PutStr("label1", "value1") + attributes.PutStr("label2", "value2") + attributes.PutStr("label3", "value3") + + for i := 0; i < numSeries; i++ { + metricName := fmt.Sprintf("series_%d", i) + metricUnit := fmt.Sprintf("unit_%d", i) + metricDescription := fmt.Sprintf("description_%d", i) + + resourceMetric := d.ResourceMetrics().AppendEmpty() + resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") + resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance") + resourceMetric.Resource().Attributes().PutStr("host.name", "test-host") + + scopeMetric := resourceMetric.ScopeMetrics() + metric := scopeMetric.AppendEmpty().Metrics().AppendEmpty() + + // set metadata + metric.SetName(metricName) + metric.SetDescription(metricDescription) + metric.SetUnit(metricUnit) + metric.SetEmptyGauge() + + for j := 0; j < samplesPerSeries; j++ { + v := float64(j + i) + ts := time.Now().Add(time.Second * 30 * time.Duration(samplesPerSeries-j+1)) + dataPoint := metric.Gauge().DataPoints().AppendEmpty() + dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dataPoint.SetDoubleValue(v) + attributes.CopyTo(dataPoint.Attributes()) + + // exemplar + exemplar := dataPoint.Exemplars().AppendEmpty() + exemplar.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + exemplar.SetDoubleValue(v) + exemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7}) + exemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) + } + + for j := 0; j < numHistogram; j++ { + ts := time.Now().Add(time.Second * 30 * time.Duration(numHistogram-j+1)) + // Generate One Histogram + histogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty() + histogramMetric.SetName(fmt.Sprintf("test-histogram_%d", j)) + histogramMetric.SetDescription(fmt.Sprintf("test-histogram-description_%d", j)) + histogramMetric.SetEmptyHistogram() + histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty() + histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0}) + histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2}) + histogramDataPoint.SetCount(10) + histogramDataPoint.SetSum(30.0) + attributes.CopyTo(histogramDataPoint.Attributes()) + + // Generate One Exponential-Histogram + exponentialHistogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty() + exponentialHistogramMetric.SetName(fmt.Sprintf("test-exponential-histogram_%d", j)) + exponentialHistogramMetric.SetDescription(fmt.Sprintf("test-exponential-histogram-description_%d", j)) + exponentialHistogramMetric.SetEmptyExponentialHistogram() + exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty() + exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + exponentialHistogramDataPoint.SetScale(2.0) + exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2}) + exponentialHistogramDataPoint.SetZeroCount(2) + exponentialHistogramDataPoint.SetCount(10) + exponentialHistogramDataPoint.SetSum(30.0) + attributes.CopyTo(exponentialHistogramDataPoint.Attributes()) + } + } + + return pmetricotlp.NewExportRequestFromMetrics(d) +} + func generateOTLPWriteRequest() pmetricotlp.ExportRequest { d := pmetric.NewMetrics()