Skip to content

Commit 87dae49

Browse files
authored
OTLP: Add a AllowDeltaTemporality flag to ingest delta temporality metric (#6934)
1 parent 0691662 commit 87dae49

File tree

8 files changed

+324
-16
lines changed

8 files changed

+324
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
55
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
66
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
7+
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.allow-delta-temporality` flag to ingest delta temporality otlp metrics. #6934
78
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
89
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
910
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3010,6 +3010,10 @@ otlp:
30103010
# https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
30113011
# CLI flag: -distributor.otlp.disable-target-info
30123012
[disable_target_info: <boolean> | default = false]
3013+
3014+
# EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
3015+
# CLI flag: -distributor.otlp.allow-delta-temporality
3016+
[allow_delta_temporality: <boolean> | default = false]
30133017
```
30143018

30153019
### `etcd_config`

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ Currently experimental features are:
116116
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
117117
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
118118
- OTLP Receiver
119+
- Ingest delta temporality OTLP metrics (`-distributor.otlp.allow-delta-temporality=true`)
119120
- Persistent tokens in the Ruler Ring:
120121
- `-ruler.ring.tokens-file-path` (path) CLI flag
121122
- Native Histograms

integration/e2ecortex/client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries, metadata []promp
236236
return metrics
237237
}
238238

239-
func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest {
239+
func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest {
240240
d := pmetric.NewMetrics()
241241

242242
// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
@@ -261,7 +261,7 @@ func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportReq
261261
counterMetric.SetDescription("test-counter-description")
262262

263263
counterMetric.SetEmptySum()
264-
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
264+
counterMetric.Sum().SetAggregationTemporality(temporality)
265265

266266
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
267267
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
@@ -276,8 +276,8 @@ func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportReq
276276
return pmetricotlp.NewExportRequestFromMetrics(d)
277277
}
278278

279-
func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) {
280-
data, err := otlpWriteRequest(name, labels...).MarshalProto()
279+
func (c *Client) OTLPPushExemplar(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) {
280+
data, err := otlpWriteRequest(name, temporality, labels...).MarshalProto()
281281
if err != nil {
282282
return nil, err
283283
}

integration/otlp_test.go

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/stretchr/testify/assert"
1919
"github.com/stretchr/testify/require"
2020
"github.com/thanos-io/objstore/providers/s3"
21+
"go.opentelemetry.io/collector/pdata/pmetric"
2122

2223
"github.com/cortexproject/cortex/integration/e2e"
2324
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
@@ -149,7 +150,7 @@ func TestOTLPIngestExemplar(t *testing.T) {
149150
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
150151
require.NoError(t, err)
151152

152-
res, err := c.OTLPPushExemplar("exemplar_1")
153+
res, err := c.OTLPPushExemplar("exemplar_1", pmetric.AggregationTemporalityCumulative)
153154
require.NoError(t, err)
154155
require.Equal(t, 200, res.StatusCode)
155156

@@ -241,15 +242,15 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
241242
{Name: "attr3", Value: "value"},
242243
}
243244

244-
res, err := c1.OTLPPushExemplar("series_1", labels...)
245+
res, err := c1.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
245246
require.NoError(t, err)
246247
require.Equal(t, 200, res.StatusCode)
247248

248-
res, err = c2.OTLPPushExemplar("series_1", labels...)
249+
res, err = c2.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
249250
require.NoError(t, err)
250251
require.Equal(t, 200, res.StatusCode)
251252

252-
res, err = c3.OTLPPushExemplar("series_1", labels...)
253+
res, err = c3.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
253254
require.NoError(t, err)
254255
require.Equal(t, 200, res.StatusCode)
255256

@@ -265,3 +266,57 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
265266
require.NoError(t, err)
266267
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
267268
}
269+
270+
func TestOTLPPushDeltaTemporality(t *testing.T) {
271+
s, err := e2e.NewScenario(networkName)
272+
require.NoError(t, err)
273+
defer s.Close()
274+
275+
// Start dependencies.
276+
minio := e2edb.NewMinio(9000, bucketName)
277+
require.NoError(t, s.StartAndWaitReady(minio))
278+
279+
// Configure the blocks storage to frequently compact TSDB head
280+
// and ship blocks to the storage.
281+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
282+
"-auth.enabled": "true",
283+
284+
// OTLP
285+
"-distributor.otlp.allow-delta-temporality": "true",
286+
287+
// alert manager
288+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
289+
"-alertmanager-storage.backend": "local",
290+
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
291+
})
292+
293+
// make alert manager config dir
294+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
295+
296+
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))
297+
298+
// start cortex and assert runtime-config is loaded correctly
299+
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
300+
require.NoError(t, s.StartAndWaitReady(cortex))
301+
302+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
303+
require.NoError(t, err)
304+
305+
// Push some series to Cortex.
306+
now := time.Now()
307+
308+
labels := []prompb.Label{
309+
{Name: "service.name", Value: "test-service"},
310+
{Name: "attr1", Value: "value"},
311+
}
312+
313+
res, err := c.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityDelta, labels...)
314+
require.NoError(t, err)
315+
require.Equal(t, 200, res.StatusCode)
316+
317+
value, err := c.Query("series_1", now)
318+
require.NoError(t, err)
319+
vector, ok := value.(model.Vector)
320+
require.True(t, ok)
321+
require.Equal(t, 1, len(vector))
322+
}

pkg/distributor/distributor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,9 @@ type InstanceLimits struct {
192192
}
193193

194194
type OTLPConfig struct {
195-
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
196-
DisableTargetInfo bool `yaml:"disable_target_info"`
195+
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
196+
DisableTargetInfo bool `yaml:"disable_target_info"`
197+
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
197198
}
198199

199200
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -220,6 +221,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
220221

221222
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
222223
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
224+
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.")
223225
}
224226

225227
// Validate config and returns error on failure

pkg/util/push/otlp.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
6767

6868
// otlp to prompb TimeSeries
6969
promTsList, promMetadata, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger)
70-
if err != nil {
70+
if err != nil && len(promTsList) == 0 {
7171
http.Error(w, err.Error(), http.StatusBadRequest)
7272
return
7373
}
@@ -178,8 +178,9 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
178178
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) {
179179
promConverter := prometheusremotewrite.NewPrometheusConverter()
180180
settings := prometheusremotewrite.Settings{
181-
AddMetricSuffixes: true,
182-
DisableTargetInfo: cfg.DisableTargetInfo,
181+
AddMetricSuffixes: true,
182+
DisableTargetInfo: cfg.DisableTargetInfo,
183+
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
183184
}
184185

185186
var annots annotations.Annotations
@@ -200,11 +201,10 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu
200201
}
201202

202203
if err != nil {
203-
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
204-
return nil, nil, err
204+
level.Warn(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
205205
}
206206

207-
return promConverter.TimeSeries(), promConverter.Metadata(), nil
207+
return promConverter.TimeSeries(), promConverter.Metadata(), err
208208
}
209209

210210
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {

0 commit comments

Comments
 (0)