Skip to content

Commit 35732c3

Browse files
authored
Changing the per labelset config to be more generic (#5993)
* Generic per labelset limit Signed-off-by: alanprot <[email protected]> * Error if duplicate labelset Signed-off-by: alanprot <[email protected]> * Test limiter Signed-off-by: alanprot <[email protected]> * Changelog + doc Signed-off-by: alanprot <[email protected]> * lint Signed-off-by: alanprot <[email protected]> * avoid calling labelset.String() multiple times Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent 0f07151 commit 35732c3

File tree

10 files changed

+311
-114
lines changed

10 files changed

+311
-114
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
1212
* [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
1313
* [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
14-
* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allowing users to define the maximum number of series per LabelSet. #5950
14+
* [ENHANCEMENT] Ingester: Add a new `limits_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950 #5993
1515
* [ENHANCEMENT] Store Gateway: Log gRPC requests together with headers configured in `http_request_headers_to_log`. #5958
1616
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
1717
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3172,9 +3172,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
31723172
# CLI flag: -ingester.max-global-series-per-metric
31733173
[max_global_series_per_metric: <int> | default = 0]
31743174
3175-
# [Experimental] The maximum number of active series per LabelSet, across the
3176-
# cluster before replication. Empty list to disable.
3177-
[max_series_per_label_set: <list of MaxSeriesPerLabelSet> | default = []]
3175+
# [Experimental] Enable limits per LabelSet. Supported limits per labelSet:
3176+
# [max_series]
3177+
[limits_per_label_set: <list of LimitsPerLabelSet> | default = []]
31783178
31793179
# The maximum number of active metrics with metadata per user, per ingester. 0
31803180
# to disable.
@@ -5314,11 +5314,14 @@ otel:
53145314
[tls_insecure_skip_verify: <boolean> | default = false]
53155315
```
53165316
5317-
### `MaxSeriesPerLabelSet`
5317+
### `LimitsPerLabelSet`
53185318

53195319
```yaml
5320-
# The maximum number of active series per LabelSet before replication.
5321-
[limit: <int> | default = ]
5320+
limits:
5321+
# The maximum number of active series per LabelSet, across the cluster before
5322+
# replication. Setting the value to 0 enables monitoring (metrics) but does
5323+
# not enforce any limit.
5324+
[max_series: <int> | default = ]
53225325
53235326
# LabelSet to which the limit should be applied.
53245327
[label_set: <map of string (labelName) to string (labelValue)> | default = []]

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
946946

947947
userDB.activeSeries.Purge(purgeTime)
948948
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
949-
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics.activeSeriesPerLabelSet); err != nil {
949+
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil {
950950
level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
951951
}
952952
}

pkg/ingester/ingester_test.go

Lines changed: 112 additions & 62 deletions
Large diffs are not rendered by default.

pkg/ingester/limiter.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,20 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
107107

108108
// AssertMaxSeriesPerLabelSet checks, for every LabelSet limit matching the
109109
// metric, the series count returned by f and returns an error if the limit is reached.
110-
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.MaxSeriesPerLabelSet) (int, error)) error {
111-
m := l.maxSeriesPerLabelSet(userID, metric)
110+
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
111+
m := l.limitsPerLabelSets(userID, metric)
112112
for _, limit := range m {
113-
maxFunc := func(string) int {
114-
return limit.Limit
113+
maxSeriesFunc := func(string) int {
114+
return limit.Limits.MaxSeries
115115
}
116-
local := l.maxByLocalAndGlobal(userID, maxFunc, maxFunc)
116+
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
117117
if u, err := f(limit); err != nil {
118118
return err
119119
} else if u >= local {
120120
return errMaxSeriesPerLabelSetLimitExceeded{
121121
id: limit.Id,
122122
localLimit: local,
123-
globalLimit: limit.Limit,
123+
globalLimit: limit.Limits.MaxSeries,
124124
}
125125
}
126126
}
@@ -189,15 +189,15 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim
189189
minNonZero(err.globalLimit, err.localLimit), err.id, err.localLimit, err.globalLimit)
190190
}
191191

192-
func (l *Limiter) maxSeriesPerLabelSet(userID string, metric labels.Labels) []validation.MaxSeriesPerLabelSet {
193-
m := l.limits.MaxSeriesPerLabelSet(userID)
192+
func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
193+
m := l.limits.LimitsPerLabelSet(userID)
194194

195195
// returning early to not have any overhead
196196
if len(m) == 0 {
197197
return nil
198198
}
199199

200-
r := make([]validation.MaxSeriesPerLabelSet, 0, len(m))
200+
r := make([]validation.LimitsPerLabelSet, 0, len(m))
201201
outer:
202202
for _, lbls := range m {
203203
for _, lbl := range lbls.LabelSet {

pkg/ingester/limiter_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package ingester
22

33
import (
44
"errors"
5+
56
"math"
67
"testing"
78

9+
"github.com/prometheus/prometheus/model/labels"
810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/mock"
1012
"github.com/stretchr/testify/require"
@@ -423,6 +425,90 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
423425
}
424426
}
425427

428+
func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
429+
430+
tests := map[string]struct {
431+
limits validation.Limits
432+
expected error
433+
ringReplicationFactor int
434+
ringIngesterCount int
435+
shardByAllLabels bool
436+
series int
437+
}{
438+
"both local and global limit are disabled": {
439+
ringReplicationFactor: 3,
440+
ringIngesterCount: 10,
441+
series: 200,
442+
shardByAllLabels: true,
443+
limits: validation.Limits{
444+
LimitsPerLabelSet: []validation.LimitsPerLabelSet{
445+
{
446+
LabelSet: labels.FromMap(map[string]string{"foo": "bar"}),
447+
Limits: validation.LimitsPerLabelSetEntry{
448+
MaxSeries: 0,
449+
},
450+
},
451+
},
452+
},
453+
},
454+
"current number of series is above the limit": {
455+
ringReplicationFactor: 3,
456+
ringIngesterCount: 10,
457+
series: 200,
458+
shardByAllLabels: true,
459+
expected: errMaxSeriesPerLabelSetLimitExceeded{globalLimit: 10, localLimit: 3},
460+
limits: validation.Limits{
461+
LimitsPerLabelSet: []validation.LimitsPerLabelSet{
462+
{
463+
LabelSet: labels.FromMap(map[string]string{"foo": "bar"}),
464+
Limits: validation.LimitsPerLabelSetEntry{
465+
MaxSeries: 10,
466+
},
467+
},
468+
},
469+
},
470+
},
471+
"current number of series is below the limit and shard by all labels": {
472+
ringReplicationFactor: 3,
473+
ringIngesterCount: 10,
474+
series: 2,
475+
shardByAllLabels: true,
476+
limits: validation.Limits{
477+
LimitsPerLabelSet: []validation.LimitsPerLabelSet{
478+
{
479+
LabelSet: labels.FromMap(map[string]string{"foo": "bar"}),
480+
Limits: validation.LimitsPerLabelSetEntry{
481+
MaxSeries: 10,
482+
},
483+
},
484+
},
485+
},
486+
},
487+
}
488+
489+
for testName, testData := range tests {
490+
testData := testData
491+
492+
t.Run(testName, func(t *testing.T) {
493+
// Mock the ring
494+
ring := &ringCountMock{}
495+
ring.On("HealthyInstancesCount").Return(testData.ringIngesterCount)
496+
ring.On("ZonesCount").Return(1)
497+
498+
// Mock limits
499+
limits, err := validation.NewOverrides(testData.limits, nil)
500+
require.NoError(t, err)
501+
502+
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
503+
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
504+
return testData.series, nil
505+
})
506+
507+
assert.Equal(t, actual, testData.expected)
508+
})
509+
}
510+
}
511+
426512
func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
427513
tests := map[string]struct {
428514
maxLocalMetadataPerUser int

pkg/ingester/metrics.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ type ingesterMetrics struct {
3737
memSeriesRemovedTotal *prometheus.CounterVec
3838
memMetadataRemovedTotal *prometheus.CounterVec
3939

40-
activeSeriesPerUser *prometheus.GaugeVec
41-
activeSeriesPerLabelSet *prometheus.GaugeVec
40+
activeSeriesPerUser *prometheus.GaugeVec
41+
limitsPerLabelSet *prometheus.GaugeVec
42+
usagePerLabelSet *prometheus.GaugeVec
4243

4344
// Global limit metrics
4445
maxUsersGauge prometheus.GaugeFunc
@@ -212,10 +213,15 @@ func newIngesterMetrics(r prometheus.Registerer,
212213
return 0
213214
}),
214215

215-
activeSeriesPerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
216-
Name: "cortex_ingester_active_series_per_labelset",
217-
Help: "Number of currently active series per user and labelset.",
218-
}, []string{"user", "labelset"}),
216+
limitsPerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
217+
Name: "cortex_ingester_limits_per_labelset",
218+
Help: "Limits per user and labelset.",
219+
}, []string{"user", "limit", "labelset"}),
220+
221+
usagePerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
222+
Name: "cortex_ingester_usage_per_labelset",
223+
Help: "Current usage per user and labelset.",
224+
}, []string{"user", "limit", "labelset"}),
219225

220226
// Not registered automatically, but only if activeSeriesEnabled is true.
221227
activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{

pkg/ingester/user_state.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"sync"
66

7-
"github.com/prometheus/client_golang/prometheus"
87
"github.com/prometheus/common/model"
98
"github.com/prometheus/prometheus/model/labels"
109
"github.com/prometheus/prometheus/tsdb/index"
@@ -115,7 +114,7 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
115114
}
116115

117116
func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
118-
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.MaxSeriesPerLabelSet) (int, error) {
117+
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) {
119118
s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
120119
s.RLock()
121120
if r, ok := s.valuesCounter[set.Hash]; ok {
@@ -129,7 +128,7 @@ func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTS
129128
})
130129
}
131130

132-
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.MaxSeriesPerLabelSet, s *labelSetCounterShard) (int, error) {
131+
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
133132
ir, err := u.db.Head().Index()
134133
if err != nil {
135134
return 0, err
@@ -171,7 +170,7 @@ func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit
171170
}
172171

173172
func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
174-
limits := m.limiter.maxSeriesPerLabelSet(u.userID, metric)
173+
limits := m.limiter.limitsPerLabelSets(u.userID, metric)
175174
for _, l := range limits {
176175
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
177176
s.Lock()
@@ -188,7 +187,7 @@ func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labe
188187
}
189188

190189
func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
191-
limits := m.limiter.maxSeriesPerLabelSet(u.userID, metric)
190+
limits := m.limiter.limitsPerLabelSets(u.userID, metric)
192191
for _, l := range limits {
193192
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
194193
s.Lock()
@@ -199,23 +198,26 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
199198
}
200199
}
201200

202-
func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *prometheus.GaugeVec) error {
203-
currentLbsLimitHash := map[uint64]validation.MaxSeriesPerLabelSet{}
204-
for _, l := range m.limiter.limits.MaxSeriesPerLabelSet(u.userID) {
201+
func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
202+
currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{}
203+
for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) {
205204
currentLbsLimitHash[l.Hash] = l
206205
}
207206

208207
for i := 0; i < numMetricCounterShards; i++ {
209208
s := m.shards[i]
210209
s.RLock()
211210
for h, entry := range s.valuesCounter {
211+
lbls := entry.labels.String()
212212
// This limit no longer exists
213213
if _, ok := currentLbsLimitHash[h]; !ok {
214-
vec.DeleteLabelValues(u.userID, entry.labels.String())
214+
metrics.usagePerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
215+
metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
215216
continue
216217
}
218+
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
219+
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
217220
delete(currentLbsLimitHash, h)
218-
vec.WithLabelValues(u.userID, entry.labels.String()).Set(float64(entry.count))
219221
}
220222
s.RUnlock()
221223
}
@@ -227,7 +229,9 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *pr
227229
if err != nil {
228230
return err
229231
}
230-
vec.WithLabelValues(u.userID, l.LabelSet.String()).Set(float64(count))
232+
lbls := l.LabelSet.String()
233+
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(count))
234+
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(l.Limits.MaxSeries))
231235
}
232236

233237
return nil

0 commit comments

Comments
 (0)