Skip to content

Commit e711d0b

Browse files
authored
Bug fix: Return 429 response when Native Histogram samples are discarded by Native Histogram Ingestion Rate Limiter (#6994)
* fix native histograms ingestion rate bug Signed-off-by: Paurush Garg <[email protected]> * fix native histograms ingestion rate bug Signed-off-by: Paurush Garg <[email protected]> * fix native histograms ingestion rate bug Signed-off-by: Paurush Garg <[email protected]> * fix native histograms ingestion rate bug Signed-off-by: Paurush Garg <[email protected]> --------- Signed-off-by: Paurush Garg <[email protected]>
1 parent 135fa6c commit e711d0b

File tree

3 files changed

+81
-42
lines changed

3 files changed

+81
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* [ENHANCEMENT] Distributor: Add min/max schema validation for Native Histogram. #6766
5252
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
5353
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
54-
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histogram. #6794
54+
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histogram. #6794 and #6994
5555
* [ENHANCEMENT] Ingester: Add active series limit specifically for Native Histogram. #6796
5656
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
5757
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805

pkg/distributor/distributor.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -778,16 +778,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
778778
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
779779
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
780780

781-
if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
782-
level.Warn(d.log).Log("msg", "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples)
783-
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples))
784-
validatedHistogramSamples = 0
785-
} else {
786-
seriesKeys = append(seriesKeys, nhSeriesKeys...)
787-
validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...)
788-
}
789-
790-
if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
781+
if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 {
791782
// Ensure the request slice is reused if there's no series or metadata passing the validation.
792783
cortexpb.ReuseSlice(req.Timeseries)
793784

@@ -812,6 +803,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
812803
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
813804
d.ingestionRate.Add(int64(totalN))
814805

806+
var nativeHistogramErr error
807+
808+
if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
809+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples))
810+
nativeHistogramErr = httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples)
811+
validatedHistogramSamples = 0
812+
} else {
813+
seriesKeys = append(seriesKeys, nhSeriesKeys...)
814+
validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...)
815+
}
816+
815817
subRing := d.ingestersRing
816818

817819
// Obtain a subring if required.
@@ -822,6 +824,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
822824
keys := append(seriesKeys, metadataKeys...)
823825
initialMetadataIndex := len(seriesKeys)
824826

827+
if len(keys) == 0 && nativeHistogramErr != nil {
828+
return nil, nativeHistogramErr
829+
}
830+
825831
err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
826832
if err != nil {
827833
return nil, err
@@ -837,6 +843,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
837843
resp.Exemplars = int64(validatedExemplars)
838844
}
839845

846+
if nativeHistogramErr != nil {
847+
return resp, nativeHistogramErr
848+
}
849+
840850
return resp, firstPartialErr
841851
}
842852

pkg/distributor/distributor_test.go

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
689689
metadata int
690690
expectedError error
691691
expectedNHDiscardedSampleMetricValue int
692+
isPartialDrop bool
692693
}
693694

694695
ctx := user.InjectOrgID(context.Background(), "user")
@@ -705,32 +706,32 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
705706
"local strategy: native histograms limit should be set to each distributor": {
706707
distributors: 2,
707708
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
708-
ingestionRate: 20,
709-
ingestionBurstSize: 20,
709+
ingestionRate: 30,
710+
ingestionBurstSize: 30,
710711
nativeHistogramIngestionRate: 10,
711712
nativeHistogramIngestionBurstSize: 10,
712713
pushes: []testPush{
713714
{nhSamples: 4, expectedError: nil},
714-
{nhSamples: 6, expectedError: nil},
715-
{nhSamples: 6, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
716-
{nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10},
717-
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 11},
718-
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 12},
715+
{metadata: 1, expectedError: nil},
716+
{nhSamples: 7, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (10) exceeded while adding 7 native histogram samples"), expectedNHDiscardedSampleMetricValue: 7},
717+
{nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7},
718+
{nhSamples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (10) exceeded while adding 3 native histogram samples"), expectedNHDiscardedSampleMetricValue: 10},
719+
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10},
719720
},
720721
},
721722
"global strategy: native histograms limit should be evenly shared across distributors": {
722723
distributors: 2,
723724
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
724-
ingestionRate: 20,
725-
ingestionBurstSize: 10,
725+
ingestionRate: 40,
726+
ingestionBurstSize: 20,
726727
nativeHistogramIngestionRate: 10,
727728
nativeHistogramIngestionBurstSize: 5,
728729
pushes: []testPush{
729730
{nhSamples: 2, expectedError: nil},
730731
{nhSamples: 1, expectedError: nil},
731-
{nhSamples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
732-
{nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
733-
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
732+
{nhSamples: 3, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 3 native histogram samples"), expectedNHDiscardedSampleMetricValue: 3, isPartialDrop: true},
733+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
734+
{nhSamples: 2, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 2 native histogram samples"), expectedNHDiscardedSampleMetricValue: 5},
734735
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 5},
735736
},
736737
},
@@ -744,38 +745,61 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
744745
pushes: []testPush{
745746
{nhSamples: 10, expectedError: nil},
746747
{nhSamples: 5, expectedError: nil},
747-
{nhSamples: 6, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
748+
{nhSamples: 6, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6, isPartialDrop: true},
748749
{nhSamples: 5, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
749-
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7},
750-
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 8},
750+
{nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 1 native histogram samples"), expectedNHDiscardedSampleMetricValue: 7},
751751
},
752752
},
753-
"local strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": {
753+
"global strategy: Batch contains only NH samples and NH rate limit is hit": {
754754
distributors: 2,
755-
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
755+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
756756
ingestionRate: 20,
757757
ingestionBurstSize: 20,
758-
nativeHistogramIngestionRate: 5,
759-
nativeHistogramIngestionBurstSize: 5,
758+
nativeHistogramIngestionRate: 10,
759+
nativeHistogramIngestionBurstSize: 10,
760760
pushes: []testPush{
761-
{samples: 5, nhSamples: 4, expectedError: nil},
762-
{samples: 6, nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2},
763-
{samples: 4, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 5 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 2},
764-
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2},
761+
{nhSamples: 2, expectedError: nil},
762+
{nhSamples: 3, expectedError: nil},
763+
{nhSamples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6},
765764
},
766765
},
767-
"global strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": {
766+
"global strategy: Batch contains only NH samples and metadata and NH rate limit is hit": {
768767
distributors: 2,
769768
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
770769
ingestionRate: 20,
771-
ingestionBurstSize: 10,
770+
ingestionBurstSize: 20,
772771
nativeHistogramIngestionRate: 10,
773-
nativeHistogramIngestionBurstSize: 5,
772+
nativeHistogramIngestionBurstSize: 10,
773+
pushes: []testPush{
774+
{nhSamples: 2, expectedError: nil},
775+
{nhSamples: 3, metadata: 2, expectedError: nil},
776+
{nhSamples: 6, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6, isPartialDrop: true}},
777+
},
778+
"global strategy: Batch contains regular and NH samples and NH rate limit is hit": {
779+
distributors: 2,
780+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
781+
ingestionRate: 30,
782+
ingestionBurstSize: 30,
783+
nativeHistogramIngestionRate: 10,
784+
nativeHistogramIngestionBurstSize: 10,
774785
pushes: []testPush{
775-
{samples: 3, nhSamples: 2, expectedError: nil},
776-
{samples: 3, nhSamples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
777-
{samples: 1, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 4},
778-
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
786+
{samples: 3, nhSamples: 2, metadata: 1, expectedError: nil},
787+
{samples: 1, nhSamples: 9, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 9 native histogram samples"), expectedNHDiscardedSampleMetricValue: 9, isPartialDrop: true},
788+
{nhSamples: 9, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 9 native histogram samples"), expectedNHDiscardedSampleMetricValue: 18},
789+
{samples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 18},
790+
},
791+
},
792+
"global strategy: Batch contains regular and NH samples and normal ingestion rate limit is hit": {
793+
distributors: 2,
794+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
795+
ingestionRate: 20,
796+
ingestionBurstSize: 20,
797+
nativeHistogramIngestionRate: 10,
798+
nativeHistogramIngestionBurstSize: 10,
799+
pushes: []testPush{
800+
{samples: 4, nhSamples: 4, metadata: 4, expectedError: nil},
801+
{samples: 4, nhSamples: 4, metadata: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 8 samples and 4 metadata")},
802+
{samples: 3, nhSamples: 3, metadata: 2, expectedError: nil},
779803
},
780804
},
781805
}
@@ -812,8 +836,13 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
812836
assert.Equal(t, emptyResponse, response)
813837
assert.Nil(t, err)
814838
} else {
815-
assert.Nil(t, response)
816839
assert.Equal(t, push.expectedError, err)
840+
// Check if an empty response is expected
841+
if push.isPartialDrop {
842+
assert.Equal(t, emptyResponse, response)
843+
} else {
844+
assert.Nil(t, response)
845+
}
817846
}
818847
assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, "user")))
819848
}

0 commit comments

Comments
 (0)