Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.allow-delta-temporality` flag to ingest delta temporality otlp metrics. #6934
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3010,6 +3010,10 @@ otlp:
# https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
# CLI flag: -distributor.otlp.disable-target-info
[disable_target_info: <boolean> | default = false]

# EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
# CLI flag: -distributor.otlp.allow-delta-temporality
[allow_delta_temporality: <boolean> | default = false]
```

### `etcd_config`
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Currently experimental features are:
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
- OTLP Receiver
- Ingest delta temporality OTLP metrics (`-distributor.otlp.allow-delta-temporality=true`)
- Persistent tokens in the Ruler Ring:
- `-ruler.ring.tokens-file-path` (path) CLI flag
- Native Histograms
Expand Down
8 changes: 4 additions & 4 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries, metadata []promp
return metrics
}

func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest {
func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
Expand All @@ -261,7 +261,7 @@ func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportReq
counterMetric.SetDescription("test-counter-description")

counterMetric.SetEmptySum()
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
counterMetric.Sum().SetAggregationTemporality(temporality)

counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
Expand All @@ -276,8 +276,8 @@ func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportReq
return pmetricotlp.NewExportRequestFromMetrics(d)
}

func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) {
data, err := otlpWriteRequest(name, labels...).MarshalProto()
func (c *Client) OTLPPushExemplar(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) {
data, err := otlpWriteRequest(name, temporality, labels...).MarshalProto()
if err != nil {
return nil, err
}
Expand Down
63 changes: 59 additions & 4 deletions integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/providers/s3"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
Expand Down Expand Up @@ -149,7 +150,7 @@ func TestOTLPIngestExemplar(t *testing.T) {
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

res, err := c.OTLPPushExemplar("exemplar_1")
res, err := c.OTLPPushExemplar("exemplar_1", pmetric.AggregationTemporalityCumulative)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

Expand Down Expand Up @@ -241,15 +242,15 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
{Name: "attr3", Value: "value"},
}

res, err := c1.OTLPPushExemplar("series_1", labels...)
res, err := c1.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c2.OTLPPushExemplar("series_1", labels...)
res, err = c2.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c3.OTLPPushExemplar("series_1", labels...)
res, err = c3.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

Expand All @@ -265,3 +266,57 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
require.NoError(t, err)
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
}

func TestOTLPPushDeltaTemporality(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-auth.enabled": "true",

// OTLP
"-distributor.otlp.allow-delta-temporality": "true",

// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-alertmanager-storage.backend": "local",
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
})

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))

// start cortex and assert runtime-config is loaded correctly
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
require.NoError(t, s.StartAndWaitReady(cortex))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Cortex.
now := time.Now()

labels := []prompb.Label{
{Name: "service.name", Value: "test-service"},
{Name: "attr1", Value: "value"},
}

res, err := c.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityDelta, labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

value, err := c.Query("series_1", now)
require.NoError(t, err)
vector, ok := value.(model.Vector)
require.True(t, ok)
require.Equal(t, 1, len(vector))
}
6 changes: 4 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ type InstanceLimits struct {
}

type OTLPConfig struct {
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
AllowDeltaTemporality bool `yaml:"allow_delta_temporality"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -220,6 +221,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.")
}

// Validate config and returns error on failure
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri

// otlp to prompb TimeSeries
promTsList, promMetadata, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger)
if err != nil {
if err != nil && len(promTsList) == 0 {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -178,8 +178,9 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
AllowDeltaTemporality: cfg.AllowDeltaTemporality,
}

var annots annotations.Annotations
Expand All @@ -200,11 +201,10 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu
}

if err != nil {
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
return nil, nil, err
level.Warn(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
}

return promConverter.TimeSeries(), promConverter.Metadata(), nil
return promConverter.TimeSeries(), promConverter.Metadata(), err
}

func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
Expand Down
Loading