Skip to content

Commit 1ef21a1

Browse files
authored
Add enable type and unit labels flag (#6969)
* Add EnableTypeAndUnitLabels flag to add type and unit label Signed-off-by: SungJin1212 <[email protected]> * fix lint Signed-off-by: SungJin1212 <[email protected]> --------- Signed-off-by: SungJin1212 <[email protected]>
1 parent cfce88f commit 1ef21a1

File tree

8 files changed

+181
-34
lines changed

8 files changed

+181
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
55
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
66
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
7+
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.enable-type-and-unit-labels` flag to add `__type__` and `__unit__` labels for OTLP metrics. #6969
78
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.allow-delta-temporality` flag to ingest delta temporality otlp metrics. #6934
89
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
910
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3265,6 +3265,11 @@ otlp:
32653265
# EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
32663266
# CLI flag: -distributor.otlp.allow-delta-temporality
32673267
[allow_delta_temporality: <boolean> | default = false]
3268+
3269+
# EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for
3270+
# the OTLP metrics.
3271+
# CLI flag: -distributor.otlp.enable-type-and-unit-labels
3272+
[enable_type_and_unit_labels: <boolean> | default = false]
32683273
```
32693274

32703275
### `etcd_config`

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ Currently experimental features are:
118118
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
119119
- OTLP Receiver
120120
- Ingest delta temporality OTLP metrics (`-distributor.otlp.allow-delta-temporality=true`)
121+
- Add `__type__` and `__unit__` labels (`-distributor.otlp.enable-type-and-unit-labels`)
121122
- Persistent tokens in the Ruler Ring:
122123
- `-ruler.ring.tokens-file-path` (path) CLI flag
123124
- Native Histograms

integration/e2ecortex/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries, metadata []promp
270270
return metrics
271271
}
272272

273-
func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest {
273+
func otlpWriteRequest(name, unit string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest {
274274
d := pmetric.NewMetrics()
275275

276276
// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
@@ -292,6 +292,7 @@ func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, l
292292
// Generate One Counter
293293
counterMetric := scopeMetric.Metrics().AppendEmpty()
294294
counterMetric.SetName(name)
295+
counterMetric.SetUnit(unit)
295296
counterMetric.SetDescription("test-counter-description")
296297

297298
counterMetric.SetEmptySum()
@@ -310,8 +311,8 @@ func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, l
310311
return pmetricotlp.NewExportRequestFromMetrics(d)
311312
}
312313

313-
func (c *Client) OTLPPushExemplar(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) {
314-
data, err := otlpWriteRequest(name, temporality, labels...).MarshalProto()
314+
func (c *Client) OTLPPushExemplar(name, unit string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) {
315+
data, err := otlpWriteRequest(name, unit, temporality, labels...).MarshalProto()
315316
if err != nil {
316317
return nil, err
317318
}

integration/otlp_test.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func TestOTLPIngestExemplar(t *testing.T) {
150150
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
151151
require.NoError(t, err)
152152

153-
res, err := c.OTLPPushExemplar("exemplar_1", pmetric.AggregationTemporalityCumulative)
153+
res, err := c.OTLPPushExemplar("exemplar_1", "", pmetric.AggregationTemporalityCumulative)
154154
require.NoError(t, err)
155155
require.Equal(t, 200, res.StatusCode)
156156

@@ -242,15 +242,15 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
242242
{Name: "attr3", Value: "value"},
243243
}
244244

245-
res, err := c1.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
245+
res, err := c1.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityCumulative, labels...)
246246
require.NoError(t, err)
247247
require.Equal(t, 200, res.StatusCode)
248248

249-
res, err = c2.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
249+
res, err = c2.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityCumulative, labels...)
250250
require.NoError(t, err)
251251
require.Equal(t, 200, res.StatusCode)
252252

253-
res, err = c3.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
253+
res, err = c3.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityCumulative, labels...)
254254
require.NoError(t, err)
255255
require.Equal(t, 200, res.StatusCode)
256256

@@ -267,6 +267,65 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
267267
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
268268
}
269269

270+
func TestOTLPEnableTypeAndUnitLabels(t *testing.T) {
271+
s, err := e2e.NewScenario(networkName)
272+
require.NoError(t, err)
273+
defer s.Close()
274+
275+
// Start dependencies.
276+
minio := e2edb.NewMinio(9000, bucketName)
277+
require.NoError(t, s.StartAndWaitReady(minio))
278+
279+
// Configure the blocks storage to frequently compact TSDB head
280+
// and ship blocks to the storage.
281+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
282+
"-auth.enabled": "true",
283+
284+
// OTLP
285+
"-distributor.otlp.enable-type-and-unit-labels": "true",
286+
287+
// alert manager
288+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
289+
"-alertmanager-storage.backend": "local",
290+
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
291+
})
292+
293+
// make alert manager config dir
294+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
295+
296+
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))
297+
298+
// start cortex and assert runtime-config is loaded correctly
299+
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
300+
require.NoError(t, s.StartAndWaitReady(cortex))
301+
302+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
303+
require.NoError(t, err)
304+
305+
// Push some series to Cortex.
306+
now := time.Now()
307+
308+
labels := []prompb.Label{
309+
{Name: "service.name", Value: "test-service"},
310+
{Name: "attr1", Value: "value"},
311+
}
312+
313+
res, err := c.OTLPPushExemplar("series_1", "seconds", pmetric.AggregationTemporalityCumulative, labels...)
314+
require.NoError(t, err)
315+
require.Equal(t, 200, res.StatusCode)
316+
317+
value, err := c.Query("series_1_seconds", now)
318+
require.NoError(t, err)
319+
vector, ok := value.(model.Vector)
320+
fmt.Println("vector", vector)
321+
require.True(t, ok)
322+
require.Equal(t, 1, len(vector))
323+
324+
metric := vector[0].Metric
325+
require.Equal(t, model.LabelValue("seconds"), metric["__unit__"])
326+
require.Equal(t, model.LabelValue("gauge"), metric["__type__"])
327+
}
328+
270329
func TestOTLPPushDeltaTemporality(t *testing.T) {
271330
s, err := e2e.NewScenario(networkName)
272331
require.NoError(t, err)
@@ -310,7 +369,7 @@ func TestOTLPPushDeltaTemporality(t *testing.T) {
310369
{Name: "attr1", Value: "value"},
311370
}
312371

313-
res, err := c.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityDelta, labels...)
372+
res, err := c.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityDelta, labels...)
314373
require.NoError(t, err)
315374
require.Equal(t, 200, res.StatusCode)
316375

pkg/distributor/distributor.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,10 @@ type InstanceLimits struct {
193193
}
194194

195195
type OTLPConfig struct {
196-
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
197-
DisableTargetInfo bool `yaml:"disable_target_info"`
198-
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
196+
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
197+
DisableTargetInfo bool `yaml:"disable_target_info"`
198+
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
199+
EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"`
199200
}
200201

201202
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -224,6 +225,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
224225
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
225226
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
226227
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.")
228+
f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.")
227229
}
228230

229231
// Validate config and returns error on failure

pkg/util/push/otlp.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,10 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
178178
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) {
179179
promConverter := prometheusremotewrite.NewPrometheusConverter()
180180
settings := prometheusremotewrite.Settings{
181-
AddMetricSuffixes: true,
182-
DisableTargetInfo: cfg.DisableTargetInfo,
183-
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
181+
AddMetricSuffixes: true,
182+
DisableTargetInfo: cfg.DisableTargetInfo,
183+
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
184+
EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels,
184185
}
185186

186187
var annots annotations.Annotations

0 commit comments

Comments
 (0)