Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
* [BUGFIX] Compactor: Delete the prefix `blocks_meta` from the metadata fetcher metrics. #6832
* [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863
* [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880
* [BUGFIX] Distributor: Return 429 response when Native Histogram samples are dropped #6994

## 1.19.0 2025-02-27

Expand Down
30 changes: 20 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,16 +778,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
level.Warn(d.log).Log("msg", "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples)
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples))
validatedHistogramSamples = 0
} else {
seriesKeys = append(seriesKeys, nhSeriesKeys...)
validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...)
}

if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 {
// Ensure the request slice is reused if there's no series or metadata passing the validation.
cortexpb.ReuseSlice(req.Timeseries)

Expand All @@ -812,6 +803,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
d.ingestionRate.Add(int64(totalN))

var nativeHistogramErr error

if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples))
nativeHistogramErr = httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples)
validatedHistogramSamples = 0
} else {
seriesKeys = append(seriesKeys, nhSeriesKeys...)
validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...)
}

subRing := d.ingestersRing

// Obtain a subring if required.
Expand All @@ -822,6 +824,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
keys := append(seriesKeys, metadataKeys...)
initialMetadataIndex := len(seriesKeys)

if len(keys) == 0 && nativeHistogramErr != nil {
return nil, nativeHistogramErr
}

err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
if err != nil {
return nil, err
Expand All @@ -837,6 +843,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
resp.Exemplars = int64(validatedExemplars)
}

if nativeHistogramErr != nil {
return resp, nativeHistogramErr
}

return resp, firstPartialErr
}

Expand Down
91 changes: 60 additions & 31 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
metadata int
expectedError error
expectedNHDiscardedSampleMetricValue int
isPartialDrop bool
}

ctx := user.InjectOrgID(context.Background(), "user")
Expand All @@ -705,32 +706,32 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
"local strategy: native histograms limit should be set to each distributor": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRate: 20,
ingestionBurstSize: 20,
ingestionRate: 30,
ingestionBurstSize: 30,
nativeHistogramIngestionRate: 10,
nativeHistogramIngestionBurstSize: 10,
pushes: []testPush{
{nhSamples: 4, expectedError: nil},
{nhSamples: 6, expectedError: nil},
{nhSamples: 6, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
{nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 11},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 12},
{metadata: 1, expectedError: nil},
{nhSamples: 7, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (10) exceeded while adding 7 native histogram samples"), expectedNHDiscardedSampleMetricValue: 7},
{nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7},
{nhSamples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (10) exceeded while adding 3 native histogram samples"), expectedNHDiscardedSampleMetricValue: 10},
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10},
},
},
"global strategy: native histograms limit should be evenly shared across distributors": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 20,
ingestionBurstSize: 10,
ingestionRate: 40,
ingestionBurstSize: 20,
nativeHistogramIngestionRate: 10,
nativeHistogramIngestionBurstSize: 5,
pushes: []testPush{
{nhSamples: 2, expectedError: nil},
{nhSamples: 1, expectedError: nil},
{nhSamples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
{nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{nhSamples: 3, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 3 native histogram samples"), expectedNHDiscardedSampleMetricValue: 3, isPartialDrop: true},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
{nhSamples: 2, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 2 native histogram samples"), expectedNHDiscardedSampleMetricValue: 5},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 5},
},
},
Expand All @@ -744,38 +745,61 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
pushes: []testPush{
{nhSamples: 10, expectedError: nil},
{nhSamples: 5, expectedError: nil},
{nhSamples: 6, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
{nhSamples: 6, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6, isPartialDrop: true},
{nhSamples: 5, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7},
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 8},
{nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 1 native histogram samples"), expectedNHDiscardedSampleMetricValue: 7},
},
},
"local strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": {
"global strategy: Batch contains only NH samples and NH rate limit is hit": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 20,
ingestionBurstSize: 20,
nativeHistogramIngestionRate: 5,
nativeHistogramIngestionBurstSize: 5,
nativeHistogramIngestionRate: 10,
nativeHistogramIngestionBurstSize: 10,
pushes: []testPush{
{samples: 5, nhSamples: 4, expectedError: nil},
{samples: 6, nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2},
{samples: 4, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 5 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 2},
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2},
{nhSamples: 2, expectedError: nil},
{nhSamples: 3, expectedError: nil},
{nhSamples: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6},
},
},
"global strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": {
"global strategy: Batch contains only NH samples and metadata and NH rate limit is hit": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 20,
ingestionBurstSize: 10,
ingestionBurstSize: 20,
nativeHistogramIngestionRate: 10,
nativeHistogramIngestionBurstSize: 5,
nativeHistogramIngestionBurstSize: 10,
pushes: []testPush{
{nhSamples: 2, expectedError: nil},
{nhSamples: 3, metadata: 2, expectedError: nil},
{nhSamples: 6, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 6 native histogram samples"), expectedNHDiscardedSampleMetricValue: 6, isPartialDrop: true}},
},
"global strategy: Batch contains regular and NH samples and NH rate limit is hit": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 30,
ingestionBurstSize: 30,
nativeHistogramIngestionRate: 10,
nativeHistogramIngestionBurstSize: 10,
pushes: []testPush{
{samples: 3, nhSamples: 2, expectedError: nil},
{samples: 3, nhSamples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{samples: 1, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 4},
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
{samples: 3, nhSamples: 2, metadata: 1, expectedError: nil},
{samples: 1, nhSamples: 9, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 9 native histogram samples"), expectedNHDiscardedSampleMetricValue: 9, isPartialDrop: true},
{nhSamples: 9, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (5) exceeded while adding 9 native histogram samples"), expectedNHDiscardedSampleMetricValue: 18},
{samples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 18},
},
},
"global strategy: Batch contains regular and NH samples and normal ingestion rate limit is hit": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 20,
ingestionBurstSize: 20,
nativeHistogramIngestionRate: 10,
nativeHistogramIngestionBurstSize: 10,
pushes: []testPush{
{samples: 4, nhSamples: 4, metadata: 4, expectedError: nil},
{samples: 4, nhSamples: 4, metadata: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 8 samples and 4 metadata")},
{samples: 3, nhSamples: 3, metadata: 2, expectedError: nil},
},
},
}
Expand Down Expand Up @@ -812,8 +836,13 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
assert.Equal(t, emptyResponse, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
// Check if an empty response is expected
if push.isPartialDrop {
assert.Equal(t, emptyResponse, response)
} else {
assert.Nil(t, response)
}
}
assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, "user")))
}
Expand Down
Loading