Commit cc0dd3b

Add user scanner to Ruler/Alertmanager
Signed-off-by: SungJin1212 <[email protected]>
1 parent 6497984 commit cc0dd3b

File tree

9 files changed: +312 −49 lines


docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions

@@ -1940,6 +1940,11 @@ blocks_storage:
     # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
     [max_stale_period: <duration> | default = 1h]
 
+    # How frequently the user index file is updated. It only takes effect when the
+    # user scan strategy is user_index.
+    # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
+    [clean_up_interval: <duration> | default = 15m]
+
     # TTL of the cached users. 0 disables caching and relies on caching at
     # bucket client level.
     # CLI flag: -blocks-storage.users-scanner.cache-ttl
docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions

@@ -2017,6 +2017,11 @@ blocks_storage:
     # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
     [max_stale_period: <duration> | default = 1h]
 
+    # How frequently the user index file is updated. It only takes effect when the
+    # user scan strategy is user_index.
+    # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
+    [clean_up_interval: <duration> | default = 15m]
+
     # TTL of the cached users. 0 disables caching and relies on caching at
     # bucket client level.
     # CLI flag: -blocks-storage.users-scanner.cache-ttl
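The new `clean_up_interval` option slots into the existing `users_scanner` block documented above for both querier and store-gateway. A minimal sketch of the resulting blocks storage configuration, with illustrative (non-default) values:

```yaml
# Illustrative values only, assuming the users_scanner block documented above.
blocks_storage:
  users_scanner:
    # Scan tenants via the user index file instead of listing the bucket.
    strategy: user_index
    # Fall back to the base (list) scanner if the index is older than this.
    max_stale_period: 1h
    # How often the owning instance rewrites the user index file.
    clean_up_interval: 15m
```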

docs/configuration/config-file-reference.md

Lines changed: 45 additions & 0 deletions

@@ -986,6 +986,26 @@ local:
   # Path at which alertmanager configurations are stored.
   # CLI flag: -alertmanager-storage.local.path
   [path: <string> | default = ""]
+
+users_scanner:
+  # Strategy to use to scan users. Supported values are: list, user_index.
+  # CLI flag: -alertmanager-storage.users-scanner.strategy
+  [strategy: <string> | default = "list"]
+
+  # Maximum period of time to consider the user index as stale. Fall back to the
+  # base scanner if stale. Only valid when strategy is user_index.
+  # CLI flag: -alertmanager-storage.users-scanner.user-index.max-stale-period
+  [max_stale_period: <duration> | default = 1h]
+
+  # How frequently the user index file is updated. It only takes effect when the
+  # user scan strategy is user_index.
+  # CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
+  [clean_up_interval: <duration> | default = 15m]
+
+  # TTL of the cached users. 0 disables caching and relies on caching at bucket
+  # client level.
+  # CLI flag: -alertmanager-storage.users-scanner.cache-ttl
+  [cache_ttl: <duration> | default = 0s]
 ```
 
 ### `blocks_storage_config`

@@ -2602,6 +2622,11 @@ users_scanner:
   # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
   [max_stale_period: <duration> | default = 1h]
 
+  # How frequently the user index file is updated. It only takes effect when the
+  # user scan strategy is user_index.
+  # CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
+  [clean_up_interval: <duration> | default = 15m]
+
   # TTL of the cached users. 0 disables caching and relies on caching at bucket
   # client level.
   # CLI flag: -blocks-storage.users-scanner.cache-ttl

@@ -5832,6 +5857,26 @@ local:
   # Directory to scan for rules
   # CLI flag: -ruler-storage.local.directory
   [directory: <string> | default = ""]
+
+users_scanner:
+  # Strategy to use to scan users. Supported values are: list, user_index.
+  # CLI flag: -ruler-storage.users-scanner.strategy
+  [strategy: <string> | default = "list"]
+
+  # Maximum period of time to consider the user index as stale. Fall back to the
+  # base scanner if stale. Only valid when strategy is user_index.
+  # CLI flag: -ruler-storage.users-scanner.user-index.max-stale-period
+  [max_stale_period: <duration> | default = 1h]
+
+  # How frequently the user index file is updated. It only takes effect when the
+  # user scan strategy is user_index.
+  # CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
+  [clean_up_interval: <duration> | default = 15m]
+
+  # TTL of the cached users. 0 disables caching and relies on caching at bucket
+  # client level.
+  # CLI flag: -ruler-storage.users-scanner.cache-ttl
+  [cache_ttl: <duration> | default = 0s]
 ```
 
 ### `runtime_configuration_storage_config`
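The same `users_scanner` block is now available for the ruler and Alertmanager storage backends. A minimal sketch of how it might be set, assuming the standard `ruler_storage` and `alertmanager_storage` top-level sections and purely illustrative values:

```yaml
# Illustrative values only.
ruler_storage:
  users_scanner:
    strategy: user_index     # scan tenants via the user index file
    max_stale_period: 1h     # fall back to the list scanner if the index is stale
    clean_up_interval: 15m   # how often the owning ruler rewrites the index

alertmanager_storage:
  users_scanner:
    strategy: user_index
    max_stale_period: 1h
    clean_up_interval: 15m
```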

integration/alertmanager_test.go

Lines changed: 35 additions & 0 deletions

@@ -68,6 +68,41 @@ func TestAlertmanager(t *testing.T) {
 	assertServiceMetricsPrefixes(t, AlertManager, alertmanager)
 }
 
+func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml)))
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, alertsBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
+	flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
+		"-alertmanager-storage.users-scanner.strategy":                    "user_index",
+		"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s",
+		"-alertmanager.configs.poll-interval":                             "5s",
+	})
+
+	am := e2ecortex.NewAlertmanager(
+		"alertmanager",
+		flags,
+		"",
+	)
+
+	require.NoError(t, s.StartAndWaitReady(am))
+	// To make sure user index file is updated/scanned
+	require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"}),
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
+	)
+	require.NoError(t, am.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"}),
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
+	)
+}
+
 func TestAlertmanagerStoreAPI(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
integration/ruler_test.go

Lines changed: 59 additions & 0 deletions

@@ -142,6 +142,65 @@ func TestRulerAPI(t *testing.T) {
 	}
 }
 
+func TestRulerWithUserIndexUpdater(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Configure the ruler.
+	rulerFlags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(),
+		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
+		map[string]string{
+			"-ruler.sharding-strategy":                                 "shuffle-sharding",
+			"-ruler-storage.users-scanner.strategy":                    "user_index",
+			"-ruler-storage.users-scanner.user-index.cleanup-interval": "15s",
+			"-ruler.tenant-shard-size":                                 "1",
+			// Since we're not going to run any rule, we don't need the
+			// store-gateway to be configured to a valid address.
+			"-querier.store-gateway-addresses": "localhost:12345",
+			// Enable the bucket index so we can skip the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+			"-ruler.poll-interval":                              "2s",
+			"-log.level":                                        "info",
+		},
+	)
+
+	ruler := e2ecortex.NewRuler(
+		"ruler",
+		consul.NetworkHTTPEndpoint(),
+		rulerFlags,
+		"",
+	)
+
+	require.NoError(t, s.StartAndWaitReady(ruler))
+
+	// Create a client with the ruler address configured
+	c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1")
+	require.NoError(t, err)
+
+	ruleGroup := createTestRuleGroup(t)
+	ns := "ns"
+
+	// Set the rule group into the ruler
+	require.NoError(t, c.SetRuleGroup(ruleGroup, ns))
+
+	// To make sure user index file is updated/scanned
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"}),
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
+	)
+
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"}),
+		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
+	)
+}
+
 func TestRulerAPISingleBinary(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)

pkg/alertmanager/multitenant.go

Lines changed: 45 additions & 10 deletions

@@ -33,12 +33,12 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/ring/client"
 	"github.com/cortexproject/cortex/pkg/ring/kv"
-	"github.com/cortexproject/cortex/pkg/tenant"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/concurrency"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/services"
+	"github.com/cortexproject/cortex/pkg/util/users"
 )
 
 const (

@@ -91,6 +91,8 @@ type MultitenantAlertmanagerConfig struct {
 
 	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
 	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
+
+	CleanUpInterval time.Duration `yaml:"-"`
 }
 
 type ClusterConfig struct {

@@ -284,14 +286,16 @@ type MultitenantAlertmanager struct {
 
 	limits Limits
 
-	allowedTenants *util.AllowedTenants
+	allowedTenants *users.AllowedTenants
 
 	registry          prometheus.Registerer
 	ringCheckErrors   prometheus.Counter
 	tenantsOwned      prometheus.Gauge
 	tenantsDiscovered prometheus.Gauge
 	syncTotal         *prometheus.CounterVec
 	syncFailures      *prometheus.CounterVec
+
+	userIndexUpdater *users.UserIndexUpdater
 }
 
 // NewMultitenantAlertmanager creates a new MultitenantAlertmanager.

@@ -374,10 +378,11 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC
 		multitenantMetrics: newMultitenantAlertmanagerMetrics(registerer),
 		peer:               peer,
 		store:              store,
+		userIndexUpdater:   store.GetUserIndexUpdater(),
 		logger:             log.With(logger, "component", "MultiTenantAlertmanager"),
 		registry:           registerer,
 		limits:             limits,
-		allowedTenants:     util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
+		allowedTenants:     users.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
 		ringCheckErrors: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_alertmanager_ring_check_errors_total",
 			Help: "Number of errors that have occurred when checking the ring for ownership.",

@@ -667,6 +672,10 @@ func (am *MultitenantAlertmanager) run(ctx context.Context) error {
 		ringTickerChan = ringTicker.C
 	}
 
+	if am.cfg.ShardingEnabled && am.userIndexUpdater != nil {
+		go am.userIndexUpdateLoop(ctx)
+	}
+
 	for {
 		select {
 		case <-ctx.Done():

@@ -693,6 +702,32 @@ func (am *MultitenantAlertmanager) run(ctx context.Context) error {
 	}
 }
 
+func (am *MultitenantAlertmanager) userIndexUpdateLoop(ctx context.Context) {
+	// Hardcode the ID used to check which alertmanager owns updating the user index.
+	userID := users.UserIndexCompressedFilename
+	// Align with the clean-up interval.
+	ticker := time.NewTicker(util.DurationWithJitter(am.userIndexUpdater.GetCleanUpInterval(), 0.1))
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			level.Error(am.logger).Log("msg", "context timeout, exit user index update loop", "err", ctx.Err())
+			return
+		case <-ticker.C:
+			owned := am.isUserOwned(userID)
+			if !owned {
+				continue
+			}
+			if err := am.userIndexUpdater.UpdateUserIndex(ctx); err != nil {
+				level.Error(am.logger).Log("msg", "failed to update user index", "err", err)
+				// Wait for the next interval. Worst case, the user index scanner falls back to the list strategy.
+				continue
+			}
+		}
+	}
+}
+
 func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncReason string) error {
 	level.Info(am.logger).Log("msg", "synchronizing alertmanager configs for users")
 	am.syncTotal.WithLabelValues(syncReason).Inc()

@@ -795,7 +830,7 @@ func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
 		return true
 	}
 
-	alertmanagers, err := am.ring.Get(shardByUser(userID), SyncRingOp, nil, nil, nil)
+	alertmanagers, err := am.ring.Get(users.ShardByUser(userID), SyncRingOp, nil, nil, nil)
 	if err != nil {
 		am.ringCheckErrors.Inc()
 		level.Error(am.logger).Log("msg", "failed to load alertmanager configuration", "user", userID, "err", err)

@@ -1005,7 +1040,7 @@ func (am *MultitenantAlertmanager) GetPositionForUser(userID string) int {
 		return 0
 	}
 
-	set, err := am.ring.Get(shardByUser(userID), RingOp, nil, nil, nil)
+	set, err := am.ring.Get(users.ShardByUser(userID), RingOp, nil, nil, nil)
 	if err != nil {
 		level.Error(am.logger).Log("msg", "unable to read the ring while trying to determine the alertmanager position", "err", err)
 		// If we're unable to determine the position, we don't want a tenant to miss out on the notification - instead,

@@ -1048,7 +1083,7 @@ func (am *MultitenantAlertmanager) HandleRequest(ctx context.Context, in *httpgr
 
 // serveRequest serves the Alertmanager's web UI and API.
 func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http.Request) {
-	userID, err := tenant.TenantID(req.Context())
+	userID, err := users.TenantID(req.Context())
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusUnauthorized)
 		return

@@ -1106,7 +1141,7 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
 	level.Debug(am.logger).Log("msg", "message received for replication", "user", userID, "key", part.Key)
 
 	selfAddress := am.ringLifecycler.GetInstanceAddr()
-	err := ring.DoBatch(ctx, RingOp, am.ring, nil, []uint32{shardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
+	err := ring.DoBatch(ctx, RingOp, am.ring, nil, []uint32{users.ShardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
 		if desc.GetAddr() == selfAddress {
 			return nil
 		}

@@ -1137,7 +1172,7 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
 // state from all replicas, but will consider it a success if state is obtained from at least one replica.
 func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
 	// Only get the set of replicas which contain the specified user.
-	key := shardByUser(userID)
+	key := users.ShardByUser(userID)
 	replicationSet, err := am.ring.Get(key, RingOp, nil, nil, nil)
 	if err != nil {
 		return nil, err

@@ -1197,7 +1232,7 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use
 
 // UpdateState implements the Alertmanager service.
 func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *clusterpb.Part) (*alertmanagerpb.UpdateStateResponse, error) {
-	userID, err := tenant.TenantID(ctx)
+	userID, err := users.TenantID(ctx)
 	if err != nil {
 		return nil, err
 	}

@@ -1307,7 +1342,7 @@ func (am *MultitenantAlertmanager) getPerUserDirectories() map[string]string {
 
 // UpdateState implements the Alertmanager service.
 func (am *MultitenantAlertmanager) ReadState(ctx context.Context, req *alertmanagerpb.ReadStateRequest) (*alertmanagerpb.ReadStateResponse, error) {
-	userID, err := tenant.TenantID(ctx)
+	userID, err := users.TenantID(ctx)
 	if err != nil {
 		return nil, err
 	}
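The corresponding ruler-side wiring is in files not shown in this excerpt. Purely as an illustration of the single-writer pattern used by `userIndexUpdateLoop` above, here is a hypothetical, self-contained sketch: the standalone function, the `owns` callback, and the import paths are assumptions, while `users.UserIndexUpdater`, `GetCleanUpInterval`, `UpdateUserIndex`, `users.UserIndexCompressedFilename`, and `util.DurationWithJitter` are taken from the diff.

```go
package example // hypothetical helper, not part of the commit

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/users"
)

// runUserIndexUpdateLoop mirrors MultitenantAlertmanager.userIndexUpdateLoop above.
// The owns callback stands in for the component-specific ring check (the Alertmanager
// hashes users.UserIndexCompressedFilename via users.ShardByUser).
func runUserIndexUpdateLoop(ctx context.Context, logger log.Logger, updater *users.UserIndexUpdater, owns func(key string) bool) {
	// Jitter the interval so replicas don't all wake up at the same instant.
	ticker := time.NewTicker(util.DurationWithJitter(updater.GetCleanUpInterval(), 0.1))
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Only the instance that owns the well-known key writes the index,
			// so replicas don't race on the same object in the bucket.
			if !owns(users.UserIndexCompressedFilename) {
				continue
			}
			if err := updater.UpdateUserIndex(ctx); err != nil {
				// Non-fatal: scanners fall back to the list strategy if the index goes stale.
				level.Error(logger).Log("msg", "failed to update user index", "err", err)
			}
		}
	}
}
```

Hashing a fixed, well-known key (the user index filename) through the ring means only one healthy replica considers itself the writer at any time, so replicas don't overwrite each other's index; if that replica fails to update it, scanners simply fall back to the list strategy until the index is fresh again.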
