Skip to content

Commit c860a65

Browse files
committed
Add -distributor.enable-type-and-unit-labels per tenant flag for prw2 and otlp
Signed-off-by: SungJin1212 <[email protected]>
1 parent 93fd1c5 commit c860a65

File tree

16 files changed

+368
-42
lines changed

16 files changed

+368
-42
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44

55
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
6+
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 and OTLP requests. The `-distributor.otlp.enable-type-and-unit-labels` flag has been consolidated into this flag. #7077
67
* [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092
78
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
89
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
@@ -35,6 +36,7 @@
3536
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
3637
* [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
3738
* [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
39+
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.rw2-enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 requests. #7077
3840
* [ENHANCEMENT] Upgrade the Prometheus version to 3.6.0 and add a `-name-validation-scheme` flag to support UTF-8. #7040 #7056
3941
* [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052
4042
* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3274,11 +3274,6 @@ otlp:
32743274
# EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
32753275
# CLI flag: -distributor.otlp.allow-delta-temporality
32763276
[allow_delta_temporality: <boolean> | default = false]
3277-
3278-
# EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for
3279-
# the OTLP metrics.
3280-
# CLI flag: -distributor.otlp.enable-type-and-unit-labels
3281-
[enable_type_and_unit_labels: <boolean> | default = false]
32823277
```
32833278

32843279
### `etcd_config`
@@ -3998,6 +3993,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
39983993
# CLI flag: -distributor.promote-resource-attributes
39993994
[promote_resource_attributes: <list of string> | default = ]
40003995
3996+
# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics.
3997+
# This applies to remote write v2 and OTLP requests.
3998+
# CLI flag: -distributor.enable-type-and-unit-labels
3999+
[enable_type_and_unit_labels: <boolean> | default = false]
4000+
40014001
# The maximum number of active series per user, per ingester. 0 to disable.
40024002
# CLI flag: -ingester.max-series-per-user
40034003
[max_series_per_user: <int> | default = 5000000]

integration/e2e/util.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,18 +465,20 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
465465
st := writev2.NewSymbolTable()
466466
lb := labels.NewScratchBuilder(0)
467467
lb.Add("__name__", name)
468-
469468
for _, label := range additionalLabels {
470469
lb.Add(label.Name, label.Value)
471470
}
471+
472472
series = append(series, writev2.TimeSeries{
473473
// Generate the series
474474
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
475475
Samples: []writev2.Sample{
476476
{Value: value, Timestamp: tsMillis},
477477
},
478478
Metadata: writev2.Metadata{
479-
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
479+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
480+
HelpRef: 2, // equal to name
481+
UnitRef: 2, // equal to name
480482
},
481483
})
482484
symbols = st.Symbols()

integration/otlp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func TestOTLPEnableTypeAndUnitLabels(t *testing.T) {
281281
"-auth.enabled": "true",
282282

283283
// OTLP
284-
"-distributor.otlp.enable-type-and-unit-labels": "true",
284+
"-distributor.enable-type-and-unit-labels": "true",
285285

286286
// alert manager
287287
"-alertmanager.web.external-url": "http://localhost/alertmanager",

integration/remote_write_v2_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
204204
require.Empty(t, result)
205205
}
206206

207+
func TestIngest_EnableTypeAndUnitLabels(t *testing.T) {
208+
const blockRangePeriod = 5 * time.Second
209+
210+
s, err := e2e.NewScenario(networkName)
211+
require.NoError(t, err)
212+
defer s.Close()
213+
214+
// Start dependencies.
215+
consul := e2edb.NewConsulWithName("consul")
216+
require.NoError(t, s.StartAndWaitReady(consul))
217+
218+
flags := mergeFlags(
219+
AlertmanagerLocalFlags(),
220+
map[string]string{
221+
"-store.engine": blocksStorageEngine,
222+
"-blocks-storage.backend": "filesystem",
223+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
224+
"-blocks-storage.bucket-store.sync-interval": "15m",
225+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
226+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
227+
"-querier.query-store-for-labels-enabled": "true",
228+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
229+
"-blocks-storage.tsdb.ship-interval": "1s",
230+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
231+
"-blocks-storage.tsdb.enable-native-histograms": "true",
232+
// Ingester.
233+
"-ring.store": "consul",
234+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
235+
// Distributor.
236+
"-distributor.replication-factor": "1",
237+
"-distributor.remote-writev2-enabled": "true",
238+
"-distributor.enable-type-and-unit-labels": "true",
239+
// Store-gateway.
240+
"-store-gateway.sharding-enabled": "false",
241+
// alert manager
242+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
243+
},
244+
)
245+
246+
// make alert manager config dir
247+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
248+
249+
path := path.Join(s.SharedDir(), "cortex-1")
250+
251+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
252+
// Start Cortex replicas.
253+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
254+
require.NoError(t, s.StartAndWaitReady(cortex))
255+
256+
// Wait until Cortex replicas have updated the ring state.
257+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
258+
259+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
260+
require.NoError(t, err)
261+
262+
now := time.Now()
263+
264+
// series push
265+
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
266+
writeStats, err := c.PushV2(symbols1, series)
267+
require.NoError(t, err)
268+
testPushHeader(t, writeStats, 1, 0, 0)
269+
270+
value, err := c.Query("test_series", now)
271+
require.NoError(t, err)
272+
require.Equal(t, model.ValVector, value.Type())
273+
vec := value.(model.Vector)
274+
require.True(t, vec[0].Metric["__unit__"] != "")
275+
require.True(t, vec[0].Metric["__type__"] != "")
276+
}
277+
207278
func TestIngest(t *testing.T) {
208279
const blockRangePeriod = 5 * time.Second
209280

pkg/api/api.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
283283
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
284284
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
285285

286-
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
286+
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
287287
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
288288

289289
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
@@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
295295
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
296296

297297
// Legacy Routes
298-
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
298+
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
299299
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
300300
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
301301
}
@@ -313,7 +313,7 @@ type Ingester interface {
313313
}
314314

315315
// RegisterIngester registers the ingesters HTTP and GRPC service
316-
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
316+
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) {
317317
client.RegisterIngesterServer(a.server.GRPC, i)
318318

319319
a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")
@@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
328328
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
329329
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
330330
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
331-
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
331+
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
332332

333333
// Legacy Routes
334334
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
335335
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
336-
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
336+
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
337337
}
338338

339339
func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
492492
}
493493

494494
func (t *Cortex) initIngester() (serv services.Service, err error) {
495-
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor)
495+
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides)
496496

497497
return nil, nil
498498
}

pkg/cortexpb/compatv2.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cortexpb
33
import (
44
"fmt"
55

6+
"github.com/prometheus/common/model"
67
"github.com/prometheus/prometheus/model/labels"
78
)
89

@@ -32,3 +33,26 @@ func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []s
3233
b.Sort()
3334
return b.Labels(), nil
3435
}
36+
37+
func MetadataV2MetricTypeToMetricType(mt MetadataV2_MetricType) model.MetricType {
38+
switch mt {
39+
case METRIC_TYPE_UNSPECIFIED:
40+
return model.MetricTypeUnknown
41+
case METRIC_TYPE_COUNTER:
42+
return model.MetricTypeCounter
43+
case METRIC_TYPE_GAUGE:
44+
return model.MetricTypeGauge
45+
case METRIC_TYPE_HISTOGRAM:
46+
return model.MetricTypeHistogram
47+
case METRIC_TYPE_GAUGEHISTOGRAM:
48+
return model.MetricTypeGaugeHistogram
49+
case METRIC_TYPE_SUMMARY:
50+
return model.MetricTypeSummary
51+
case METRIC_TYPE_INFO:
52+
return model.MetricTypeInfo
53+
case METRIC_TYPE_STATESET:
54+
return model.MetricTypeStateset
55+
default:
56+
return model.MetricTypeUnknown
57+
}
58+
}

pkg/distributor/distributor.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,9 @@ type InstanceLimits struct {
197197
}
198198

199199
type OTLPConfig struct {
200-
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
201-
DisableTargetInfo bool `yaml:"disable_target_info"`
202-
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
203-
EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"`
200+
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
201+
DisableTargetInfo bool `yaml:"disable_target_info"`
202+
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
204203
}
205204

206205
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -229,7 +228,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
229228
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
230229
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
231230
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.")
232-
f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.")
233231
}
234232

235233
// Validate config and returns error on failure

pkg/util/push/otlp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu
181181
AddMetricSuffixes: true,
182182
DisableTargetInfo: cfg.DisableTargetInfo,
183183
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
184-
EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels,
184+
EnableTypeAndUnitLabels: overrides.EnableTypeAndUnitLabels(userID),
185185
}
186186

187187
var annots annotations.Annotations

0 commit comments

Comments
 (0)