Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
* [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
* [ENHANCEMENT] Ingester: Add new metric `cortex_ingester_push_errors_total` to track reasons for ingester request failures. #6901
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
11 changes: 6 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
span, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.Push")
defer span.Finish()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
i.maxInflightPushRequests.Track(inflight)
Expand All @@ -1175,6 +1180,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
gl := i.getInstanceLimits()
if gl != nil && gl.MaxInflightPushRequests > 0 {
if inflight > gl.MaxInflightPushRequests {
i.metrics.pushErrorsTotal.WithLabelValues(userID, pushErrTooManyInflightRequests).Inc()
return nil, errTooManyInflightPushRequests
}
}
Expand All @@ -1186,11 +1192,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
defer req.Free()
defer cortexpb.ReuseSlice(req.Timeseries)

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

il := i.getInstanceLimits()
if il != nil && il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
Expand Down
8 changes: 7 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6515,7 +6515,8 @@ func TestIngester_inflightPushRequests(t *testing.T) {
cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits }
cfg.LifecyclerConfig.JoinAfter = 0

i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
reg := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -6553,6 +6554,11 @@ func TestIngester_inflightPushRequests(t *testing.T) {

_, err := i.Push(ctx, req)
require.Equal(t, errTooManyInflightPushRequests, err)
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_push_errors_total The total number of push errors per user.
# TYPE cortex_ingester_push_errors_total counter
cortex_ingester_push_errors_total{reason="errTooManyInflightRequests",user="test"} 1
`), "cortex_ingester_push_errors_total"))
return nil
})

Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
errMaxSeriesLimitReached = errors.New("cannot add series: ingesters's max series limit reached")
errTooManyInflightPushRequests = errors.New("cannot push: too many inflight push requests in ingester")
errTooManyInflightQueryRequests = errors.New("cannot push: too many inflight query requests in ingester")

pushErrTooManyInflightRequests = "errTooManyInflightRequests"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I think we can omit 'err' from the reason label, since the metric name itself contains error.

Suggested change
pushErrTooManyInflightRequests = "errTooManyInflightRequests"
pushErrTooManyInflightRequests = "tooManyInflightRequests"

)

// InstanceLimits describes limits used by ingester. Reaching any of these will result in error response to the call.
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ingesterMetrics struct {
memMetadataCreatedTotal *prometheus.CounterVec
memSeriesRemovedTotal *prometheus.CounterVec
memMetadataRemovedTotal *prometheus.CounterVec
pushErrorsTotal *prometheus.CounterVec

activeSeriesPerUser *prometheus.GaugeVec
activeNHSeriesPerUser *prometheus.GaugeVec
Expand Down Expand Up @@ -165,6 +166,10 @@ func newIngesterMetrics(r prometheus.Registerer,
Name: "cortex_ingester_memory_metadata_removed_total",
Help: "The total number of metadata that were removed per user.",
}, []string{"user"}),
pushErrorsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_push_errors_total",
Help: "The total number of push errors per user.",
}, []string{"user", "reason"}),

maxUsersGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: instanceLimits,
Expand Down Expand Up @@ -295,6 +300,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
m.activeNHSeriesPerUser.DeleteLabelValues(userID)
m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID})

if m.memSeriesCreatedTotal != nil {
m.memSeriesCreatedTotal.DeleteLabelValues(userID)
Expand Down
Loading