Skip to content

Commit ce9c4ba

Browse files
authored
Fix OTLP native histogram push (#6135)
1 parent b2ce96c commit ce9c4ba

File tree

6 files changed

+58
-7
lines changed

6 files changed

+58
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* [FEATURE] Ruler: Support sending native histogram samples to Ingester. #6029
1313
* [FEATURE] Ruler: Add support for filtering out alerts in ListRules API. #6011
1414
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
15-
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
15+
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 #6135
1616
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
1717
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
1818
* [FEATURE] Store Gateway: Token bucket limiter. #6016

integration/e2ecortex/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func createDataPointsGauge(newMetric pmetric.Metric, attributes map[string]any,
173173
}
174174

175175
func createDataPointsExponentialHistogram(newMetric pmetric.Metric, attributes map[string]any, histograms []prompb.Histogram) {
176-
newMetric.SetEmptyExponentialHistogram()
176+
newMetric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
177177
for _, h := range histograms {
178178
datapoint := newMetric.ExponentialHistogram().DataPoints().AppendEmpty()
179179
datapoint.SetTimestamp(pcommon.Timestamp(h.Timestamp * time.Millisecond.Nanoseconds()))

integration/otlp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestOTLP(t *testing.T) {
7777

7878
i := rand.Uint32()
7979
histogramSeries := e2e.GenerateHistogramSeries("histogram_series", now, i, false, prompb.Label{Name: "job", Value: "test"})
80-
res, err = c.Push(histogramSeries)
80+
res, err = c.OTLP(histogramSeries)
8181
require.NoError(t, err)
8282
require.Equal(t, 200, res.StatusCode)
8383

pkg/cortexpb/histograms.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,41 @@
1313

1414
package cortexpb
1515

16-
import "github.com/prometheus/prometheus/model/histogram"
16+
import (
17+
"github.com/prometheus/prometheus/model/histogram"
18+
"github.com/prometheus/prometheus/prompb"
19+
)
1720

1821
func (h Histogram) IsFloatHistogram() bool {
1922
_, ok := h.GetCount().(*Histogram_CountFloat)
2023
return ok
2124
}
2225

26+
// HistogramPromProtoToHistogramProto converts a prometheus protobuf Histogram to cortex protobuf Histogram.
27+
func HistogramPromProtoToHistogramProto(h prompb.Histogram) Histogram {
28+
ph := Histogram{
29+
Sum: h.Sum,
30+
Schema: h.Schema,
31+
ZeroThreshold: h.ZeroThreshold,
32+
NegativeSpans: spansPromProtoToSpansProto(h.NegativeSpans),
33+
NegativeDeltas: h.NegativeDeltas,
34+
NegativeCounts: h.NegativeCounts,
35+
PositiveSpans: spansPromProtoToSpansProto(h.PositiveSpans),
36+
PositiveDeltas: h.PositiveDeltas,
37+
PositiveCounts: h.PositiveCounts,
38+
ResetHint: Histogram_ResetHint(h.ResetHint),
39+
TimestampMs: h.Timestamp,
40+
}
41+
if h.IsFloatHistogram() {
42+
ph.Count = &Histogram_CountFloat{CountFloat: h.GetCountFloat()}
43+
ph.ZeroCount = &Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
44+
} else {
45+
ph.Count = &Histogram_CountInt{CountInt: h.GetCountInt()}
46+
ph.ZeroCount = &Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
47+
}
48+
return ph
49+
}
50+
2351
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
2452
// provided proto message. The caller has to make sure that the proto message
2553
// represents an interger histogram and not a float histogram.
@@ -118,3 +146,12 @@ func spansToSpansProto(s []histogram.Span) []BucketSpan {
118146

119147
return spans
120148
}
149+
150+
func spansPromProtoToSpansProto(s []prompb.BucketSpan) []BucketSpan {
151+
spans := make([]BucketSpan, len(s))
152+
for i := 0; i < len(s); i++ {
153+
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
154+
}
155+
156+
return spans
157+
}

pkg/util/push/otlp.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
5858
tsList := []cortexpb.PreallocTimeseries(nil)
5959
for _, v := range promConverter.TimeSeries() {
6060
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
61-
Labels: makeLabels(v.Labels),
62-
Samples: makeSamples(v.Samples),
63-
Exemplars: makeExemplars(v.Exemplars),
61+
Labels: makeLabels(v.Labels),
62+
Samples: makeSamples(v.Samples),
63+
Exemplars: makeExemplars(v.Exemplars),
64+
Histograms: makeHistograms(v.Histograms),
6465
}})
6566
}
6667
prwReq.Timeseries = tsList
@@ -112,6 +113,14 @@ func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar {
112113
return out
113114
}
114115

116+
func makeHistograms(in []prompb.Histogram) []cortexpb.Histogram {
117+
out := make([]cortexpb.Histogram, 0, len(in))
118+
for _, h := range in {
119+
out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h))
120+
}
121+
return out
122+
}
123+
115124
func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics {
116125
cloneMd := pmetric.NewMetrics()
117126
md.CopyTo(cloneMd)

pkg/util/push/otlp_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ func verifyOTLPWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequ
124124
// TODO: test more things
125125
assert.Equal(t, expectSource, request.Source)
126126
assert.False(t, request.SkipLabelNameValidation)
127+
for _, ts := range request.Timeseries {
128+
assert.NotEmpty(t, ts.Labels)
129+
// Make sure at least one of sample, exemplar or histogram is set.
130+
assert.True(t, len(ts.Samples) > 0 || len(ts.Exemplars) > 0 || len(ts.Histograms) > 0)
131+
}
127132
return &cortexpb.WriteResponse{}, nil
128133
}
129134
}

0 commit comments

Comments
 (0)