Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.enable-type-and-unit-labels` flag to add `__type__` and `__unit__` labels for OTLP metrics. #6969
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.allow-delta-temporality` flag to ingest delta temporality otlp metrics. #6934
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3265,6 +3265,11 @@ otlp:
# EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
# CLI flag: -distributor.otlp.allow-delta-temporality
[allow_delta_temporality: <boolean> | default = false]

# EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for
# the OTLP metrics.
# CLI flag: -distributor.otlp.enable-type-and-unit-labels
[enable_type_and_unit_labels: <boolean> | default = false]
```

### `etcd_config`
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Currently experimental features are:
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
- OTLP Receiver
- Ingest delta temporality OTLP metrics (`-distributor.otlp.allow-delta-temporality=true`)
- Add `__type__` and `__unit__` labels (`-distributor.otlp.enable-type-and-unit-labels`)
- Persistent tokens in the Ruler Ring:
- `-ruler.ring.tokens-file-path` (path) CLI flag
- Native Histograms
Expand Down
7 changes: 4 additions & 3 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries, metadata []promp
return metrics
}

func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest {
func otlpWriteRequest(name, unit string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
Expand All @@ -292,6 +292,7 @@ func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, l
// Generate One Counter
counterMetric := scopeMetric.Metrics().AppendEmpty()
counterMetric.SetName(name)
counterMetric.SetUnit(unit)
counterMetric.SetDescription("test-counter-description")

counterMetric.SetEmptySum()
Expand All @@ -310,8 +311,8 @@ func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, l
return pmetricotlp.NewExportRequestFromMetrics(d)
}

func (c *Client) OTLPPushExemplar(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) {
data, err := otlpWriteRequest(name, temporality, labels...).MarshalProto()
func (c *Client) OTLPPushExemplar(name, unit string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) {
data, err := otlpWriteRequest(name, unit, temporality, labels...).MarshalProto()
if err != nil {
return nil, err
}
Expand Down
69 changes: 64 additions & 5 deletions integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestOTLPIngestExemplar(t *testing.T) {
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

res, err := c.OTLPPushExemplar("exemplar_1", pmetric.AggregationTemporalityCumulative)
res, err := c.OTLPPushExemplar("exemplar_1", "", pmetric.AggregationTemporalityCumulative)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

Expand Down Expand Up @@ -242,15 +242,15 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
{Name: "attr3", Value: "value"},
}

res, err := c1.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
res, err := c1.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c2.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
res, err = c2.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c3.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
res, err = c3.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

Expand All @@ -267,6 +267,65 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
}

func TestOTLPEnableTypeAndUnitLabels(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-auth.enabled": "true",

// OTLP
"-distributor.otlp.enable-type-and-unit-labels": "true",

// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-alertmanager-storage.backend": "local",
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
})

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))

// start cortex and assert runtime-config is loaded correctly
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
require.NoError(t, s.StartAndWaitReady(cortex))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
now := time.Now()

labels := []prompb.Label{
{Name: "service.name", Value: "test-service"},
{Name: "attr1", Value: "value"},
}

res, err := c.OTLPPushExemplar("series_1", "seconds", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

value, err := c.Query("series_1_seconds", now)
require.NoError(t, err)
vector, ok := value.(model.Vector)
fmt.Println("vector", vector)
require.True(t, ok)
require.Equal(t, 1, len(vector))

metric := vector[0].Metric
require.Equal(t, model.LabelValue("seconds"), metric["__unit__"])
require.Equal(t, model.LabelValue("gauge"), metric["__type__"])
}

func TestOTLPPushDeltaTemporality(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -310,7 +369,7 @@ func TestOTLPPushDeltaTemporality(t *testing.T) {
{Name: "attr1", Value: "value"},
}

res, err := c.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityDelta, labels...)
res, err := c.OTLPPushExemplar("series_1", "", pmetric.AggregationTemporalityDelta, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

Expand Down
8 changes: 5 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ type InstanceLimits struct {
}

type OTLPConfig struct {
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -224,6 +225,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.")
f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.")
}

// Validate config and returns error on failure
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels,
}

var annots annotations.Annotations
Expand Down
Loading
Loading