Skip to content

Commit 77f9e92

Browse files
committed
Change OTLP attribute conversion to be consistent with Prometheus
Signed-off-by: SungJin1212 <[email protected]>
1 parent e070ec6 commit 77f9e92

File tree

10 files changed

+413
-30
lines changed

10 files changed

+413
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [ENHANCEMENT] OTLP: Change the OTLP handler to be consistent with the Prometheus OTLP handler. Enable the `target_info` metric and disable converting all attributes to labels by default. Add a config option to specify which resource attributes to promote per tenant. #6272
1213
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
1314
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
1415
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

docs/configuration/config-file-reference.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2608,6 +2608,15 @@ instance_limits:
26082608
# unlimited.
26092609
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
26102610
[max_inflight_push_requests: <int> | default = 0]
2611+
2612+
otlp:
2613+
# If enabled, all resource attributes are converted to labels.
2614+
# CLI flag: -distributor.otlp-config.convert-all-attributes
2615+
[convert_all_attributes: <boolean> | default = false]
2616+
2617+
# If enabled, a target_info metric is not ingested.
2618+
# CLI flag: -distributor.otlp-config.disable-target-info
2619+
[disable_target_info: <boolean> | default = false]
26112620
```
26122621

26132622
### `etcd_config`
@@ -3246,6 +3255,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
32463255
# CLI flag: -validation.max-native-histogram-buckets
32473256
[max_native_histogram_buckets: <int> | default = 0]
32483257
3258+
# Comma separated list of resource attributes that should be converted to
3259+
# labels.
3260+
# CLI flag: -distributor.promote-resource-attributes
3261+
[promote_resource_attributes: <list of string> | default = ]
3262+
32493263
# The maximum number of active series per user, per ingester. 0 to disable.
32503264
# CLI flag: -ingester.max-series-per-user
32513265
[max_series_per_user: <int> | default = 5000000]

integration/e2ecortex/client.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics
231231
return metrics
232232
}
233233

234-
func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
234+
func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest {
235235
d := pmetric.NewMetrics()
236236

237237
// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
@@ -244,6 +244,9 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
244244
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
245245
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
246246
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")
247+
for _, label := range labels {
248+
resourceMetric.Resource().Attributes().PutStr(label.Name, label.Value)
249+
}
247250

248251
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
249252

@@ -258,7 +261,6 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
258261
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
259262
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
260263
counterDataPoint.SetDoubleValue(10.0)
261-
counterDataPoint.Attributes().PutStr("foo.bar", "baz")
262264

263265
counterExemplar := counterDataPoint.Exemplars().AppendEmpty()
264266
counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
@@ -269,8 +271,8 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
269271
return pmetricotlp.NewExportRequestFromMetrics(d)
270272
}
271273

272-
func (c *Client) OTLPPushExemplar(name string) (*http.Response, error) {
273-
data, err := otlpWriteRequest(name).MarshalProto()
274+
func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) {
275+
data, err := otlpWriteRequest(name, labels...).MarshalProto()
274276
if err != nil {
275277
return nil, err
276278
}

integration/otlp_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package integration
55

66
import (
7+
"bytes"
8+
"context"
79
"fmt"
810
"math/rand"
911
"path/filepath"
@@ -15,6 +17,7 @@ import (
1517
"github.com/prometheus/prometheus/tsdb/tsdbutil"
1618
"github.com/stretchr/testify/assert"
1719
"github.com/stretchr/testify/require"
20+
"github.com/thanos-io/objstore/providers/s3"
1821

1922
"github.com/cortexproject/cortex/integration/e2e"
2023
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
@@ -144,3 +147,110 @@ func TestOTLPIngestExemplar(t *testing.T) {
144147
require.NoError(t, err)
145148
require.Equal(t, 1, len(exemplars))
146149
}
150+
151+
// TestOTLPPromoteResourceAttributesPerTenant pushes the same OTLP series as
// three tenants and checks the label names each tenant ends up with.
// user-1 and user-2 get promote_resource_attributes from the runtime-config
// overrides uploaded below; user-3 has no override, so the value of the
// -distributor.promote-resource-attributes flag is expected to apply.
func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
152+
configFileName := "runtime-config.yaml"
153+
154+
s, err := e2e.NewScenario(networkName)
155+
require.NoError(t, err)
156+
defer s.Close()
157+
158+
// Start dependencies.
159+
minio := e2edb.NewMinio(9000, bucketName)
160+
require.NoError(t, s.StartAndWaitReady(minio))
161+
162+
// Load the runtime config from the Minio (S3) bucket, reloading it every
163+
// second, and set the default promoted resource attributes via distributor flags.
164+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
165+
"-auth.enabled": "true",
166+
"-runtime-config.backend": "s3",
167+
"-runtime-config.s3.access-key-id": e2edb.MinioAccessKey,
168+
"-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey,
169+
"-runtime-config.s3.bucket-name": bucketName,
170+
"-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
171+
"-runtime-config.s3.insecure": "true",
172+
"-runtime-config.file": configFileName,
173+
"-runtime-config.reload-period": "1s",
174+
175+
// Distributor
176+
"-distributor.otlp-config.convert-all-attributes": "false",
177+
"-distributor.promote-resource-attributes": "attr1,attr2,attr3",
178+
179+
// alert manager
180+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
181+
"-alertmanager-storage.backend": "local",
182+
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
183+
})
184+
185+
// make alert manager config dir
186+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
187+
188+
// Bucket client pointed at the Minio container; used below to upload the
// runtime config that Cortex reads through the s3 backend.
client, err := s3.NewBucketWithConfig(nil, s3.Config{
189+
Endpoint: minio.HTTPEndpoint(),
190+
Insecure: true,
191+
Bucket: bucketName,
192+
AccessKey: e2edb.MinioAccessKey,
193+
SecretKey: e2edb.MinioSecretKey,
194+
}, "runtime-config-test")
195+
196+
require.NoError(t, err)
197+
198+
// Upload the runtime config containing the per-tenant overrides.
199+
newRuntimeConfig := []byte(`overrides:
200+
user-1:
201+
promote_resource_attributes: ["attr1"]
202+
user-2:
203+
promote_resource_attributes: ["attr1", "attr2"]
204+
`)
205+
require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig)))
206+
// NOTE(review): fixed 2s pause after the upload and before Cortex starts;
// presumably a settle delay — confirm it is actually required.
time.Sleep(2 * time.Second)
207+
208+
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))
209+
210+
// start cortex and assert runtime-config is loaded correctly
211+
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
212+
require.NoError(t, s.StartAndWaitReady(cortex))
213+
214+
// One client per tenant: user-1 and user-2 have overrides, user-3 does not.
c1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
215+
require.NoError(t, err)
216+
217+
c2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-2")
218+
require.NoError(t, err)
219+
220+
c3, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-3")
221+
require.NoError(t, err)
222+
223+
// Push some series to Cortex.
224+
now := time.Now()
225+
226+
// Passed to OTLPPushExemplar, which forwards them to otlpWriteRequest where
// they are set as OTLP resource attributes on the pushed request.
labels := []prompb.Label{
227+
{Name: "service.name", Value: "test-service"},
228+
{Name: "attr1", Value: "value"},
229+
{Name: "attr2", Value: "value"},
230+
{Name: "attr3", Value: "value"},
231+
}
232+
233+
res, err := c1.OTLPPushExemplar("series_1", labels...)
234+
require.NoError(t, err)
235+
require.Equal(t, 200, res.StatusCode)
236+
237+
res, err = c2.OTLPPushExemplar("series_1", labels...)
238+
require.NoError(t, err)
239+
require.Equal(t, 200, res.StatusCode)
240+
241+
res, err = c3.OTLPPushExemplar("series_1", labels...)
242+
require.NoError(t, err)
243+
require.Equal(t, 200, res.StatusCode)
244+
245+
// NOTE(review): testify's require.Equal takes (expected, actual); the three
// label-set assertions below pass them in the opposite order, which only
// affects how a failure is reported.
labelSet1, err := c1.LabelNames(now.Add(-time.Minute*5), now, "series_1")
246+
require.NoError(t, err)
247+
require.Equal(t, labelSet1, []string{"__name__", "attr1", "instance", "job"})
248+
249+
labelSet2, err := c2.LabelNames(now.Add(-time.Minute*5), now, "series_1")
250+
require.NoError(t, err)
251+
require.Equal(t, labelSet2, []string{"__name__", "attr1", "attr2", "instance", "job"})
252+
253+
labelSet3, err := c3.LabelNames(now.Add(-time.Minute*5), now, "series_1")
254+
require.NoError(t, err)
255+
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
256+
}

pkg/api/api.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
4141
"github.com/cortexproject/cortex/pkg/util/flagext"
4242
"github.com/cortexproject/cortex/pkg/util/push"
43+
"github.com/cortexproject/cortex/pkg/util/validation"
4344
)
4445

4546
// DistributorPushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -273,11 +274,11 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
273274
}
274275

275276
// RegisterDistributor registers the endpoints associated with the distributor.
276-
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) {
277+
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
277278
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
278279

279280
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
280-
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
281+
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
281282

282283
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
283284
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
246246
}
247247

248248
func (t *Cortex) initDistributor() (serv services.Service, err error) {
249-
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor)
249+
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides)
250250

251251
return nil, nil
252252
}

pkg/distributor/distributor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,21 @@ type Config struct {
164164

165165
// Limits for distributor
166166
InstanceLimits InstanceLimits `yaml:"instance_limits"`
167+
168+
// OTLPConfig holds the settings passed to the OTLP push handler.
169+
OTLPConfig OTLPConfig `yaml:"otlp"`
167170
}
168171

169172
type InstanceLimits struct {
170173
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
171174
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
172175
}
173176

177+
// OTLPConfig holds the distributor's OTLP ingestion options. Both fields are
// registered as CLI flags under the "distributor.otlp-config." prefix and
// default to false.
type OTLPConfig struct {
178+
// ConvertAllAttributes, if enabled, converts all resource attributes to labels.
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
179+
// DisableTargetInfo, if enabled, stops the target_info metric from being ingested.
DisableTargetInfo bool `yaml:"disable_target_info"`
180+
}
181+
174182
// RegisterFlags adds the flags required to config this to the given FlagSet
175183
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
176184
cfg.PoolConfig.RegisterFlags(f)
@@ -188,6 +196,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
188196

189197
f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
190198
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
199+
200+
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp-config.convert-all-attributes", false, "If enabled, all resource attributes are converted to labels.")
201+
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp-config.disable-target-info", false, "If enabled, a target_info metric is not ingested.")
191202
}
192203

193204
// Validate config and returns error on failure

pkg/util/push/otlp.go

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,50 @@
11
package push
22

33
import (
4+
"context"
45
"net/http"
56

7+
"github.com/go-kit/log"
68
"github.com/go-kit/log/level"
79
"github.com/prometheus/prometheus/model/labels"
810
"github.com/prometheus/prometheus/prompb"
911
"github.com/prometheus/prometheus/storage/remote"
1012
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
13+
"github.com/prometheus/prometheus/util/annotations"
1114
"github.com/weaveworks/common/httpgrpc"
1215
"github.com/weaveworks/common/middleware"
1316
"go.opentelemetry.io/collector/pdata/pcommon"
1417
"go.opentelemetry.io/collector/pdata/pmetric"
1518

1619
"github.com/cortexproject/cortex/pkg/cortexpb"
20+
"github.com/cortexproject/cortex/pkg/distributor"
21+
"github.com/cortexproject/cortex/pkg/tenant"
1722
"github.com/cortexproject/cortex/pkg/util"
18-
"github.com/cortexproject/cortex/pkg/util/log"
23+
util_log "github.com/cortexproject/cortex/pkg/util/log"
24+
"github.com/cortexproject/cortex/pkg/util/validation"
1925
)
2026

2127
// OTLPHandler is a http.Handler which accepts OTLP metrics.
22-
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
28+
func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
2329
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2430
ctx := r.Context()
25-
logger := log.WithContext(ctx, log.Logger)
31+
logger := util_log.WithContext(ctx, util_log.Logger)
2632
if sourceIPs != nil {
2733
source := sourceIPs.Get(r)
2834
if source != "" {
2935
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
30-
logger = log.WithSourceIPs(source, logger)
36+
logger = util_log.WithSourceIPs(source, logger)
3137
}
3238
}
33-
req, err := remote.DecodeOTLPWriteRequest(r)
39+
40+
userID, err := tenant.TenantID(ctx)
3441
if err != nil {
35-
level.Error(logger).Log("err", err.Error())
36-
http.Error(w, err.Error(), http.StatusBadRequest)
3742
return
3843
}
3944

40-
promConverter := prometheusremotewrite.NewPrometheusConverter()
41-
setting := prometheusremotewrite.Settings{
42-
AddMetricSuffixes: true,
43-
DisableTargetInfo: true,
44-
}
45-
annots, err := promConverter.FromMetrics(ctx, convertToMetricsAttributes(req.Metrics()), setting)
46-
ws, _ := annots.AsStrings("", 0, 0)
47-
if len(ws) > 0 {
48-
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
49-
}
50-
45+
req, err := remote.DecodeOTLPWriteRequest(r)
5146
if err != nil {
52-
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
47+
level.Error(logger).Log("err", err.Error())
5348
http.Error(w, err.Error(), http.StatusBadRequest)
5449
return
5550
}
@@ -60,8 +55,16 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
6055
SkipLabelNameValidation: false,
6156
}
6257

58+
// Convert the OTLP metrics to prompb TimeSeries.
59+
promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides.PromoteResourceAttributes(userID), logger)
60+
if err != nil {
61+
http.Error(w, err.Error(), http.StatusBadRequest)
62+
return
63+
}
64+
65+
// convert prompb to cortexpb TimeSeries
6366
tsList := []cortexpb.PreallocTimeseries(nil)
64-
for _, v := range promConverter.TimeSeries() {
67+
for _, v := range promTsList {
6568
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
6669
Labels: makeLabels(v.Labels),
6770
Samples: makeSamples(v.Samples),
@@ -87,6 +90,34 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
8790
})
8891
}
8992

93+
// convertToPromTS translates OTLP metrics into prompb time series using a
// fresh prometheusremotewrite converter.
//
// Metric suffixes are always added. promoteResourceAttributes and
// cfg.DisableTargetInfo are passed straight through to the converter settings.
// When cfg.ConvertAllAttributes is set, the metrics are first run through
// convertToMetricsAttributes (defined outside this view) before translation.
//
// Translation warnings are logged at warn level. A translation error is logged
// at error level and returned together with a nil slice.
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, promoteResourceAttributes []string, logger log.Logger) ([]prompb.TimeSeries, error) {
94+
promConverter := prometheusremotewrite.NewPrometheusConverter()
95+
settings := prometheusremotewrite.Settings{
96+
AddMetricSuffixes: true,
97+
PromoteResourceAttributes: promoteResourceAttributes,
98+
DisableTargetInfo: cfg.DisableTargetInfo,
99+
}
100+
101+
var annots annotations.Annotations
102+
var err error
103+
if cfg.ConvertAllAttributes {
104+
annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings)
105+
} else {
106+
annots, err = promConverter.FromMetrics(ctx, pmetrics, settings)
107+
}
108+
109+
// Warnings are reported before the error check, so they are logged even
// when the translation also failed.
ws, _ := annots.AsStrings("", 0, 0)
110+
if len(ws) > 0 {
111+
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
112+
}
113+
114+
if err != nil {
115+
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
116+
return nil, err
117+
}
118+
return promConverter.TimeSeries(), nil
119+
}
120+
90121
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
91122
out := make(labels.Labels, 0, len(in))
92123
for _, l := range in {

0 commit comments

Comments
 (0)