diff --git a/CHANGELOG.md b/CHANGELOG.md index cfb1ec43241..0851e0540c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 * [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718 * [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727 +* [FEATURE] Distributor: Add an experimental `-distributor.otlp.allow-delta-temporality` flag to ingest delta temporality OTLP metrics. #6934 * [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458 * [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526 * [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index acd03b3de78..f5d5aa2432f 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3010,6 +3010,10 @@ otlp: # https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems) # CLI flag: -distributor.otlp.disable-target-info [disable_target_info: | default = false] + + # EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.
+ # CLI flag: -distributor.otlp.allow-delta-temporality + [allow_delta_temporality: | default = false] ``` ### `etcd_config` diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 700fbf5beb7..e06b92bd405 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -116,6 +116,7 @@ Currently experimental features are: - `store-gateway.sharding-ring.final-sleep` (duration) CLI flag - `alertmanager-sharding-ring.final-sleep` (duration) CLI flag - OTLP Receiver + - Ingest delta temporality OTLP metrics (`-distributor.otlp.allow-delta-temporality=true`) - Persistent tokens in the Ruler Ring: - `-ruler.ring.tokens-file-path` (path) CLI flag - Native Histograms diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 9067b60c078..6f872a460fc 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -236,7 +236,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries, metadata []promp return metrics } -func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest { +func otlpWriteRequest(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) pmetricotlp.ExportRequest { d := pmetric.NewMetrics() // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram @@ -261,7 +261,7 @@ func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportReq counterMetric.SetDescription("test-counter-description") counterMetric.SetEmptySum() - counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + counterMetric.Sum().SetAggregationTemporality(temporality) counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty() counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) @@ -276,8 +276,8 @@ func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportReq return pmetricotlp.NewExportRequestFromMetrics(d) } 
-func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) { - data, err := otlpWriteRequest(name, labels...).MarshalProto() +func (c *Client) OTLPPushExemplar(name string, temporality pmetric.AggregationTemporality, labels ...prompb.Label) (*http.Response, error) { + data, err := otlpWriteRequest(name, temporality, labels...).MarshalProto() if err != nil { return nil, err } diff --git a/integration/otlp_test.go b/integration/otlp_test.go index 7eda34e55ec..a2a40351330 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/s3" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" @@ -149,7 +150,7 @@ func TestOTLPIngestExemplar(t *testing.T) { c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) - res, err := c.OTLPPushExemplar("exemplar_1") + res, err := c.OTLPPushExemplar("exemplar_1", pmetric.AggregationTemporalityCumulative) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) @@ -241,15 +242,15 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) { {Name: "attr3", Value: "value"}, } - res, err := c1.OTLPPushExemplar("series_1", labels...) + res, err := c1.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - res, err = c2.OTLPPushExemplar("series_1", labels...) + res, err = c2.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - res, err = c3.OTLPPushExemplar("series_1", labels...) + res, err = c3.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityCumulative, labels...) 
require.NoError(t, err) require.Equal(t, 200, res.StatusCode) @@ -265,3 +266,57 @@ func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) { require.NoError(t, err) require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"}) } + +func TestOTLPPushDeltaTemporality(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Base blocks-storage flags, plus auth, the OTLP delta temporality + // switch and a local alertmanager store. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-auth.enabled": "true", + + // OTLP + "-distributor.otlp.allow-delta-temporality": "true", + + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + "-alertmanager-storage.backend": "local", + "-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"), + }) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile)) + + // start cortex in single-binary mode and wait until it is ready + cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095) + require.NoError(t, s.StartAndWaitReady(cortex)) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push OTLP metrics with a delta temporality counter to Cortex. + now := time.Now() + + labels := []prompb.Label{ + {Name: "service.name", Value: "test-service"}, + {Name: "attr1", Value: "value"}, + } + + res, err := c.OTLPPushExemplar("series_1", pmetric.AggregationTemporalityDelta, labels...)
+ require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + value, err := c.Query("series_1", now) + require.NoError(t, err) + vector, ok := value.(model.Vector) + require.True(t, ok) + require.Equal(t, 1, len(vector)) +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 931bdbf98bd..50d601ae9b4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -192,8 +192,9 @@ type InstanceLimits struct { } type OTLPConfig struct { - ConvertAllAttributes bool `yaml:"convert_all_attributes"` - DisableTargetInfo bool `yaml:"disable_target_info"` + ConvertAllAttributes bool `yaml:"convert_all_attributes"` + DisableTargetInfo bool `yaml:"disable_target_info"` + AllowDeltaTemporality bool `yaml:"allow_delta_temporality"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -220,6 +221,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.") f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. 
(refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)") + f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.") } // Validate config and returns error on failure diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index 9fa05148abc..83468120656 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -67,7 +67,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri // otlp to prompb TimeSeries promTsList, promMetadata, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger) - if err != nil { + if err != nil && len(promTsList) == 0 { http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -178,8 +178,9 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) ( func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, []prompb.MetricMetadata, error) { promConverter := prometheusremotewrite.NewPrometheusConverter() settings := prometheusremotewrite.Settings{ - AddMetricSuffixes: true, - DisableTargetInfo: cfg.DisableTargetInfo, + AddMetricSuffixes: true, + DisableTargetInfo: cfg.DisableTargetInfo, + AllowDeltaTemporality: cfg.AllowDeltaTemporality, } var annots annotations.Annotations @@ -200,11 +201,10 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu } if err != nil { - level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) - return nil, nil, err + level.Warn(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) } - return promConverter.TimeSeries(), promConverter.Metadata(), nil + return 
promConverter.TimeSeries(), promConverter.Metadata(), err } func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter { diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index de3b780e095..99932188fcd 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -4,9 +4,11 @@ import ( "bytes" "compress/gzip" "context" + "fmt" "io" "net/http" "net/http/httptest" + "sort" "testing" "time" @@ -25,6 +27,249 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) +func TestOTLP_AllowDeltaTemporality(t *testing.T) { + logger := log.NewNopLogger() + ctx := context.Background() + ts := time.Now() + + tests := []struct { + description string + allowDeltaTemporality bool + otlpSeries []pmetric.Metric + expectedSeries []prompb.TimeSeries + expectedMetadata []prompb.MetricMetadata + expectedErr string + }{ + { + description: "[allowDeltaTemporality: false] cumulative type should be converted", + allowDeltaTemporality: false, + otlpSeries: []pmetric.Metric{ + createOtelSum("test_1", pmetric.AggregationTemporalityCumulative, ts), + createOtelSum("test_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_1", ts), + createPromFloatSeries("test_2", ts), + }, + expectedMetadata: []prompb.MetricMetadata{ + createPromMetadata("test_1", prompb.MetricMetadata_GAUGE), + createPromMetadata("test_2", prompb.MetricMetadata_GAUGE), + }, + }, + { + description: "[allowDeltaTemporality: false] delta type should not be converted", + allowDeltaTemporality: false, + otlpSeries: []pmetric.Metric{ + createOtelSum("test_1", pmetric.AggregationTemporalityDelta, ts), + createOtelSum("test_2", pmetric.AggregationTemporalityDelta, ts), + }, + expectedSeries: []prompb.TimeSeries{}, + expectedMetadata: []prompb.MetricMetadata{}, + expectedErr: `invalid temporality and type combination for metric "test_1"; invalid temporality and type combination for metric "test_2"`, + }, + { + 
description: "[allowDeltaTemporality: true] delta type should be converted", + allowDeltaTemporality: true, + otlpSeries: []pmetric.Metric{ + createOtelSum("test_1", pmetric.AggregationTemporalityDelta, ts), + createOtelSum("test_2", pmetric.AggregationTemporalityDelta, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_1", ts), + createPromFloatSeries("test_2", ts), + }, + expectedMetadata: []prompb.MetricMetadata{ + createPromMetadata("test_1", prompb.MetricMetadata_UNKNOWN), + createPromMetadata("test_2", prompb.MetricMetadata_UNKNOWN), + }, + }, + { + description: "[allowDeltaTemporality: false] mixed delta and cumulative, should be converted only for cumulative type", + allowDeltaTemporality: false, + otlpSeries: []pmetric.Metric{ + createOtelSum("test_1", pmetric.AggregationTemporalityDelta, ts), + createOtelSum("test_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_2", ts), + }, + expectedMetadata: []prompb.MetricMetadata{ + createPromMetadata("test_2", prompb.MetricMetadata_GAUGE), + }, + expectedErr: `invalid temporality and type combination for metric "test_1"`, + }, + { + description: "[allowDeltaTemporality: false, exponential histogram] cumulative histogram should be converted", + allowDeltaTemporality: false, + otlpSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityCumulative, ts), + createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNativeHistogramSeries("test_1", prompb.Histogram_UNKNOWN, ts), + createPromNativeHistogramSeries("test_2", prompb.Histogram_UNKNOWN, ts), + }, + expectedMetadata: []prompb.MetricMetadata{ + createPromMetadata("test_1", prompb.MetricMetadata_HISTOGRAM), + createPromMetadata("test_2", prompb.MetricMetadata_HISTOGRAM), + }, + }, + { + description: "[allowDeltaTemporality: false, 
exponential histogram] delta histogram should not be converted", + allowDeltaTemporality: false, + otlpSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityDelta, ts), + }, + expectedSeries: []prompb.TimeSeries{}, + expectedMetadata: []prompb.MetricMetadata{}, + expectedErr: `invalid temporality and type combination for metric "test_1"; invalid temporality and type combination for metric "test_2"`, + }, + { + description: "[allowDeltaTemporality: true, exponential histogram] delta histogram should be converted", + allowDeltaTemporality: true, + otlpSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityDelta, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNativeHistogramSeries("test_1", prompb.Histogram_GAUGE, ts), + createPromNativeHistogramSeries("test_2", prompb.Histogram_GAUGE, ts), + }, + expectedMetadata: []prompb.MetricMetadata{ + createPromMetadata("test_1", prompb.MetricMetadata_UNKNOWN), + createPromMetadata("test_2", prompb.MetricMetadata_UNKNOWN), + }, + }, + { + description: "[allowDeltaTemporality: false, exponential histogram] mixed delta and cumulative histogram, should be converted only for cumulative type", + allowDeltaTemporality: false, + otlpSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExponentialHistogram("test_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNativeHistogramSeries("test_2", prompb.Histogram_UNKNOWN, ts), + }, + expectedMetadata: []prompb.MetricMetadata{ + createPromMetadata("test_2", prompb.MetricMetadata_HISTOGRAM), + }, + expectedErr: `invalid temporality and type combination for metric "test_1"`, + }, + } + + for _, test 
:= range tests { + t.Run(test.description, func(t *testing.T) { + cfg := distributor.OTLPConfig{AllowDeltaTemporality: test.allowDeltaTemporality} + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + + for _, s := range test.otlpSeries { + s.CopyTo(sm.Metrics().AppendEmpty()) + } + + limits := validation.Limits{} + overrides := validation.NewOverrides(limits, nil) + promSeries, metadata, err := convertToPromTS(ctx, metrics, cfg, overrides, "user-1", logger) + require.Equal(t, sortTimeSeries(test.expectedSeries), sortTimeSeries(promSeries)) + require.Equal(t, test.expectedMetadata, metadata) + // EqualError fails the subtest on a nil err instead of panicking in err.Error(). + if test.expectedErr != "" { + require.EqualError(t, err, test.expectedErr) + } else { + require.NoError(t, err) + } + }) + } +} + +func createPromMetadata(name string, metadataType prompb.MetricMetadata_MetricType) prompb.MetricMetadata { + return prompb.MetricMetadata{ + Type: metadataType, + MetricFamilyName: name, + } +} + +// copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +func sortTimeSeries(series []prompb.TimeSeries) []prompb.TimeSeries { + for i := range series { + sort.Slice(series[i].Labels, func(j, k int) bool { + return series[i].Labels[j].Name < series[i].Labels[k].Name + }) + } + + sort.Slice(series, func(i, j int) bool { + return fmt.Sprint(series[i].Labels) < fmt.Sprint(series[j].Labels) + }) + + return series +} + +// copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +func createPromFloatSeries(name string, ts time.Time) prompb.TimeSeries { + return prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{ + Value: 5, + Timestamp: ts.UnixMilli(), + }}, + } +} + +// copied from:
https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +func createOtelSum(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + sum := m.SetEmptySum() + sum.SetAggregationTemporality(temporality) + dp := sum.DataPoints().AppendEmpty() + dp.SetDoubleValue(5) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +// copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +func createOtelExponentialHistogram(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + hist := m.SetEmptyExponentialHistogram() + hist.SetAggregationTemporality(temporality) + dp := hist.DataPoints().AppendEmpty() + dp.SetCount(1) + dp.SetSum(5) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +// copied from: https://github.com/prometheus/prometheus/blob/v3.5.0/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +func createPromNativeHistogramSeries(name string, hint prompb.Histogram_ResetHint, ts time.Time) prompb.TimeSeries { + return prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Histograms: []prompb.Histogram{ + { + Count: &prompb.Histogram_CountInt{CountInt: 1}, + Sum: 5, + Schema: 0, + ZeroThreshold: 1e-128, + ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0}, + Timestamp: ts.UnixMilli(), + ResetHint: hint, + }, + }, + } +} + func TestOTLPConvertToPromTS(t *testing.T) { logger := log.NewNopLogger() ctx := context.Background()