Skip to content

Commit 56bc703

Browse files
authored
Add type and unit labels to prw2 (#7077)
* Add -distributor.enable-type-and-unit-labels per tenant flag for prw2 and otlp Signed-off-by: SungJin1212 <[email protected]> * Upgrade thanos promql engine to bcec363c24e6bebc6f9c88f9eb815d562f2c9adb Signed-off-by: SungJin1212 <[email protected]> * fix lint Signed-off-by: SungJin1212 <[email protected]> * Add breaking change mention to change log Signed-off-by: SungJin1212 <[email protected]> * keep distributor.otlp.enable-type-and-unit-labels as Deprecated Signed-off-by: SungJin1212 <[email protected]> * changelog Signed-off-by: SungJin1212 <[email protected]> --------- Signed-off-by: SungJin1212 <[email protected]>
1 parent 5d98839 commit 56bc703

File tree

340 files changed

+1282
-72254
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

340 files changed

+1282
-72254
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44

55
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
6+
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
67
* [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125
78
* [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092
89
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3300,8 +3300,7 @@ otlp:
33003300
# CLI flag: -distributor.otlp.allow-delta-temporality
33013301
[allow_delta_temporality: <boolean> | default = false]
33023302
3303-
# EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for
3304-
# the OTLP metrics.
3303+
# Deprecated: Use `-distributor.enable-type-and-unit-labels` flag instead.
33053304
# CLI flag: -distributor.otlp.enable-type-and-unit-labels
33063305
[enable_type_and_unit_labels: <boolean> | default = false]
33073306
```
@@ -4023,6 +4022,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
40234022
# CLI flag: -distributor.promote-resource-attributes
40244023
[promote_resource_attributes: <list of string> | default = ]
40254024
4025+
# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics.
4026+
# This applies to remote write v2 and OTLP requests.
4027+
# CLI flag: -distributor.enable-type-and-unit-labels
4028+
[enable_type_and_unit_labels: <boolean> | default = false]
4029+
40264030
# The maximum number of active series per user, per ingester. 0 to disable.
40274031
# CLI flag: -ingester.max-series-per-user
40284032
[max_series_per_user: <int> | default = 5000000]

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ Currently experimental features are:
116116
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
117117
- OTLP Receiver
118118
- Ingest delta temporality OTLP metrics (`-distributor.otlp.allow-delta-temporality=true`)
119-
- Add `__type__` and `__unit__` labels (`-distributor.otlp.enable-type-and-unit-labels`)
120119
- Persistent tokens in the Ruler Ring:
121120
- `-ruler.ring.tokens-file-path` (path) CLI flag
122121
- Native Histograms
@@ -133,3 +132,4 @@ Currently experimental features are:
133132
- `-store-gateway.query-protection.rejection`
134133
- Distributor/Ingester: Stream push connection
135134
- Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor.
135+
- Add `__type__` and `__unit__` labels to OTLP and remote write v2 requests (`-distributor.enable-type-and-unit-labels`)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ require (
5050
github.com/spf13/afero v1.11.0
5151
github.com/stretchr/testify v1.11.1
5252
github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3
53-
github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264
53+
github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6
5454
github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb
5555
github.com/uber/jaeger-client-go v2.30.0+incompatible
5656
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,8 +1774,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
17741774
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
17751775
github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3 h1:P301Anc27aVL7Ls88el92j+qW3PJp8zmiDl+kOUZv3A=
17761776
github.com/thanos-io/objstore v0.0.0-20250722142242-922b22272ee3/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0=
1777-
github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264 h1:sOmANo4XVhem4VgvI9w05DBwqMex/qw+cDjuHW2FKWw=
1778-
github.com/thanos-io/promql-engine v0.0.0-20250924193140-e9123dc11264/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
1777+
github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6 h1:/8TlVay3hF8LQ7iGLRm9aNRGQsyYOXFPNmtg5dsNwcM=
1778+
github.com/thanos-io/promql-engine v0.0.0-20251117105526-bcec363c24e6/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
17791779
github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb h1:z/ePbn3lo/D4vdHGH8hpa2kgH9M6iLq0kOFtZwuelKM=
17801780
github.com/thanos-io/thanos v0.39.3-0.20250729120336-88d0ae8071cb/go.mod h1:gGUG3TDEoRSjTFVs/QO6QnQIILRgNF0P9l7BiiMfmHw=
17811781
github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww=

integration/e2e/util.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,18 +465,20 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
465465
st := writev2.NewSymbolTable()
466466
lb := labels.NewScratchBuilder(0)
467467
lb.Add("__name__", name)
468-
469468
for _, label := range additionalLabels {
470469
lb.Add(label.Name, label.Value)
471470
}
471+
472472
series = append(series, writev2.TimeSeries{
473473
// Generate the series
474474
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
475475
Samples: []writev2.Sample{
476476
{Value: value, Timestamp: tsMillis},
477477
},
478478
Metadata: writev2.Metadata{
479-
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
479+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
480+
HelpRef: 2, // equal to name
481+
UnitRef: 2, // equal to name
480482
},
481483
})
482484
symbols = st.Symbols()

integration/otlp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func TestOTLPEnableTypeAndUnitLabels(t *testing.T) {
281281
"-auth.enabled": "true",
282282

283283
// OTLP
284-
"-distributor.otlp.enable-type-and-unit-labels": "true",
284+
"-distributor.enable-type-and-unit-labels": "true",
285285

286286
// alert manager
287287
"-alertmanager.web.external-url": "http://localhost/alertmanager",

integration/remote_write_v2_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
204204
require.Empty(t, result)
205205
}
206206

207+
func TestIngest_EnableTypeAndUnitLabels(t *testing.T) {
208+
const blockRangePeriod = 5 * time.Second
209+
210+
s, err := e2e.NewScenario(networkName)
211+
require.NoError(t, err)
212+
defer s.Close()
213+
214+
// Start dependencies.
215+
consul := e2edb.NewConsulWithName("consul")
216+
require.NoError(t, s.StartAndWaitReady(consul))
217+
218+
flags := mergeFlags(
219+
AlertmanagerLocalFlags(),
220+
map[string]string{
221+
"-store.engine": blocksStorageEngine,
222+
"-blocks-storage.backend": "filesystem",
223+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
224+
"-blocks-storage.bucket-store.sync-interval": "15m",
225+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
226+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
227+
"-querier.query-store-for-labels-enabled": "true",
228+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
229+
"-blocks-storage.tsdb.ship-interval": "1s",
230+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
231+
"-blocks-storage.tsdb.enable-native-histograms": "true",
232+
// Ingester.
233+
"-ring.store": "consul",
234+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
235+
// Distributor.
236+
"-distributor.replication-factor": "1",
237+
"-distributor.remote-writev2-enabled": "true",
238+
"-distributor.enable-type-and-unit-labels": "true",
239+
// Store-gateway.
240+
"-store-gateway.sharding-enabled": "false",
241+
// alert manager
242+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
243+
},
244+
)
245+
246+
// make alert manager config dir
247+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
248+
249+
path := path.Join(s.SharedDir(), "cortex-1")
250+
251+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
252+
// Start Cortex replicas.
253+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
254+
require.NoError(t, s.StartAndWaitReady(cortex))
255+
256+
// Wait until Cortex replicas have updated the ring state.
257+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
258+
259+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
260+
require.NoError(t, err)
261+
262+
now := time.Now()
263+
264+
// series push
265+
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
266+
writeStats, err := c.PushV2(symbols1, series)
267+
require.NoError(t, err)
268+
testPushHeader(t, writeStats, 1, 0, 0)
269+
270+
value, err := c.Query("test_series", now)
271+
require.NoError(t, err)
272+
require.Equal(t, model.ValVector, value.Type())
273+
vec := value.(model.Vector)
274+
require.True(t, vec[0].Metric["__unit__"] != "")
275+
require.True(t, vec[0].Metric["__type__"] != "")
276+
}
277+
207278
func TestIngest(t *testing.T) {
208279
const blockRangePeriod = 5 * time.Second
209280

pkg/api/api.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
283283
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
284284
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
285285

286-
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
286+
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
287287
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
288288

289289
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
@@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
295295
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
296296

297297
// Legacy Routes
298-
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
298+
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
299299
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
300300
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
301301
}
@@ -313,7 +313,7 @@ type Ingester interface {
313313
}
314314

315315
// RegisterIngester registers the ingesters HTTP and GRPC service
316-
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
316+
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) {
317317
client.RegisterIngesterServer(a.server.GRPC, i)
318318

319319
a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")
@@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
328328
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
329329
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
330330
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
331-
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
331+
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
332332

333333
// Legacy Routes
334334
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
335335
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
336-
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
336+
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
337337
}
338338

339339
func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
492492
}
493493

494494
func (t *Cortex) initIngester() (serv services.Service, err error) {
495-
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor)
495+
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides)
496496

497497
return nil, nil
498498
}

0 commit comments

Comments
 (0)