Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
* [ENHANCEMENT] Distributor: Add new `cortex_distributor_inflight_client_requests` metric to track number of ingester client inflight requests. #6358
* [ENHANCEMENT] Distributor: Expose `cortex_label_size_bytes` native histogram metric. #6372
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not `/`. #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: Update ring with new IP address when instance is lost, rejoins, but heartbeat is disabled. #6271
Expand Down
11 changes: 3 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ var (
// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size. The value must be greater than or equal to 0")

// Distributor instance limits errors.
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
errTooManyInflightClientRequests = errors.New("too many inflight ingester client requests in distributor")
)

const (
Expand Down Expand Up @@ -668,19 +663,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
return nil, errTooManyInflightPushRequests
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight push requests in distributor")
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")
}
}

// Only reject requests at this stage, so that the distributor can finish sending the current batch request to all ingesters
// even if we've exceeded MaxInflightClientRequests in `doBatch`.
if d.cfg.InstanceLimits.MaxInflightClientRequests > 0 && d.inflightClientRequests.Load() > int64(d.cfg.InstanceLimits.MaxInflightClientRequests) {
return nil, errTooManyInflightClientRequests
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight ingester client requests in distributor")
}

removeReplica := false
Expand Down
15 changes: 8 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
preInflight: 101,
inflightLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightPushRequests},
{samples: 100, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight push requests in distributor")},
},
},
"below inflight client limit": {
Expand All @@ -863,7 +863,8 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
preInflightClient: 103,
inflightClientLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightClientRequests},
{samples: 100, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable,
"too many inflight ingester client requests in distributor")},
},
},
"below ingestion rate limit": {
Expand Down Expand Up @@ -892,7 +893,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 100, expectedError: errMaxSamplesPushRateLimitReached},
{samples: 100, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")},
{samples: 100, expectedError: nil},
},
},
Expand All @@ -902,10 +903,10 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
{samples: 5000, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
{samples: 5000, expectedError: httpgrpc.Errorf(http.StatusServiceUnavailable, "distributor's samples push rate limit reached")}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
},
},
}
Expand Down
Loading