diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d359c31e8c..c81f753ce09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,7 +51,7 @@ * [ENHANCEMENT] Distributor: Add min/max schema validation for Native Histogram. #6766 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 -* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histogram. #6794 +* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histogram. #6794 and #6994 * [ENHANCEMENT] Ingester: Add active series limit specifically for Native Histogram. #6796 * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780 * [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 80f4ec9025d..716a6c06a02 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -778,16 +778,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) - if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) { - level.Warn(d.log).Log("msg", "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples) - d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples)) - validatedHistogramSamples = 0 - } else { - seriesKeys = append(seriesKeys, nhSeriesKeys...) - validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...) - } - - if len(seriesKeys) == 0 && len(metadataKeys) == 0 { + if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 { // Ensure the request slice is reused if there's no series or metadata passing the validation. cortexpb.ReuseSlice(req.Timeseries) @@ -812,6 +803,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate. d.ingestionRate.Add(int64(totalN)) + var nativeHistogramErr error + + if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) { + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples)) + nativeHistogramErr = httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples) + validatedHistogramSamples = 0 + } else { + seriesKeys = append(seriesKeys, nhSeriesKeys...) + validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...) + } + subRing := d.ingestersRing // Obtain a subring if required. @@ -822,6 +824,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co keys := append(seriesKeys, metadataKeys...) initialMetadataIndex := len(seriesKeys) + if len(keys) == 0 && nativeHistogramErr != nil { + return nil, nativeHistogramErr + } + err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID) if err != nil { return nil, err @@ -837,6 +843,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co resp.Exemplars = int64(validatedExemplars) } + if nativeHistogramErr != nil { + return resp, nativeHistogramErr + } + return resp, firstPartialErr } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c9f931199a2..d24faa3bd43 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -689,6 +689,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { metadata int expectedError error expectedNHDiscardedSampleMetricValue int + isPartialDrop bool } ctx := user.InjectOrgID(context.Background(), "user") @@ -705,32 +706,32 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { "local strategy: native histograms limit should be set to each distributor": { distributors: 2, ingestionRateStrategy: validation.LocalIngestionRateStrategy, - ingestionRate: 20, - ingestionBurstSize: 20, + ingestionRate: 30, + ingestionBurstSize: 30, nativeHistogramIngestionRate: 10, nativeHistogramIngestionBurstSize: 10, pushes: []testPush{ {nhSamples: 4, expectedError: nil}, - {nhSamples: 6, expectedError: nil}, - {nhSamples: 6, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6}, - {nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10}, - {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 11}, - {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 12}, + {metadata: 1, expectedError: nil}, + {nhSamples: 7, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (10) exceeded while adding 7 native histogram samples"), expectedNHDiscardedSampleMetricValue: 7}, + {nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7}, + {nhSamples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (10) exceeded while adding 3 native histogram samples"), expectedNHDiscardedSampleMetricValue: 10}, + {metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10}, }, }, "global strategy: native histograms limit should be evenly shared across distributors": { distributors: 2, ingestionRateStrategy: validation.GlobalIngestionRateStrategy, - ingestionRate: 20, - ingestionBurstSize: 10, + ingestionRate: 40, + ingestionBurstSize: 20, nativeHistogramIngestionRate: 10, nativeHistogramIngestionBurstSize: 5, pushes: []testPush{ {nhSamples: 2, expectedError: nil}, {nhSamples: 1, expectedError: nil}, - {nhSamples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3}, - {nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3}, - {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {nhSamples: 3, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 3 native histogram samples"), expectedNHDiscardedSampleMetricValue: 3, isPartialDrop: true}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3}, + {nhSamples: 2, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 2 native histogram samples"), expectedNHDiscardedSampleMetricValue: 5}, {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 5}, }, }, @@ -744,38 +745,61 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { pushes: []testPush{ {nhSamples: 10, expectedError: nil}, {nhSamples: 5, expectedError: nil}, - {nhSamples: 6, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6}, + {nhSamples: 6, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6, isPartialDrop: true}, {nhSamples: 5, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6}, - {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7}, - {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 8}, + {nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 1 native histogram samples"), expectedNHDiscardedSampleMetricValue: 7}, }, }, - "local strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": { + "global strategy: Batch contains only NH samples and NH rate limit is hit": { distributors: 2, - ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, ingestionRate: 20, ingestionBurstSize: 20, - nativeHistogramIngestionRate: 5, - nativeHistogramIngestionBurstSize: 5, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 10, pushes: []testPush{ - {samples: 5, nhSamples: 4, expectedError: nil}, - {samples: 6, nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2}, - {samples: 4, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 5 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 2}, - {metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2}, + {nhSamples: 2, expectedError: nil}, + {nhSamples: 3, expectedError: nil}, + {nhSamples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6}, }, }, - "global strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": { + "global strategy: Batch contains only NH samples and metadata and NH rate limit is hit": { distributors: 2, ingestionRateStrategy: validation.GlobalIngestionRateStrategy, ingestionRate: 20, - ingestionBurstSize: 10, + ingestionBurstSize: 20, nativeHistogramIngestionRate: 10, - nativeHistogramIngestionBurstSize: 5, + nativeHistogramIngestionBurstSize: 10, + pushes: []testPush{ + {nhSamples: 2, expectedError: nil}, + {nhSamples: 3, metadata: 2, expectedError: nil}, + {nhSamples: 6, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6, isPartialDrop: true}}, + }, + "global strategy: Batch contains regular and NH samples and NH rate limit is hit": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 30, + ingestionBurstSize: 30, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 10, pushes: []testPush{ - {samples: 3, nhSamples: 2, expectedError: nil}, - {samples: 3, nhSamples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, - {samples: 1, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 4}, - {metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {samples: 3, nhSamples: 2, metadata: 1, expectedError: nil}, + {samples: 1, nhSamples: 9, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 9 native histogram samples"), expectedNHDiscardedSampleMetricValue: 9, isPartialDrop: true}, + {nhSamples: 9, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 9 native histogram samples"), expectedNHDiscardedSampleMetricValue: 18}, + {samples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 18}, + }, + }, + "global strategy: Batch contains regular and NH samples and normal ingestion rate limit is hit": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 20, + ingestionBurstSize: 20, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 10, + pushes: []testPush{ + {samples: 4, nhSamples: 4, metadata: 4, expectedError: nil}, + {samples: 4, nhSamples: 4, metadata: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 8 samples and 4 metadata")}, + {samples: 3, nhSamples: 3, metadata: 2, expectedError: nil}, }, }, } @@ -812,8 +836,13 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { assert.Equal(t, emptyResponse, response) assert.Nil(t, err) } else { - assert.Nil(t, response) assert.Equal(t, push.expectedError, err) + // Check if an empty response is expected + if push.isPartialDrop { + assert.Equal(t, emptyResponse, response) + } else { + assert.Nil(t, response) + } } assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, "user"))) }