Skip to content

Commit b35b10c

Browse files
committed
Ruler: Add support for per-user external labels
Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
1 parent e9d2ed7 commit b35b10c

File tree

12 files changed

+171
-20
lines changed

12 files changed

+171
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1717
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
1818
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
19+
* [FEATURE] Ruler: Add support for per-user external labels #6340
1920
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
2021
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
2122
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3620,6 +3620,9 @@ query_rejection:
36203620

36213621
# list of rule groups to disable
36223622
[disabled_rule_groups: <list of DisabledRuleGroup> | default = []]
3623+
3624+
# external labels for alerting rules
3625+
[external_labels: <map of string (labelName) to string (labelValue)> | default = []]
36233626
```
36243627
36253628
### `memberlist_config`

pkg/cortex/modules.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
154154
// no need to initialize module if load path is empty
155155
return nil, nil
156156
}
157-
t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
157+
runtimeConfigLoader := runtimeConfigLoader{cfg: t.Cfg}
158+
t.Cfg.RuntimeConfig.Loader = runtimeConfigLoader.load
158159

159160
// make sure to set default limits before we start loading configuration into memory
160161
validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
@@ -612,14 +613,14 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
612613
}
613614

614615
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
615-
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
616+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
616617
} else {
617618
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
618619
// TODO: Consider wrapping logger to differentiate from querier module logger
619620
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
620621

621622
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
622-
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
623+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
623624
}
624625

625626
if err != nil {

pkg/cortex/runtime_config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ func (l *runtimeConfigTenantLimits) AllByUserID() map[string]*validation.Limits
5858
return nil
5959
}
6060

61-
func loadRuntimeConfig(r io.Reader) (interface{}, error) {
61+
type runtimeConfigLoader struct {
62+
cfg Config
63+
}
64+
65+
func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
6266
var overrides = &RuntimeConfigValues{}
6367

6468
decoder := yaml.NewDecoder(r)
@@ -74,6 +78,12 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) {
7478
return nil, errMultipleDocuments
7579
}
7680

81+
for _, ul := range overrides.TenantLimits {
82+
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
83+
return nil, err
84+
}
85+
}
86+
7787
return overrides, nil
7888
}
7989

pkg/cortex/runtime_config_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/stretchr/testify/assert"
88
"github.com/stretchr/testify/require"
99

10+
"github.com/cortexproject/cortex/pkg/distributor"
1011
"github.com/cortexproject/cortex/pkg/util/validation"
1112
)
1213

@@ -28,7 +29,8 @@ overrides:
2829
'1235': *id001
2930
'1236': *id001
3031
`)
31-
runtimeCfg, err := loadRuntimeConfig(yamlFile)
32+
loader := runtimeConfigLoader{cfg: Config{Distributor: distributor.Config{ShardByAllLabels: true}}}
33+
runtimeCfg, err := loader.load(yamlFile)
3234
require.NoError(t, err)
3335

3436
limits := validation.Limits{
@@ -51,7 +53,7 @@ func TestLoadRuntimeConfig_ShouldLoadEmptyFile(t *testing.T) {
5153
yamlFile := strings.NewReader(`
5254
# This is an empty YAML.
5355
`)
54-
actual, err := loadRuntimeConfig(yamlFile)
56+
actual, err := runtimeConfigLoader{}.load(yamlFile)
5557
require.NoError(t, err)
5658
assert.Equal(t, &RuntimeConfigValues{}, actual)
5759
}
@@ -60,7 +62,7 @@ func TestLoadRuntimeConfig_MissingPointerFieldsAreNil(t *testing.T) {
6062
yamlFile := strings.NewReader(`
6163
# This is an empty YAML.
6264
`)
63-
actual, err := loadRuntimeConfig(yamlFile)
65+
actual, err := runtimeConfigLoader{}.load(yamlFile)
6466
require.NoError(t, err)
6567

6668
actualCfg, ok := actual.(*RuntimeConfigValues)
@@ -102,7 +104,7 @@ overrides:
102104
}
103105

104106
for _, tc := range cases {
105-
actual, err := loadRuntimeConfig(strings.NewReader(tc))
107+
actual, err := runtimeConfigLoader{}.load(strings.NewReader(tc))
106108
assert.Equal(t, errMultipleDocuments, err)
107109
assert.Nil(t, actual)
108110
}

pkg/ruler/compat.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ type RulesLimits interface {
153153
RulerMaxRulesPerRuleGroup(userID string) int
154154
RulerQueryOffset(userID string) time.Duration
155155
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
156+
ExternalLabels(userID string) labels.Labels
156157
}
157158

158159
// EngineQueryFunc returns a new engine query function validating max queryLength.

pkg/ruler/external_labels.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package ruler
2+
3+
import (
4+
"sync"
5+
6+
"github.com/prometheus/prometheus/model/labels"
7+
)
8+
9+
// userExternalLabels checks and merges per-user external labels with global external labels.
10+
type userExternalLabels struct {
11+
global labels.Labels
12+
limits RulesLimits
13+
builder *labels.Builder
14+
15+
mtx sync.Mutex
16+
users map[string]labels.Labels
17+
}
18+
19+
func newUserExternalLabels(global labels.Labels, limits RulesLimits) *userExternalLabels {
20+
return &userExternalLabels{
21+
global: global,
22+
limits: limits,
23+
builder: labels.NewBuilder(nil),
24+
25+
mtx: sync.Mutex{},
26+
users: map[string]labels.Labels{},
27+
}
28+
}
29+
30+
func (e *userExternalLabels) get(userID string) (labels.Labels, bool) {
31+
e.mtx.Lock()
32+
defer e.mtx.Unlock()
33+
lset, ok := e.users[userID]
34+
return lset, ok
35+
}
36+
37+
func (e *userExternalLabels) update(userID string) (labels.Labels, bool) {
38+
lset := e.limits.ExternalLabels(userID)
39+
40+
e.mtx.Lock()
41+
defer e.mtx.Unlock()
42+
43+
e.builder.Reset(e.global)
44+
for _, l := range lset {
45+
e.builder.Set(l.Name, l.Value)
46+
}
47+
lset = e.builder.Labels()
48+
49+
if !labels.Equal(e.users[userID], lset) {
50+
e.users[userID] = lset
51+
return lset, true
52+
}
53+
return lset, false
54+
}
55+
56+
func (e *userExternalLabels) remove(user string) {
57+
e.mtx.Lock()
58+
defer e.mtx.Unlock()
59+
delete(e.users, user)
60+
}
61+
62+
func (e *userExternalLabels) cleanup() {
63+
e.mtx.Lock()
64+
defer e.mtx.Unlock()
65+
for user := range e.users {
66+
delete(e.users, user)
67+
}
68+
}

pkg/ruler/manager.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/prometheus/client_golang/prometheus/promauto"
1818
"github.com/prometheus/prometheus/config"
1919
"github.com/prometheus/prometheus/discovery"
20+
"github.com/prometheus/prometheus/model/labels"
2021
"github.com/prometheus/prometheus/model/rulefmt"
2122
"github.com/prometheus/prometheus/notifier"
2223
promRules "github.com/prometheus/prometheus/rules"
@@ -47,6 +48,9 @@ type DefaultMultiTenantManager struct {
4748
notifiers map[string]*rulerNotifier
4849
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics
4950

51+
// Per-user externalLabels.
52+
userExternalLabels *userExternalLabels
53+
5054
// rules backup
5155
rulesBackupManager *rulesBackupManager
5256

@@ -62,7 +66,7 @@ type DefaultMultiTenantManager struct {
6266
syncRuleMtx sync.Mutex
6367
}
6468

65-
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
69+
func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
6670
ncfg, err := buildNotifierConfig(&cfg)
6771
if err != nil {
6872
return nil, err
@@ -92,6 +96,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
9296
frontendPool: newFrontendPool(cfg, logger, reg),
9397
ruleEvalMetrics: evalMetrics,
9498
notifiers: map[string]*rulerNotifier{},
99+
userExternalLabels: newUserExternalLabels(cfg.ExternalLabels, limits),
95100
notifiersDiscoveryMetrics: notifiersDiscoveryMetrics,
96101
mapper: newMapper(cfg.RulePath, logger),
97102
userManagers: map[string]RulesManager{},
@@ -146,6 +151,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
146151

147152
r.removeNotifier(userID)
148153
r.mapper.cleanupUser(userID)
154+
r.userExternalLabels.remove(userID)
149155
r.lastReloadSuccessful.DeleteLabelValues(userID)
150156
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
151157
r.configUpdatesTotal.DeleteLabelValues(userID)
@@ -183,12 +189,13 @@ func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGr
183189
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
184190
// Map the files to disk and return the file names to be passed to the users manager if they
185191
// have been updated
186-
update, files, err := r.mapper.MapRules(user, groups.Formatted())
192+
rulesUpdated, files, err := r.mapper.MapRules(user, groups.Formatted())
187193
if err != nil {
188194
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
189195
level.Error(r.logger).Log("msg", "unable to map rule files", "user", user, "err", err)
190196
return
191197
}
198+
externalLabels, externalLabelsUpdated := r.userExternalLabels.update(user)
192199

193200
existing := true
194201
manager := r.getRulesManager(user, ctx)
@@ -201,19 +208,26 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
201208
return
202209
}
203210

204-
if !existing || update {
211+
if !existing || rulesUpdated || externalLabelsUpdated {
205212
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
206213
r.configUpdatesTotal.WithLabelValues(user).Inc()
207-
if update && existing {
214+
if rulesUpdated && existing {
208215
r.updateRuleCache(user, manager.RuleGroups())
209216
}
210-
err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
217+
err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
211218
r.deleteRuleCache(user)
212219
if err != nil {
213220
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
214221
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
215222
return
216223
}
224+
if externalLabelsUpdated {
225+
if err = r.notifierApplyExternalLabels(user, externalLabels); err != nil {
226+
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
227+
level.Error(r.logger).Log("msg", "unable to update notifier", "user", user, "err", err)
228+
return
229+
}
230+
}
217231

218232
r.lastReloadSuccessful.WithLabelValues(user).Set(1)
219233
r.lastReloadSuccessfulTimestamp.WithLabelValues(user).SetToCurrentTime()
@@ -348,6 +362,19 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
348362
return n.notifier, nil
349363
}
350364

365+
func (r *DefaultMultiTenantManager) notifierApplyExternalLabels(userID string, externalLabels labels.Labels) error {
366+
r.notifiersMtx.Lock()
367+
defer r.notifiersMtx.Unlock()
368+
369+
n, ok := r.notifiers[userID]
370+
if !ok {
371+
return fmt.Errorf("notifier not found")
372+
}
373+
cfg := *r.notifierCfg // Copy it
374+
cfg.GlobalConfig.ExternalLabels = externalLabels
375+
return n.applyConfig(&cfg)
376+
}
377+
351378
func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
352379
r.ruleCacheMtx.RLock()
353380
defer r.ruleCacheMtx.RUnlock()
@@ -402,6 +429,7 @@ func (r *DefaultMultiTenantManager) Stop() {
402429

403430
// cleanup user rules directories
404431
r.mapper.cleanup()
432+
r.userExternalLabels.cleanup()
405433
}
406434

407435
func (*DefaultMultiTenantManager) ValidateRuleGroup(g rulefmt.RuleGroup) []error {

pkg/ruler/manager_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ func TestSyncRuleGroups(t *testing.T) {
2929
}
3030

3131
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
32+
limits := ruleLimits{externalLabels: labels.FromStrings("from", "cortex")}
3233

33-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
34+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, limits, ruleManagerFactory, nil, nil, log.NewNopLogger())
3435
require.NoError(t, err)
3536

3637
const user = "testUser"
@@ -61,6 +62,9 @@ func TestSyncRuleGroups(t *testing.T) {
6162
require.NoError(t, err)
6263
require.Equal(t, []string{user}, users)
6364
require.True(t, ok)
65+
lset, ok := m.userExternalLabels.get(user)
66+
require.True(t, ok)
67+
require.Equal(t, limits.externalLabels, lset)
6468
}
6569

6670
// Passing empty map / nil stops all managers.
@@ -79,6 +83,8 @@ func TestSyncRuleGroups(t *testing.T) {
7983
require.NoError(t, err)
8084
require.Equal(t, []string(nil), users)
8185
require.False(t, ok)
86+
_, ok = m.userExternalLabels.get(user)
87+
require.False(t, ok)
8288
}
8389

8490
// Resync same rules as before. Previously this didn't restart the manager.
@@ -154,7 +160,7 @@ func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
154160
}
155161

156162
ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
157-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
163+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleLimits{}, ruleManagerFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
158164
require.NoError(t, err)
159165

160166
m.SyncRuleGroups(context.Background(), userRules)
@@ -217,7 +223,7 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
217223

218224
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
219225

220-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
226+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleLimits{}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
221227
require.NoError(t, err)
222228

223229
const user = "testUser"
@@ -265,7 +271,7 @@ func TestBackupRules(t *testing.T) {
265271
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
266272
config := Config{RulePath: dir}
267273
config.Ring.ReplicationFactor = 3
268-
m, err := NewDefaultMultiTenantManager(config, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
274+
m, err := NewDefaultMultiTenantManager(config, ruleLimits{}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
269275
require.NoError(t, err)
270276

271277
const user1 = "testUser"

0 commit comments

Comments
 (0)