Skip to content

Commit 394cc1b

Browse files
authored
Support otlp metadata ingestion (#6617)
Signed-off-by: SungJin1212 <[email protected]>
1 parent 4bf57d9 commit 394cc1b

File tree

5 files changed

+61
-15
lines changed

5 files changed

+61
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568
1313
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to `cortex_rejected_queries_total` metric to track the rejected query count due to the # of tenant limits. #6569
1414
* [ENHANCEMENT] Alertmanager: Add receiver validations for msteamsv2 and rocketchat. #6606
15+
* [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617
1516
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
1617
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
1718
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

integration/e2ecortex/client.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,14 +215,18 @@ func convertBucketLayout(bucket pmetric.ExponentialHistogramDataPointBuckets, sp
215215
}
216216

217217
// Convert Timeseries to Metrics
218-
func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics {
218+
func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries, metadata []prompb.MetricMetadata) pmetric.Metrics {
219219
metrics := pmetric.NewMetrics()
220-
for _, ts := range timeseries {
220+
for i, ts := range timeseries {
221221
metricName, attributes := getNameAndAttributes(ts)
222222
newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
223223
newMetric.SetName(metricName)
224-
//TODO Set description for new metric
225-
//TODO Set unit for new metric
224+
225+
if metadata != nil {
226+
newMetric.SetDescription(metadata[i].GetHelp())
227+
newMetric.SetUnit(metadata[i].GetUnit())
228+
}
229+
226230
if len(ts.Samples) > 0 {
227231
createDataPointsGauge(newMetric, attributes, ts.Samples)
228232
} else if len(ts.Histograms) > 0 {
@@ -302,9 +306,9 @@ func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Re
302306
}
303307

304308
// Push series to OTLP endpoint
305-
func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) {
309+
func (c *Client) OTLP(timeseries []prompb.TimeSeries, metadata []prompb.MetricMetadata) (*http.Response, error) {
306310

307-
data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries)).MarshalProto()
311+
data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries, metadata)).MarshalProto()
308312
if err != nil {
309313
return nil, err
310314
}
@@ -356,6 +360,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
356360
return value, err
357361
}
358362

363+
// Metadata runs a metadata query
364+
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
365+
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
366+
return metadata, err
367+
}
368+
359369
// QueryExemplars runs an exemplars query
360370
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
361371
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

integration/otlp_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,21 @@ func TestOTLP(t *testing.T) {
6161

6262
// Push some series to Cortex.
6363
now := time.Now()
64-
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
64+
series, expectedVector := generateSeries("series_1_total", now, prompb.Label{Name: "foo", Value: "bar"})
6565

66-
res, err := c.OTLP(series)
66+
metadata := []prompb.MetricMetadata{
67+
{
68+
Help: "help",
69+
Unit: "total",
70+
},
71+
}
72+
73+
res, err := c.OTLP(series, metadata)
6774
require.NoError(t, err)
6875
require.Equal(t, 200, res.StatusCode)
6976

7077
// Query the series.
71-
result, err := c.Query("series_1", now)
78+
result, err := c.Query("series_1_total", now)
7279
require.NoError(t, err)
7380
require.Equal(t, model.ValVector, result.Type())
7481
assert.Equal(t, expectedVector, result.(model.Vector))
@@ -81,13 +88,17 @@ func TestOTLP(t *testing.T) {
8188
require.NoError(t, err)
8289
require.Equal(t, []string{"__name__", "foo"}, labelNames)
8390

91+
metadataResult, err := c.Metadata("series_1", "")
92+
require.NoError(t, err)
93+
require.Equal(t, 1, len(metadataResult))
94+
8495
// Check that a range query does not return an error to sanity check the queryrange tripperware.
8596
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
8697
require.NoError(t, err)
8798

8899
i := rand.Uint32()
89100
histogramSeries := e2e.GenerateHistogramSeries("histogram_series", now, i, false, prompb.Label{Name: "job", Value: "test"})
90-
res, err = c.OTLP(histogramSeries)
101+
res, err = c.OTLP(histogramSeries, nil)
91102
require.NoError(t, err)
92103
require.Equal(t, 200, res.StatusCode)
93104

pkg/util/push/otlp.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
6565
}
6666

6767
// otlp to prompb TimeSeries
68-
promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger)
68+
promTsList, promMetadata, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger)
6969
if err != nil {
7070
http.Error(w, err.Error(), http.StatusBadRequest)
7171
return
@@ -81,7 +81,11 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
8181
Histograms: makeHistograms(v.Histograms),
8282
}})
8383
}
84+
85+
metadata := makeMetadata(promMetadata)
86+
8487
prwReq.Timeseries = tsList
88+
prwReq.Metadata = metadata
8589

8690
if _, err := push(ctx, &prwReq); err != nil {
8791
resp, ok := httpgrpc.HTTPResponseFromError(err)
@@ -99,6 +103,19 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
99103
})
100104
}
101105

106+
func makeMetadata(promMetadata []prompb.MetricMetadata) []*cortexpb.MetricMetadata {
107+
metadata := make([]*cortexpb.MetricMetadata, 0, len(promMetadata))
108+
for _, m := range promMetadata {
109+
metadata = append(metadata, &cortexpb.MetricMetadata{
110+
Type: cortexpb.MetricMetadata_MetricType(m.Type),
111+
MetricFamilyName: m.MetricFamilyName,
112+
Help: m.Help,
113+
Unit: m.Unit,
114+
})
115+
}
116+
return metadata
117+
}
118+
102119
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
103120
expectedSize := int(r.ContentLength)
104121
if expectedSize > maxSize {
@@ -157,7 +174,7 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
157174
return decoderFunc(r.Body)
158175
}
159176

160-
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) {
177+
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) {
161178
promConverter := prometheusremotewrite.NewPrometheusConverter()
162179
settings := prometheusremotewrite.Settings{
163180
AddMetricSuffixes: true,
@@ -181,9 +198,10 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu
181198

182199
if err != nil {
183200
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
184-
return nil, err
201+
return nil, nil, err
185202
}
186-
return promConverter.TimeSeries(), nil
203+
204+
return promConverter.TimeSeries(), promConverter.Metadata(), nil
187205
}
188206

189207
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {

pkg/util/push/otlp_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,15 @@ func TestOTLPConvertToPromTS(t *testing.T) {
195195
}
196196
overrides, err := validation.NewOverrides(limits, nil)
197197
require.NoError(t, err)
198-
tsList, err := convertToPromTS(ctx, d, test.cfg, overrides, "user-1", logger)
198+
tsList, metadata, err := convertToPromTS(ctx, d, test.cfg, overrides, "user-1", logger)
199199
require.NoError(t, err)
200200

201+
// test metadata conversion
202+
require.Equal(t, 1, len(metadata))
203+
require.Equal(t, prompb.MetricMetadata_MetricType(1), metadata[0].Type)
204+
require.Equal(t, "test_counter_total", metadata[0].MetricFamilyName)
205+
require.Equal(t, "test-counter-description", metadata[0].Help)
206+
201207
if test.cfg.DisableTargetInfo {
202208
require.Equal(t, 1, len(tsList)) // test_counter_total
203209
} else {

0 commit comments

Comments
 (0)