From 87d499b02dc6a630f0fbd840dea625b2df3b4151 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 27 Oct 2025 15:52:41 +0900 Subject: [PATCH 1/5] Add NH count validation Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/util/validation/errors.go | 56 +++++++++++++++++++ pkg/util/validation/validate.go | 83 +++++++++++++++++++++++++++- pkg/util/validation/validate_test.go | 74 +++++++++++++++++++++++++ 4 files changed, 213 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index acf99bb22b7..89ee730c046 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978 +* [ENHANCEMENT] Distributor: Add count validations for native histogram. #7072 * [ENHANCEMENT] Upgrade the Prometheus version to 3.6.0 and add a `-name-validation-scheme` flag to support UTF-8. #7040 #7056 * [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052 * [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003 diff --git a/pkg/util/validation/errors.go b/pkg/util/validation/errors.go index 11ade9c93a5..22ec798378a 100644 --- a/pkg/util/validation/errors.go +++ b/pkg/util/validation/errors.go @@ -282,6 +282,62 @@ func (e *nativeHistogramSampleSizeBytesExceededError) Error() string { return fmt.Sprintf("native histogram sample size bytes exceeded for metric (actual: %d, limit: %d) metric: %.200q", e.nhSampleSizeBytes, e.limit, formatLabelSet(e.series)) } +// nativeHistogramMisMatchCountError is a ValidationError implementation for native histogram +// where the Count does not match the sum of observations in the buckets. +type nativeHistogramMisMatchCountError struct { + series []cortexpb.LabelAdapter + observations uint64 + count uint64 +} + +func newNativeHistogramMisMatchedCountError(series []cortexpb.LabelAdapter, observations, count uint64) ValidationError { + return &nativeHistogramMisMatchCountError{ + series: series, + observations: observations, + count: count, + } +} + +func (e *nativeHistogramMisMatchCountError) Error() string { + return fmt.Sprintf("native histogram bucket count mismatch: count is %d, but observations found in buckets is %d, metric: %.200q", e.count, e.observations, formatLabelSet(e.series)) +} + +// nativeHistogramNegativeCountError is a ValidationError implementation for float native histogram +// where the Count field is negative. +type nativeHistogramNegativeCountError struct { + series []cortexpb.LabelAdapter + count float64 +} + +func newNativeHistogramNegativeCountError(series []cortexpb.LabelAdapter, count float64) ValidationError { + return &nativeHistogramNegativeCountError{ + series: series, + count: count, + } +} + +func (e *nativeHistogramNegativeCountError) Error() string { + return fmt.Sprintf("native histogram observation count %.2f is negative, metric: %.200q", e.count, formatLabelSet(e.series)) +} + +// nativeHistogramNegativeBucketCountError is a ValidationError implementation for float native histogram +// where the count in buckets is negative. +type nativeHistogramNegativeBucketCountError struct { + series []cortexpb.LabelAdapter + count float64 +} + +func newNativeHistogramNegativeBucketCountError(series []cortexpb.LabelAdapter, count float64) ValidationError { + return &nativeHistogramNegativeBucketCountError{ + series: series, + count: count, + } +} + +func (e *nativeHistogramNegativeBucketCountError) Error() string { + return fmt.Sprintf("native histogram buckets have a negative count: %.2f, metric: %.200q", e.count, formatLabelSet(e.series)) +} + // formatLabelSet formats label adapters as a metric name with labels, while preserving // label order, and keeping duplicates. If there are multiple "__name__" labels, only // first one is used as metric name, other ones will be included as regular labels. diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 436cf3dfab3..1993c8444bb 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -2,6 +2,8 @@ package validation import ( "errors" + "fmt" + "math" "net/http" "strings" "time" @@ -60,6 +62,9 @@ const ( nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded" nativeHistogramInvalidSchema = "native_histogram_invalid_schema" nativeHistogramSampleSizeBytesExceeded = "native_histogram_sample_size_bytes_exceeded" + nativeHistogramNegativeCount = "native_histogram_negative_count" + nativeHistogramNegativeBucketCount = "native_histogram_negative_bucket_count" + nativeHistogramMisMatchCount = "native_histogram_mismatch_count" // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. @@ -368,7 +373,6 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri } func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogramSample cortexpb.Histogram) (cortexpb.Histogram, error) { - // sample size validation for native histogram if limits.MaxNativeHistogramSampleSizeBytes > 0 && histogramSample.Size() > limits.MaxNativeHistogramSampleSizeBytes { validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSampleSizeBytesExceeded, userID).Inc() @@ -381,6 +385,56 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u return cortexpb.Histogram{}, newNativeHistogramSchemaInvalidError(ls, int(histogramSample.Schema)) } + var nCount, pCount uint64 + if histogramSample.IsFloatHistogram() { + if err, c := checkHistogramBuckets(histogramSample.GetNegativeCounts(), &nCount, false); err != nil { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) + } + + if err, c := checkHistogramBuckets(histogramSample.GetPositiveCounts(), &pCount, false); err != nil { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) + } + + if histogramSample.GetZeroCountFloat() < 0 { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, histogramSample.GetZeroCountFloat()) + } + + if histogramSample.GetCountFloat() < 0 { + // validate if float histogram has negative count + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramNegativeCountError(ls, histogramSample.GetCountFloat()) + } + } else { + if err, c := checkHistogramBuckets(histogramSample.GetNegativeDeltas(), &nCount, true); err != nil { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) + } + + if err, c := checkHistogramBuckets(histogramSample.GetPositiveDeltas(), &pCount, true); err != nil { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) + } + + // validate if there is mismatch between count with observations in buckets + observations := nCount + pCount + histogramSample.GetZeroCountInt() + count := histogramSample.GetCountInt() + + if math.IsNaN(histogramSample.Sum) { + if observations > count { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramMisMatchCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramMisMatchedCountError(ls, observations, count) + } + } else { + if observations != count { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramMisMatchCount, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramMisMatchedCountError(ls, observations, count) + } + } + } + if limits.MaxNativeHistogramBuckets == 0 { return histogramSample, nil } @@ -388,6 +442,7 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u var ( exceedLimit bool ) + if histogramSample.IsFloatHistogram() { // Initial check to see if the bucket limit is exceeded or not. If not, we can avoid type casting. exceedLimit = len(histogramSample.PositiveCounts)+len(histogramSample.NegativeCounts) > limits.MaxNativeHistogramBuckets @@ -399,6 +454,7 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) } + fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogramSample) oBuckets := len(fh.PositiveBuckets) + len(fh.NegativeBuckets) for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > limits.MaxNativeHistogramBuckets { @@ -437,10 +493,35 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u if oBuckets != len(h.PositiveBuckets)+len(h.NegativeBuckets) { validateMetrics.HistogramSamplesReducedResolution.WithLabelValues(userID).Inc() } + // If resolution reduced, convert new histogram to protobuf type again. return cortexpb.HistogramToHistogramProto(histogramSample.TimestampMs, h), nil } +// copy from https://github.com/prometheus/prometheus/blob/v3.6.0/model/histogram/generic.go#L399-L420 +func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucketCount](buckets []IBC, count *BC, deltas bool) (error, float64) { + if len(buckets) == 0 { + return nil, 0 + } + + var last IBC + for i := range buckets { + var c IBC + if deltas { + c = last + buckets[i] + } else { + c = buckets[i] + } + if c < 0 { + return fmt.Errorf("bucket number %d has observation count of %v", i+1, c), float64(c) + } + last = c + *count += BC(c) + } + + return nil, 0 +} + func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID string, log log.Logger) { filter := map[string]string{"user": userID} diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 861934eb667..aa375ee28fb 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -1,6 +1,7 @@ package validation import ( + "math" "net/http" "strings" "testing" @@ -412,6 +413,31 @@ func TestValidateNativeHistogram(t *testing.T) { exceedMaxRangeSchemaFloatHistogram.Schema = 20 exceedMaxSampleSizeBytesLimitFloatHistogram := tsdbutil.GenerateTestFloatHistogram(100) + negativeBucketCountInNBucketsFH := tsdbutil.GenerateTestFloatHistogram(0) + negativeBucketCountInNBucketsFH.NegativeBuckets = []float64{-1.1, -1.2, -1.3, -1.4} + + negativeBucketCountInPBucketsFH := tsdbutil.GenerateTestFloatHistogram(0) + negativeBucketCountInPBucketsFH.PositiveBuckets = []float64{-1.1, -1.2, -1.3, -1.4} + + negativeCountFloatHistogram := tsdbutil.GenerateTestFloatHistogram(0) + negativeCountFloatHistogram.Count = -1.2345 + + negativeZeroCountFloatHistogram := tsdbutil.GenerateTestFloatHistogram(0) + negativeZeroCountFloatHistogram.ZeroCount = -1.2345 + + negativeBucketCountInNBucketsH := tsdbutil.GenerateTestHistogram(0) + negativeBucketCountInNBucketsH.NegativeBuckets = []int64{-1, -2, -3, -4} + + negativeBucketCountInPBucketsH := tsdbutil.GenerateTestHistogram(0) + negativeBucketCountInPBucketsH.PositiveBuckets = []int64{-1, -2, -3, -4} + + countMisMatchSumIsNaN := tsdbutil.GenerateTestHistogram(0) + countMisMatchSumIsNaN.Sum = math.NaN() + countMisMatchSumIsNaN.Count = 11 + + countMisMatch := tsdbutil.GenerateTestHistogram(0) + countMisMatch.Count = 11 + for _, tc := range []struct { name string bucketLimit int @@ -525,6 +551,54 @@ func TestValidateNativeHistogram(t *testing.T) { discardedSampleLabelValue: nativeHistogramSampleSizeBytesExceeded, maxNativeHistogramSampleSizeBytesLimit: 100, }, + { + name: "negative observations count in negative buckets for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeBucketCountInNBucketsFH.Copy()), + expectedErr: newNativeHistogramNegativeBucketCountError(lbls, negativeBucketCountInNBucketsFH.NegativeBuckets[0]), + discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + }, + { + name: "negative observations count in positive buckets for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeBucketCountInPBucketsFH.Copy()), + expectedErr: newNativeHistogramNegativeBucketCountError(lbls, negativeBucketCountInPBucketsFH.PositiveBuckets[0]), + discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + }, + { + name: "count is negative for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeCountFloatHistogram.Copy()), + expectedErr: newNativeHistogramNegativeCountError(lbls, negativeCountFloatHistogram.Count), + discardedSampleLabelValue: nativeHistogramNegativeCount, + }, + { + name: "zero count is negative for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeZeroCountFloatHistogram.Copy()), + expectedErr: newNativeHistogramNegativeBucketCountError(lbls, negativeZeroCountFloatHistogram.ZeroCount), + discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + }, + { + name: "negative observations count in negative buckets for native histogram", + histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInNBucketsH.Copy()), + expectedErr: newNativeHistogramNegativeBucketCountError(lbls, float64(negativeBucketCountInNBucketsH.NegativeBuckets[0])), + discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + }, + { + name: "negative observations count in positive buckets for native histogram", + histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInPBucketsH.Copy()), + expectedErr: newNativeHistogramNegativeBucketCountError(lbls, float64(negativeBucketCountInPBucketsH.PositiveBuckets[0])), + discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + }, + { + name: "mismatch between observations count with count field when sum is NaN", + histogram: cortexpb.HistogramToHistogramProto(0, countMisMatchSumIsNaN.Copy()), + expectedErr: newNativeHistogramMisMatchedCountError(lbls, 12, 11), + discardedSampleLabelValue: nativeHistogramMisMatchCount, + }, + { + name: "mismatch between observations count with count field", + histogram: cortexpb.HistogramToHistogramProto(0, countMisMatch.Copy()), + expectedErr: newNativeHistogramMisMatchedCountError(lbls, 12, 11), + discardedSampleLabelValue: nativeHistogramMisMatchCount, + }, } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewRegistry() From ff81b054308234aecde7ab0fec9752d9c3f39f4b Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 27 Oct 2025 16:09:01 +0900 Subject: [PATCH 2/5] fix lint Signed-off-by: SungJin1212 --- pkg/util/validation/validate.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 1993c8444bb..220b0b2272e 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -387,12 +387,12 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u var nCount, pCount uint64 if histogramSample.IsFloatHistogram() { - if err, c := checkHistogramBuckets(histogramSample.GetNegativeCounts(), &nCount, false); err != nil { + if c, err := checkHistogramBuckets(histogramSample.GetNegativeCounts(), &nCount, false); err != nil { validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) } - if err, c := checkHistogramBuckets(histogramSample.GetPositiveCounts(), &pCount, false); err != nil { + if c, err := checkHistogramBuckets(histogramSample.GetPositiveCounts(), &pCount, false); err != nil { validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) } @@ -408,12 +408,12 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u return cortexpb.Histogram{}, newNativeHistogramNegativeCountError(ls, histogramSample.GetCountFloat()) } } else { - if err, c := checkHistogramBuckets(histogramSample.GetNegativeDeltas(), &nCount, true); err != nil { + if c, err := checkHistogramBuckets(histogramSample.GetNegativeDeltas(), &nCount, true); err != nil { validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) } - if err, c := checkHistogramBuckets(histogramSample.GetPositiveDeltas(), &pCount, true); err != nil { + if c, err := checkHistogramBuckets(histogramSample.GetPositiveDeltas(), &pCount, true); err != nil { validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) } @@ -499,9 +499,9 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u } // copy from https://github.com/prometheus/prometheus/blob/v3.6.0/model/histogram/generic.go#L399-L420 -func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucketCount](buckets []IBC, count *BC, deltas bool) (error, float64) { +func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucketCount](buckets []IBC, count *BC, deltas bool) (float64, error) { if len(buckets) == 0 { - return nil, 0 + return 0, nil } var last IBC @@ -513,13 +513,13 @@ func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucke c = buckets[i] } if c < 0 { - return fmt.Errorf("bucket number %d has observation count of %v", i+1, c), float64(c) + return float64(c), fmt.Errorf("bucket number %d has observation count of %v", i+1, c) } last = c *count += BC(c) } - return nil, 0 + return 0, nil } func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID string, log log.Logger) { From 2602c78084ac5e483a326a6ce978a48ce0ba0c68 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sun, 2 Nov 2025 07:53:48 +0900 Subject: [PATCH 3/5] move changelog Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89ee730c046..4cee4373006 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 +* [ENHANCEMENT] Distributor: Add count validations for native histogram. #7072 * [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 ## 1.20.0 in progress @@ -31,7 +32,6 @@ * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978 -* [ENHANCEMENT] Distributor: Add count validations for native histogram. #7072 * [ENHANCEMENT] Upgrade the Prometheus version to 3.6.0 and add a `-name-validation-scheme` flag to support UTF-8. #7040 #7056 * [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052 * [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003 From 87779d204d2ae05dfb62a2ad7a47aa8a1a83680f Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 3 Nov 2025 19:52:12 +0900 Subject: [PATCH 4/5] Use prometheus NH validate Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- pkg/util/validation/errors.go | 58 +++------------ pkg/util/validation/validate.go | 86 ++++++--------------- pkg/util/validation/validate_test.go | 107 +++++++++++++++++++++------ 4 files changed, 118 insertions(+), 135 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cee4373006..1cf0e7a9c94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 -* [ENHANCEMENT] Distributor: Add count validations for native histogram. #7072 +* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 * [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 ## 1.20.0 in progress diff --git a/pkg/util/validation/errors.go b/pkg/util/validation/errors.go index 22ec798378a..2efc077655a 100644 --- a/pkg/util/validation/errors.go +++ b/pkg/util/validation/errors.go @@ -282,60 +282,20 @@ func (e *nativeHistogramSampleSizeBytesExceededError) Error() string { return fmt.Sprintf("native histogram sample size bytes exceeded for metric (actual: %d, limit: %d) metric: %.200q", e.nhSampleSizeBytes, e.limit, formatLabelSet(e.series)) } -// nativeHistogramMisMatchCountError is a ValidationError implementation for native histogram -// where the Count does not match the sum of observations in the buckets. -type nativeHistogramMisMatchCountError struct { - series []cortexpb.LabelAdapter - observations uint64 - count uint64 -} - -func newNativeHistogramMisMatchedCountError(series []cortexpb.LabelAdapter, observations, count uint64) ValidationError { - return &nativeHistogramMisMatchCountError{ - series: series, - observations: observations, - count: count, - } -} - -func (e *nativeHistogramMisMatchCountError) Error() string { - return fmt.Sprintf("native histogram bucket count mismatch: count is %d, but observations found in buckets is %d, metric: %.200q", e.count, e.observations, formatLabelSet(e.series)) -} - -// nativeHistogramNegativeCountError is a ValidationError implementation for float native histogram -// where the Count field is negative. -type nativeHistogramNegativeCountError struct { - series []cortexpb.LabelAdapter - count float64 -} - -func newNativeHistogramNegativeCountError(series []cortexpb.LabelAdapter, count float64) ValidationError { - return &nativeHistogramNegativeCountError{ - series: series, - count: count, - } -} - -func (e *nativeHistogramNegativeCountError) Error() string { - return fmt.Sprintf("native histogram observation count %.2f is negative, metric: %.200q", e.count, formatLabelSet(e.series)) -} - -// nativeHistogramNegativeBucketCountError is a ValidationError implementation for float native histogram -// where the count in buckets is negative. -type nativeHistogramNegativeBucketCountError struct { - series []cortexpb.LabelAdapter - count float64 +type nativeHistogramInvalidError struct { + nhValidationErr error + series []cortexpb.LabelAdapter } -func newNativeHistogramNegativeBucketCountError(series []cortexpb.LabelAdapter, count float64) ValidationError { - return &nativeHistogramNegativeBucketCountError{ - series: series, - count: count, +func newNativeHistogramInvalidError(series []cortexpb.LabelAdapter, nhValidationErr error) ValidationError { + return &nativeHistogramInvalidError{ + series: series, + nhValidationErr: nhValidationErr, } } -func (e *nativeHistogramNegativeBucketCountError) Error() string { - return fmt.Sprintf("native histogram buckets have a negative count: %.2f, metric: %.200q", e.count, formatLabelSet(e.series)) +func (e *nativeHistogramInvalidError) Error() string { + return fmt.Sprintf("invalid native histogram, validation err: %v, metric: %.200q", e.nhValidationErr, formatLabelSet(e.series)) } // formatLabelSet formats label adapters as a metric name with labels, while preserving diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 220b0b2272e..8a58913ec9c 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -3,7 +3,6 @@ package validation import ( "errors" "fmt" - "math" "net/http" "strings" "time" @@ -62,9 +61,7 @@ const ( nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded" nativeHistogramInvalidSchema = "native_histogram_invalid_schema" nativeHistogramSampleSizeBytesExceeded = "native_histogram_sample_size_bytes_exceeded" - nativeHistogramNegativeCount = "native_histogram_negative_count" - nativeHistogramNegativeBucketCount = "native_histogram_negative_bucket_count" - nativeHistogramMisMatchCount = "native_histogram_mismatch_count" + nativeHistogramInvalid = "native_histogram_invalid" // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. @@ -385,77 +382,33 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u return cortexpb.Histogram{}, newNativeHistogramSchemaInvalidError(ls, int(histogramSample.Schema)) } - var nCount, pCount uint64 - if histogramSample.IsFloatHistogram() { - if c, err := checkHistogramBuckets(histogramSample.GetNegativeCounts(), &nCount, false); err != nil { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) - } - - if c, err := checkHistogramBuckets(histogramSample.GetPositiveCounts(), &pCount, false); err != nil { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) - } - - if histogramSample.GetZeroCountFloat() < 0 { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, histogramSample.GetZeroCountFloat()) - } - - if histogramSample.GetCountFloat() < 0 { - // validate if float histogram has negative count - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramNegativeCountError(ls, histogramSample.GetCountFloat()) - } - } else { - if c, err := checkHistogramBuckets(histogramSample.GetNegativeDeltas(), &nCount, true); err != nil { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) - } - - if c, err := checkHistogramBuckets(histogramSample.GetPositiveDeltas(), &pCount, true); err != nil { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramNegativeBucketCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramNegativeBucketCountError(ls, c) - } - - // validate if there is mismatch between count with observations in buckets - observations := nCount + pCount + histogramSample.GetZeroCountInt() - count := histogramSample.GetCountInt() - - if math.IsNaN(histogramSample.Sum) { - if observations > count { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramMisMatchCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramMisMatchedCountError(ls, observations, count) - } - } else { - if observations != count { - validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramMisMatchCount, userID).Inc() - return cortexpb.Histogram{}, newNativeHistogramMisMatchedCountError(ls, observations, count) - } - } - } - - if limits.MaxNativeHistogramBuckets == 0 { - return histogramSample, nil - } - var ( exceedLimit bool ) if histogramSample.IsFloatHistogram() { - // Initial check to see if the bucket limit is exceeded or not. If not, we can avoid type casting. + fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogramSample) + if err := fh.Validate(); err != nil { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramInvalidError(ls, err) + } + + // limit check + if limits.MaxNativeHistogramBuckets == 0 { + return histogramSample, nil + } + exceedLimit = len(histogramSample.PositiveCounts)+len(histogramSample.NegativeCounts) > limits.MaxNativeHistogramBuckets if !exceedLimit { return histogramSample, nil } + // Exceed limit. if histogramSample.Schema <= histogram.ExponentialSchemaMin { validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) } - fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogramSample) oBuckets := len(fh.PositiveBuckets) + len(fh.NegativeBuckets) for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > limits.MaxNativeHistogramBuckets { if fh.Schema <= histogram.ExponentialSchemaMin { @@ -471,7 +424,17 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u return cortexpb.FloatHistogramToHistogramProto(histogramSample.TimestampMs, fh), nil } - // Initial check to see if bucket limit is exceeded or not. If not, we can avoid type casting. + h := cortexpb.HistogramProtoToHistogram(histogramSample) + if err := h.Validate(); err != nil { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID).Inc() + return cortexpb.Histogram{}, newNativeHistogramInvalidError(ls, err) + } + + // limit check + if limits.MaxNativeHistogramBuckets == 0 { + return histogramSample, nil + } + exceedLimit = len(histogramSample.PositiveDeltas)+len(histogramSample.NegativeDeltas) > limits.MaxNativeHistogramBuckets if !exceedLimit { return histogramSample, nil @@ -481,7 +444,6 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) } - h := cortexpb.HistogramProtoToHistogram(histogramSample) oBuckets := len(h.PositiveBuckets) + len(h.NegativeBuckets) for len(h.PositiveBuckets)+len(h.NegativeBuckets) > limits.MaxNativeHistogramBuckets { if h.Schema <= histogram.ExponentialSchemaMin { diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index aa375ee28fb..dca9d7d1863 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -1,6 +1,7 @@ package validation import ( + "errors" "math" "net/http" "strings" @@ -413,6 +414,18 @@ func TestValidateNativeHistogram(t *testing.T) { exceedMaxRangeSchemaFloatHistogram.Schema = 20 exceedMaxSampleSizeBytesLimitFloatHistogram := tsdbutil.GenerateTestFloatHistogram(100) + bucketNumMisMatchInPSpanFH := tsdbutil.GenerateTestFloatHistogram(0) + bucketNumMisMatchInPSpanFH.PositiveSpans[0].Length = 3 + + negativeSpanOffsetInPSpansFH := tsdbutil.GenerateTestFloatHistogram(0) + negativeSpanOffsetInPSpansFH.PositiveSpans[1].Offset = -1 + + bucketNumMisMatchInNSpanFH := tsdbutil.GenerateTestFloatHistogram(0) + bucketNumMisMatchInNSpanFH.NegativeSpans[0].Length = 3 + + negativeSpanOffsetInNSpansFH := tsdbutil.GenerateTestFloatHistogram(0) + negativeSpanOffsetInNSpansFH.NegativeSpans[1].Offset = -1 + negativeBucketCountInNBucketsFH := tsdbutil.GenerateTestFloatHistogram(0) negativeBucketCountInNBucketsFH.NegativeBuckets = []float64{-1.1, -1.2, -1.3, -1.4} @@ -425,6 +438,18 @@ func TestValidateNativeHistogram(t *testing.T) { negativeZeroCountFloatHistogram := tsdbutil.GenerateTestFloatHistogram(0) negativeZeroCountFloatHistogram.ZeroCount = -1.2345 + bucketNumMisMatchInPSpanH := tsdbutil.GenerateTestHistogram(0) + bucketNumMisMatchInPSpanH.PositiveSpans[0].Length = 3 + + negativeSpanOffsetInPSpansH := tsdbutil.GenerateTestHistogram(0) + negativeSpanOffsetInPSpansH.PositiveSpans[1].Offset = -1 + + bucketNumMisMatchInNSpanH := tsdbutil.GenerateTestHistogram(0) + bucketNumMisMatchInNSpanH.NegativeSpans[0].Length = 3 + + negativeSpanOffsetInNSpansH := tsdbutil.GenerateTestHistogram(0) + negativeSpanOffsetInNSpansH.NegativeSpans[1].Offset = -1 + negativeBucketCountInNBucketsH := tsdbutil.GenerateTestHistogram(0) negativeBucketCountInNBucketsH.NegativeBuckets = []int64{-1, -2, -3, -4} @@ -551,53 +576,89 @@ func TestValidateNativeHistogram(t *testing.T) { discardedSampleLabelValue: nativeHistogramSampleSizeBytesExceeded, maxNativeHistogramSampleSizeBytesLimit: 100, }, + { + name: "bucket number mismatch in positive spans for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, bucketNumMisMatchInPSpanFH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")), + discardedSampleLabelValue: nativeHistogramInvalid, + }, + { + name: "negative span offset found in positive spans for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeSpanOffsetInPSpansFH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: span number 2 with offset -1: histogram has a span whose offset is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, + }, + { + name: "bucket number mismatch in negative spans for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, bucketNumMisMatchInNSpanFH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")), + discardedSampleLabelValue: nativeHistogramInvalid, + }, + { + name: "negative spans offset found in negative spans for float native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeSpanOffsetInNSpansFH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: span number 2 with offset -1: histogram has a span whose offset is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, + }, { name: "negative observations count in negative buckets for float native histogram", histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeBucketCountInNBucketsFH.Copy()), - expectedErr: newNativeHistogramNegativeBucketCountError(lbls, negativeBucketCountInNBucketsFH.NegativeBuckets[0]), - discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: bucket number 1 has observation count of -1.1: histogram has a bucket whose observation count is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, + }, + { + name: "negative observations count in positive buckets for native histogram", + histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInPBucketsH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: bucket number 1 has observation count of -1: histogram has a bucket whose observation count is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, + }, + { + name: "bucket number mismatch in positive spans for native histogram", + histogram: cortexpb.HistogramToHistogramProto(0, bucketNumMisMatchInPSpanH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { - name: "negative observations count in positive buckets for float native histogram", - histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeBucketCountInPBucketsFH.Copy()), - expectedErr: newNativeHistogramNegativeBucketCountError(lbls, negativeBucketCountInPBucketsFH.PositiveBuckets[0]), - discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + name: "negative span offset found in positive spans for native histogram", + histogram: cortexpb.HistogramToHistogramProto(0, negativeSpanOffsetInPSpansH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: span number 2 with offset -1: histogram has a span whose offset is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { - name: "count is negative for float native histogram", - histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeCountFloatHistogram.Copy()), - expectedErr: newNativeHistogramNegativeCountError(lbls, negativeCountFloatHistogram.Count), - discardedSampleLabelValue: nativeHistogramNegativeCount, + name: "bucket number mismatch in negative spans for native histogram", + histogram: cortexpb.HistogramToHistogramProto(0, bucketNumMisMatchInNSpanH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: spans need 5 buckets, have 4 buckets: histogram spans specify different number of buckets than provided")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { - name: "zero count is negative for float native histogram", - histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeZeroCountFloatHistogram.Copy()), - expectedErr: newNativeHistogramNegativeBucketCountError(lbls, negativeZeroCountFloatHistogram.ZeroCount), - discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + name: "negative spans offset found in negative spans for native histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, negativeSpanOffsetInNSpansFH.Copy()), + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: span number 2 with offset -1: histogram has a span whose offset is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { name: "negative observations count in negative buckets for native histogram", histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInNBucketsH.Copy()), - expectedErr: newNativeHistogramNegativeBucketCountError(lbls, float64(negativeBucketCountInNBucketsH.NegativeBuckets[0])), - discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("negative side: bucket number 1 has observation count of -1: histogram has a bucket whose observation count is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { name: "negative observations count in positive buckets for native histogram", histogram: cortexpb.HistogramToHistogramProto(0, negativeBucketCountInPBucketsH.Copy()), - expectedErr: newNativeHistogramNegativeBucketCountError(lbls, float64(negativeBucketCountInPBucketsH.PositiveBuckets[0])), - discardedSampleLabelValue: nativeHistogramNegativeBucketCount, + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("positive side: bucket number 1 has observation count of -1: histogram has a bucket whose observation count is negative")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { name: "mismatch between observations count with count field when sum is NaN", histogram: cortexpb.HistogramToHistogramProto(0, countMisMatchSumIsNaN.Copy()), - expectedErr: newNativeHistogramMisMatchedCountError(lbls, 12, 11), - discardedSampleLabelValue: nativeHistogramMisMatchCount, + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("12 observations found in buckets, but the Count field is 11: histogram's observation count should be at least the number of observations found in the buckets")), + discardedSampleLabelValue: nativeHistogramInvalid, }, { name: "mismatch between observations count with count field", histogram: cortexpb.HistogramToHistogramProto(0, countMisMatch.Copy()), - expectedErr: newNativeHistogramMisMatchedCountError(lbls, 12, 11), - discardedSampleLabelValue: nativeHistogramMisMatchCount, + expectedErr: newNativeHistogramInvalidError(lbls, errors.New("12 observations found in buckets, but the Count field is 11: histogram's observation count should equal the number of observations found in the buckets (in absence of NaN)")), + discardedSampleLabelValue: nativeHistogramInvalid, }, } { t.Run(tc.name, func(t *testing.T) { @@ -608,7 +669,7 @@ func TestValidateNativeHistogram(t *testing.T) { limits.MaxNativeHistogramSampleSizeBytes = tc.maxNativeHistogramSampleSizeBytesLimit actualHistogram, actualErr := ValidateNativeHistogram(validateMetrics, limits, userID, lbls, tc.histogram) if tc.expectedErr != nil { - require.Equal(t, tc.expectedErr, actualErr) + require.Equal(t, tc.expectedErr.Error(), actualErr.Error()) require.Equal(t, float64(1), testutil.ToFloat64(validateMetrics.DiscardedSamples.WithLabelValues(tc.discardedSampleLabelValue, userID))) // Should never increment if error was returned require.Equal(t, float64(0), testutil.ToFloat64(validateMetrics.HistogramSamplesReducedResolution.WithLabelValues(userID))) From 23d61becea93ecc733856cf8747119e13616a8a5 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 3 Nov 2025 19:59:12 +0900 Subject: [PATCH 5/5] fix lint Signed-off-by: SungJin1212 --- pkg/util/validation/validate.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 8a58913ec9c..f0612afd079 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -2,7 +2,6 @@ package validation import ( "errors" - "fmt" "net/http" "strings" "time" @@ -460,30 +459,6 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u return cortexpb.HistogramToHistogramProto(histogramSample.TimestampMs, h), nil } -// copy from https://github.com/prometheus/prometheus/blob/v3.6.0/model/histogram/generic.go#L399-L420 -func checkHistogramBuckets[BC histogram.BucketCount, IBC histogram.InternalBucketCount](buckets []IBC, count *BC, deltas bool) (float64, error) { - if len(buckets) == 0 { - return 0, nil - } - - var last IBC - for i := range buckets { - var c IBC - if deltas { - c = last + buckets[i] - } else { - c = buckets[i] - } - if c < 0 { - return float64(c), fmt.Errorf("bucket number %d has observation count of %v", i+1, c) - } - last = c - *count += BC(c) - } - - return 0, nil -} - func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID string, log log.Logger) { filter := map[string]string{"user": userID}