@@ -254,11 +254,12 @@ func (r tsdbCloseCheckResult) shouldClose() bool {
 }
 
 type userTSDB struct {
-    db             *tsdb.DB
-    userID         string
-    activeSeries   *ActiveSeries
-    seriesInMetric *metricCounter
-    limiter        *Limiter
+    db              *tsdb.DB
+    userID          string
+    activeSeries    *ActiveSeries
+    seriesInMetric  *metricCounter
+    labelSetCounter *labelSetCounter
+    limiter         *Limiter
 
     instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
     instanceLimitsFn    func() *InstanceLimits
@@ -399,6 +400,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
         return err
     }
 
+    if err := u.labelSetCounter.canAddSeriesForLabelSet(context.TODO(), u, metric); err != nil {
+        return err
+    }
+
     return nil
 }
 
@@ -412,6 +417,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
         return
     }
     u.seriesInMetric.increaseSeriesForMetric(metricName)
+    u.labelSetCounter.increaseSeriesLabelSet(u, metric)
 }
 
 // PostDeletion implements SeriesLifecycleCallback interface.
@@ -425,6 +431,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
             continue
         }
         u.seriesInMetric.decreaseSeriesForMetric(metricName)
+        u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
     }
 }
 
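Note (reviewer sketch, not part of the diff): the hunks above wire a new labelSetCounter into the TSDB series lifecycle callbacks: an admission check in PreCreation, a count increment in PostCreation, and a decrement in PostDeletion. Below is a minimal, self-contained Go sketch of what such a counter could look like, inferred only from these call sites; the actual labelSetCounter added elsewhere in this PR takes the ingester's *Limiter and the owning *userTSDB, so its types and signatures will differ.

package ingester // illustrative sketch only, not the PR's file layout

import (
    "fmt"
    "sync"

    "github.com/prometheus/prometheus/model/labels"
)

// labelSetLimit is a simplified stand-in for a per-labelset limit; in Cortex
// such limits would come from the tenant's runtime overrides via the Limiter.
type labelSetLimit struct {
    match map[string]string // label/value pairs a series must carry to be counted
    limit int               // maximum in-memory series for that label set
}

// labelSetCounter sketch: tracks in-memory series per configured label set.
type labelSetCounter struct {
    mtx    sync.Mutex
    limits []labelSetLimit
    counts []int
}

func newLabelSetCounter(limits []labelSetLimit) *labelSetCounter {
    return &labelSetCounter{limits: limits, counts: make([]int, len(limits))}
}

func (c *labelSetCounter) matches(m map[string]string, metric labels.Labels) bool {
    for name, value := range m {
        if metric.Get(name) != value {
            return false
        }
    }
    return true
}

// canAddSeriesForLabelSet mirrors the PreCreation hook: reject the new series
// if any label set it belongs to is already at its limit.
func (c *labelSetCounter) canAddSeriesForLabelSet(metric labels.Labels) error {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    for i, ls := range c.limits {
        if c.matches(ls.match, metric) && c.counts[i] >= ls.limit {
            return fmt.Errorf("per-labelset series limit of %d exceeded", ls.limit)
        }
    }
    return nil
}

// increaseSeriesLabelSet and decreaseSeriesLabelSet mirror PostCreation and
// PostDeletion: they keep the per-labelset counts in step with the TSDB head.
func (c *labelSetCounter) increaseSeriesLabelSet(metric labels.Labels) {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    for i, ls := range c.limits {
        if c.matches(ls.match, metric) {
            c.counts[i]++
        }
    }
}

func (c *labelSetCounter) decreaseSeriesLabelSet(metric labels.Labels) {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    for i, ls := range c.limits {
        if c.matches(ls.match, metric) && c.counts[i] > 0 {
            c.counts[i]--
        }
    }
}

The updateActiveSeries hunk further below suggests the real counter also exports its per-labelset counts into a gauge metric (activeSeriesPerLabelSet) via UpdateMetric.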
@@ -713,6 +720,15 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
         TSDBState: newTSDBState(bucketClient, registerer),
         logger:    logger,
     }
+    i.limiter = NewLimiter(
+        limits,
+        i.lifecycler,
+        cfg.DistributorShardingStrategy,
+        cfg.DistributorShardByAllLabels,
+        cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
+        cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
+        cfg.AdminLimitMessage,
+    )
     i.metrics = newIngesterMetrics(registerer,
         false,
         false,
@@ -924,6 +940,7 @@ func (i *Ingester) updateActiveSeries() {
 
         userDB.activeSeries.Purge(purgeTime)
         i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
+        userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
     }
 }
 
@@ -1100,38 +1117,43 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
             // of it, so that we can return it back to the distributor, which will return a
             // 400 error to the client. The client (Prometheus) will not retry on 400, and
             // we actually ingested all samples which haven't failed.
-            switch cause := errors.Cause(err); cause {
-            case storage.ErrOutOfBounds:
+            switch cause := errors.Cause(err); {
+            case errors.Is(cause, storage.ErrOutOfBounds):
                 sampleOutOfBoundsCount++
                 updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
                 continue
 
-            case storage.ErrOutOfOrderSample:
+            case errors.Is(cause, storage.ErrOutOfOrderSample):
                 sampleOutOfOrderCount++
                 updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
                 continue
 
-            case storage.ErrDuplicateSampleForTimestamp:
+            case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
                 newValueForTimestampCount++
                 updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
                 continue
 
-            case storage.ErrTooOldSample:
+            case errors.Is(cause, storage.ErrTooOldSample):
                 sampleTooOldCount++
                 updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
                 continue
 
-            case errMaxSeriesPerUserLimitExceeded:
+            case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
                 perUserSeriesLimitCount++
                 updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
                 continue
 
-            case errMaxSeriesPerMetricLimitExceeded:
+            case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
                 perMetricSeriesLimitCount++
                 updateFirstPartial(func() error {
                     return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
                 })
                 continue
+            case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
+                updateFirstPartial(func() error {
+                    return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
+                })
+                continue
             }
 
             // The error looks an issue on our side, so we should rollback
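Note (reviewer sketch, not part of the diff): the switch now uses errors.Is for the existing sentinel errors and errors.As for the new per-labelset case, which implies errMaxSeriesPerLabelSetLimitExceeded is a struct error type rather than a package-level sentinel value; the switch header accordingly changes from switching on cause to switching on boolean case expressions. A small sketch of the Is/As distinction, with a hypothetical definition of the error type (the real one in this PR may carry different fields):

package ingester // illustrative sketch only

import (
    "errors"
    "fmt"
)

// Stand-in for the existing sentinel; in Cortex this is a package-level var.
var errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")

// Hypothetical shape of the new error; the real errMaxSeriesPerLabelSetLimitExceeded
// in this PR may carry different fields.
type errMaxSeriesPerLabelSetLimitExceeded struct {
    labelSet string
    limit    int
}

func (e errMaxSeriesPerLabelSetLimitExceeded) Error() string {
    return fmt.Sprintf("per-labelset series limit of %d exceeded for %s", e.limit, e.labelSet)
}

func classify(cause error) string {
    switch {
    // Sentinel values are matched with errors.Is, which also unwraps any
    // wrapping added while the error bubbles up.
    case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
        return "per-user limit"
    // A struct-typed error is matched with errors.As; &T{} supplies the *T
    // target it populates, exactly as in the new switch case above.
    case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
        return "per-labelset limit"
    default:
        return "other"
    }
}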
@@ -2018,6 +2040,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
         userID:              userID,
         activeSeries:        NewActiveSeries(),
         seriesInMetric:      newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
+        labelSetCounter:     newLabelSetCounter(i.limiter),
         ingestedAPISamples:  util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
         ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),