Skip to content

Commit f8260cd

Browse files
authored
Add active series limit for Native Histogram (#6796)
Signed-off-by: Paurush Garg <[email protected]>
1 parent 10c9dd9 commit f8260cd

File tree

11 files changed

+348
-43
lines changed

11 files changed

+348
-43
lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Changelog
22

33
## master / unreleased
4-
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
54
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
5+
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
66
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
77
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
88
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
@@ -38,10 +38,11 @@
3838
* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676
3939
* [ENHANCEMENT] Ingester: Push request should fail when label set is out of order #6746
4040
* [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714
41-
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
41+
* [ENHANCEMENT] Distributor: Add min/max schema validation for Native Histogram. #6766
4242
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
4343
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
44-
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794
44+
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histogram. #6794
45+
* [ENHANCEMENT] Ingester: Add active series limit specifically for Native Histogram. #6796
4546
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
4647
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
4748
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 #6821

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3606,6 +3606,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
36063606
# CLI flag: -ingester.max-series-per-metric
36073607
[max_series_per_metric: <int> | default = 50000]
36083608
3609+
# The maximum number of active native histogram series per user, per ingester. 0
3610+
# to disable. Supported only if -ingester.active-series-metrics-enabled is true.
3611+
# CLI flag: -ingester.max-native-histogram-series-per-user
3612+
[max_native_histogram_series_per_user: <int> | default = 0]
3613+
36093614
# The maximum number of active series per user, across the cluster before
36103615
# replication. 0 to disable. Supported only if -distributor.shard-by-all-labels
36113616
# is true.
@@ -3617,6 +3622,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
36173622
# CLI flag: -ingester.max-global-series-per-metric
36183623
[max_global_series_per_metric: <int> | default = 0]
36193624
3625+
# The maximum number of active native histogram series per user, across the
3626+
# cluster before replication. 0 to disable. Supported only if
3627+
# -distributor.shard-by-all-labels and -ingester.active-series-metrics-enabled are
3628+
# true.
3629+
# CLI flag: -ingester.max-global-native-histogram-series-per-user
3630+
[max_global_native_histogram_series_per_user: <int> | default = 0]
3631+
36203632
# [Experimental] Enable limits per LabelSet. Supported limits per labelSet:
36213633
# [max_series]
36223634
[limits_per_label_set: <list of LimitsPerLabelSet> | default = []]

pkg/cortex/cortex.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (c *Config) Validate(log log.Logger) error {
204204
if err := c.BlocksStorage.Validate(); err != nil {
205205
return errors.Wrap(err, "invalid TSDB config")
206206
}
207-
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
207+
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled); err != nil {
208208
return errors.Wrap(err, "invalid limits config")
209209
}
210210
if err := c.ResourceMonitor.Validate(); err != nil {

pkg/cortex/runtime_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
7979
}
8080

8181
for _, ul := range overrides.TenantLimits {
82-
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
82+
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil {
8383
return nil, err
8484
}
8585
}

pkg/ingester/ingester.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,11 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
453453
return err
454454
}
455455

456+
// Total native histogram series limit.
457+
if err := u.limiter.AssertMaxNativeHistogramSeriesPerUser(u.userID, u.activeSeries.ActiveNativeHistogram()); err != nil {
458+
return err
459+
}
460+
456461
// Series per metric name limit.
457462
metricName, err := extract.MetricNameFromLabels(metric)
458463
if err != nil {
@@ -1220,21 +1225,22 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12201225
// Keep track of some stats which are tracked only if the samples will be
12211226
// successfully committed
12221227
var (
1223-
succeededSamplesCount = 0
1224-
failedSamplesCount = 0
1225-
succeededHistogramsCount = 0
1226-
failedHistogramsCount = 0
1227-
succeededExemplarsCount = 0
1228-
failedExemplarsCount = 0
1229-
startAppend = time.Now()
1230-
sampleOutOfBoundsCount = 0
1231-
sampleOutOfOrderCount = 0
1232-
sampleTooOldCount = 0
1233-
newValueForTimestampCount = 0
1234-
perUserSeriesLimitCount = 0
1235-
perLabelSetSeriesLimitCount = 0
1236-
perMetricSeriesLimitCount = 0
1237-
discardedNativeHistogramCount = 0
1228+
succeededSamplesCount = 0
1229+
failedSamplesCount = 0
1230+
succeededHistogramsCount = 0
1231+
failedHistogramsCount = 0
1232+
succeededExemplarsCount = 0
1233+
failedExemplarsCount = 0
1234+
startAppend = time.Now()
1235+
sampleOutOfBoundsCount = 0
1236+
sampleOutOfOrderCount = 0
1237+
sampleTooOldCount = 0
1238+
newValueForTimestampCount = 0
1239+
perUserSeriesLimitCount = 0
1240+
perUserNativeHistogramSeriesLimitCount = 0
1241+
perLabelSetSeriesLimitCount = 0
1242+
perMetricSeriesLimitCount = 0
1243+
discardedNativeHistogramCount = 0
12381244

12391245
updateFirstPartial = func(errFn func() error) {
12401246
if firstPartialErr == nil {
@@ -1270,6 +1276,12 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12701276
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
12711277
})
12721278

1279+
case errors.Is(cause, errMaxNativeHistogramSeriesPerUserLimitExceeded):
1280+
perUserNativeHistogramSeriesLimitCount++
1281+
updateFirstPartial(func() error {
1282+
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
1283+
})
1284+
12731285
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
12741286
perMetricSeriesLimitCount++
12751287
updateFirstPartial(func() error {
@@ -1513,6 +1525,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
15131525
if perUserSeriesLimitCount > 0 {
15141526
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
15151527
}
1528+
if perUserNativeHistogramSeriesLimitCount > 0 {
1529+
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserNativeHistogramSeriesLimit, userID).Add(float64(perUserNativeHistogramSeriesLimitCount))
1530+
}
15161531
if perMetricSeriesLimitCount > 0 {
15171532
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
15181533
}

pkg/ingester/ingester_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ func TestIngesterUserLimitExceeded(t *testing.T) {
754754
limits := defaultLimitsTestConfig()
755755
limits.EnableNativeHistograms = true
756756
limits.MaxLocalSeriesPerUser = 1
757+
limits.MaxLocalNativeHistogramSeriesPerUser = 1
757758
limits.MaxLocalMetricsWithMetadataPerUser = 1
758759

759760
userID := "1"
@@ -868,6 +869,93 @@ func TestIngesterUserLimitExceeded(t *testing.T) {
868869

869870
}
870871

872+
func TestIngesterUserLimitExceededForNativeHistogram(t *testing.T) {
873+
limits := defaultLimitsTestConfig()
874+
limits.EnableNativeHistograms = true
875+
limits.MaxLocalNativeHistogramSeriesPerUser = 1
876+
limits.MaxLocalSeriesPerUser = 2
877+
limits.MaxLocalMetricsWithMetadataPerUser = 1
878+
879+
userID := "1"
880+
// Series
881+
labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}
882+
labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}}
883+
sampleNativeHistogram1 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(1))
884+
sampleNativeHistogram2 := cortexpb.HistogramToHistogramProto(1, tsdbutil.GenerateTestHistogram(2))
885+
sampleNativeHistogram3 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(3))
886+
887+
// Metadata
888+
metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER}
889+
metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric2", Help: "a help for testmetric2", Type: cortexpb.COUNTER}
890+
891+
dir := t.TempDir()
892+
893+
chunksDir := filepath.Join(dir, "chunks")
894+
blocksDir := filepath.Join(dir, "blocks")
895+
require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
896+
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
897+
898+
blocksIngesterGenerator := func(reg prometheus.Registerer) *Ingester {
899+
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, reg)
900+
require.NoError(t, err)
901+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
902+
// Wait until it's ACTIVE
903+
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
904+
return ing.lifecycler.GetState()
905+
})
906+
907+
return ing
908+
}
909+
910+
tests := []string{"blocks"}
911+
for i, ingGenerator := range []func(reg prometheus.Registerer) *Ingester{blocksIngesterGenerator} {
912+
t.Run(tests[i], func(t *testing.T) {
913+
reg := prometheus.NewRegistry()
914+
ing := ingGenerator(reg)
915+
916+
// Append only one series and one metadata first, expect no error.
917+
ctx := user.InjectOrgID(context.Background(), userID)
918+
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1}, nil, []*cortexpb.MetricMetadata{metadata1}, []cortexpb.Histogram{sampleNativeHistogram1}, cortexpb.API))
919+
require.NoError(t, err)
920+
921+
testLimits := func(reg prometheus.Gatherer) {
922+
// Append to two series, expect series-exceeded error.
923+
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, nil, nil, []cortexpb.Histogram{sampleNativeHistogram2, sampleNativeHistogram3}, cortexpb.API))
924+
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
925+
require.True(t, ok, "returned error is not an httpgrpc response")
926+
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
927+
assert.Equal(t, wrapWithUser(makeLimitError(perUserNativeHistogramSeriesLimit, ing.limiter.FormatError(userID, errMaxNativeHistogramSeriesPerUserLimitExceeded, labels1)), userID).Error(), string(httpResp.Body))
928+
929+
// Append two metadata, expect no error since metadata is a best effort approach.
930+
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(nil, nil, []*cortexpb.MetricMetadata{metadata1, metadata2}, nil, cortexpb.API))
931+
require.NoError(t, err)
932+
933+
// Read samples back via ingester queries.
934+
res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, model.MetricNameLabel, "testmetric")
935+
require.NoError(t, err)
936+
require.NotNil(t, res)
937+
938+
// Verify metadata
939+
m, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""})
940+
require.NoError(t, err)
941+
assert.Equal(t, []*cortexpb.MetricMetadata{metadata1}, m.Metadata)
942+
}
943+
944+
testLimits(reg)
945+
946+
// Limits should hold after restart.
947+
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
948+
// Use new registry to prevent metrics registration panic.
949+
reg = prometheus.NewRegistry()
950+
ing = ingGenerator(reg)
951+
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
952+
953+
testLimits(reg)
954+
})
955+
}
956+
957+
}
958+
871959
func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []cortexpb.Sample) {
872960
for j := 0; j < nSeries; j++ {
873961
labels := chunk.BenchmarkLabels.Copy()
@@ -886,6 +974,7 @@ func TestIngesterMetricLimitExceeded(t *testing.T) {
886974
limits := defaultLimitsTestConfig()
887975
limits.EnableNativeHistograms = true
888976
limits.MaxLocalSeriesPerMetric = 1
977+
limits.MaxLocalNativeHistogramSeriesPerUser = 1
889978
limits.MaxLocalMetadataPerMetric = 1
890979

891980
userID := "1"

pkg/ingester/limiter.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
)
1313

1414
var (
15-
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
16-
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
17-
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
18-
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
15+
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
16+
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
17+
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
18+
errMaxNativeHistogramSeriesPerUserLimitExceeded = errors.New("per-user native histogram series limit exceeded")
19+
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
1920
)
2021

2122
type errMaxSeriesPerLabelSetLimitExceeded struct {
@@ -95,6 +96,16 @@ func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error {
9596
return errMaxSeriesPerUserLimitExceeded
9697
}
9798

99+
// AssertMaxNativeHistogramSeriesPerUser limit has not been reached compared to the current
100+
// number of native histogram series in input and returns an error if so.
101+
func (l *Limiter) AssertMaxNativeHistogramSeriesPerUser(userID string, series int) error {
102+
if actualLimit := l.maxNativeHistogramSeriesPerUser(userID); series < actualLimit {
103+
return nil
104+
}
105+
106+
return errMaxNativeHistogramSeriesPerUserLimitExceeded
107+
}
108+
98109
// AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current
99110
// number of metrics with metadata in input and returns an error if so.
100111
func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error {
@@ -134,6 +145,8 @@ func (l *Limiter) FormatError(userID string, err error, lbls labels.Labels) erro
134145
switch {
135146
case errors.Is(err, errMaxSeriesPerUserLimitExceeded):
136147
return l.formatMaxSeriesPerUserError(userID)
148+
case errors.Is(err, errMaxNativeHistogramSeriesPerUserLimitExceeded):
149+
return l.formatMaxNativeHistogramsSeriesPerUserError(userID)
137150
case errors.Is(err, errMaxSeriesPerMetricLimitExceeded):
138151
return l.formatMaxSeriesPerMetricError(userID, lbls.Get(labels.MetricName))
139152
case errors.Is(err, errMaxMetadataPerUserLimitExceeded):
@@ -158,6 +171,15 @@ func (l *Limiter) formatMaxSeriesPerUserError(userID string) error {
158171
minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit)
159172
}
160173

174+
func (l *Limiter) formatMaxNativeHistogramsSeriesPerUserError(userID string) error {
175+
actualLimit := l.maxNativeHistogramSeriesPerUser(userID)
176+
localLimit := l.limits.MaxLocalNativeHistogramSeriesPerUser(userID)
177+
globalLimit := l.limits.MaxGlobalNativeHistogramSeriesPerUser(userID)
178+
179+
return fmt.Errorf("per-user native histogram series limit of %d exceeded, %s (local limit: %d global limit: %d actual local limit: %d)",
180+
minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit)
181+
}
182+
161183
func (l *Limiter) formatMaxSeriesPerMetricError(userID string, metric string) error {
162184
actualLimit := l.maxSeriesPerMetric(userID)
163185
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
@@ -248,6 +270,14 @@ func (l *Limiter) maxSeriesPerUser(userID string) int {
248270
)
249271
}
250272

273+
func (l *Limiter) maxNativeHistogramSeriesPerUser(userID string) int {
274+
return l.maxByLocalAndGlobal(
275+
userID,
276+
l.limits.MaxLocalNativeHistogramSeriesPerUser,
277+
l.limits.MaxGlobalNativeHistogramSeriesPerUser,
278+
)
279+
}
280+
251281
func (l *Limiter) maxMetadataPerUser(userID string) int {
252282
return l.maxByLocalAndGlobal(
253283
userID,

0 commit comments

Comments
 (0)