Skip to content

Commit f2aed1d

Browse files
committed
Introduce user scanner to Ruler/Alertmanager
Signed-off-by: SungJin1212 <[email protected]>
1 parent bd4773b commit f2aed1d

File tree

12 files changed

+138
-41
lines changed

12 files changed

+138
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
55
* [CHANGE] Ingester: Remove the EnableNativeHistograms config flag and instead gatekeep through a new per-tenant limit at ingestion. #6718
66
* [CHANGE] Validate the tenant ID when using a single tenant resolver. #6727
7+
* [FEATURE] Alertmanager/Ruler: Introduce a user scanner to reduce the number of list calls to object storage. #6999
78
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.enable-type-and-unit-labels` flag to add `__type__` and `__unit__` labels for OTLP metrics. #6969
89
* [FEATURE] Distributor: Add an experimental `-distributor.otlp.allow-delta-temporality` flag to ingest delta temporality otlp metrics. #6934
910
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458

docs/configuration/config-file-reference.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,21 @@ local:
981981
# Path at which alertmanager configurations are stored.
982982
# CLI flag: -alertmanager-storage.local.path
983983
[path: <string> | default = ""]
984+
985+
users_scanner:
986+
# Strategy to use to scan users. Supported values are: list, user_index.
987+
# CLI flag: -alertmanager-storage.users-scanner.strategy
988+
[strategy: <string> | default = "list"]
989+
990+
# Maximum age of the user index before it is considered stale. Fall back to the
991+
# base scanner if stale. Only valid when strategy is user_index.
992+
# CLI flag: -alertmanager-storage.users-scanner.user-index.max-stale-period
993+
[max_stale_period: <duration> | default = 1h]
994+
995+
# TTL of the cached users. 0 disables caching and relies on caching at bucket
996+
# client level.
997+
# CLI flag: -alertmanager-storage.users-scanner.cache-ttl
998+
[cache_ttl: <duration> | default = 0s]
984999
```
9851000
9861001
### `blocks_storage_config`
@@ -5772,6 +5787,21 @@ local:
57725787
# Directory to scan for rules
57735788
# CLI flag: -ruler-storage.local.directory
57745789
[directory: <string> | default = ""]
5790+
5791+
users_scanner:
5792+
# Strategy to use to scan users. Supported values are: list, user_index.
5793+
# CLI flag: -ruler-storage.users-scanner.strategy
5794+
[strategy: <string> | default = "list"]
5795+
5796+
# Maximum age of the user index before it is considered stale. Fall back to the
5797+
# base scanner if stale. Only valid when strategy is user_index.
5798+
# CLI flag: -ruler-storage.users-scanner.user-index.max-stale-period
5799+
[max_stale_period: <duration> | default = 1h]
5800+
5801+
# TTL of the cached users. 0 disables caching and relies on caching at bucket
5802+
# client level.
5803+
# CLI flag: -ruler-storage.users-scanner.cache-ttl
5804+
[cache_ttl: <duration> | default = 0s]
57755805
```
57765806
57775807
### `runtime_configuration_storage_config`

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@ package bucketclient
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
78
"strings"
89
"sync"
910

1011
"github.com/go-kit/log"
1112
"github.com/gogo/protobuf/proto"
1213
"github.com/pkg/errors"
14+
"github.com/prometheus/client_golang/prometheus"
1315
"github.com/thanos-io/objstore"
14-
15-
"github.com/cortexproject/cortex/pkg/storage/tsdb"
16+
"github.com/thanos-io/thanos/pkg/extprom"
1617

1718
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1819
"github.com/cortexproject/cortex/pkg/storage/bucket"
20+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
21+
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
1922
"github.com/cortexproject/cortex/pkg/util/concurrency"
2023
"github.com/cortexproject/cortex/pkg/util/runutil"
2124
)
@@ -45,27 +48,36 @@ type BucketAlertStore struct {
4548
amBucket objstore.Bucket
4649
cfgProvider bucket.TenantConfigProvider
4750
logger log.Logger
51+
52+
usersScanner users.Scanner
4853
}
4954

50-
func NewBucketAlertStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *BucketAlertStore {
55+
func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg tsdb.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketAlertStore, error) {
56+
alertBucket := bucket.NewPrefixedBucketClient(bkt, alertsPrefix)
57+
58+
usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg))
59+
if err != nil {
60+
return nil, errors.Wrap(err, "unable to initialize alertmanager users scanner")
61+
}
5162
return &BucketAlertStore{
52-
alertsBucket: bucket.NewPrefixedBucketClient(bkt, alertsPrefix),
63+
usersScanner: usersScanner,
64+
alertsBucket: alertBucket,
5365
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
5466
cfgProvider: cfgProvider,
5567
logger: logger,
56-
}
68+
}, nil
5769
}
5870

5971
// ListAllUsers implements alertstore.AlertStore.
6072
func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
61-
var userIDs []string
62-
63-
err := s.alertsBucket.Iter(ctx, "", func(key string) error {
64-
userIDs = append(userIDs, key)
65-
return nil
66-
})
67-
68-
return userIDs, err
73+
active, deleting, _, err := s.usersScanner.ScanUsers(ctx)
74+
if err != nil {
75+
return nil, fmt.Errorf("unable to list users in alertmanager store bucket: %w", err)
76+
}
77+
userIDs := make([]string, 0, len(active)+len(deleting))
78+
userIDs = append(userIDs, active...)
79+
userIDs = append(userIDs, deleting...)
80+
return userIDs, nil
6981
}
7082

7183
// GetAlertConfigs implements alertstore.AlertStore.

pkg/alertmanager/alertstore/config.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/local"
88
"github.com/cortexproject/cortex/pkg/configs/client"
99
"github.com/cortexproject/cortex/pkg/storage/bucket"
10+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1011
)
1112

12-
// Config configures a the alertmanager storage backend.
13+
// Config configures the alertmanager storage backend.
1314
type Config struct {
1415
bucket.Config `yaml:",inline"`
15-
ConfigDB client.Config `yaml:"configdb"`
16-
Local local.StoreConfig `yaml:"local"`
16+
ConfigDB client.Config `yaml:"configdb"`
17+
Local local.StoreConfig `yaml:"local"`
18+
UsersScanner tsdb.UsersScannerConfig `yaml:"users_scanner"`
1719
}
1820

1921
// RegisterFlags registers the backend storage config.
@@ -24,6 +26,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2426
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
2527
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
2628
cfg.RegisterFlagsWithPrefix(prefix, f)
29+
cfg.UsersScanner.RegisterFlagsWithPrefix(prefix, f)
2730
}
2831

2932
// IsFullStateSupported returns if the given configuration supports access to FullState objects.

pkg/alertmanager/alertstore/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,5 @@ func NewAlertStore(ctx context.Context, cfg Config, cfgProvider bucket.TenantCon
6767
return nil, err
6868
}
6969

70-
return bucketclient.NewBucketAlertStore(bucketClient, cfgProvider, logger), nil
70+
return bucketclient.NewBucketAlertStore(bucketClient, cfg.UsersScanner, cfgProvider, logger, reg)
7171
}

pkg/alertmanager/alertstore/store_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88

99
"github.com/go-kit/log"
1010
"github.com/prometheus/alertmanager/cluster/clusterpb"
11+
"github.com/prometheus/client_golang/prometheus"
1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
1314
"github.com/thanos-io/objstore"
1415

1516
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1617
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
18+
cortextsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
1719
)
1820

1921
var (
@@ -42,6 +44,15 @@ func TestAlertStore_ListAllUsers(t *testing.T) {
4244
require.NoError(t, err)
4345
assert.ElementsMatch(t, []string{"user-1", "user-2"}, users)
4446
}
47+
48+
{
49+
// delete user-1 alertmanager config
50+
require.NoError(t, store.DeleteAlertConfig(ctx, "user-1"))
51+
52+
users, err := store.ListAllUsers(ctx)
53+
require.NoError(t, err)
54+
assert.ElementsMatch(t, []string{"user-2"}, users)
55+
}
4556
})
4657
}
4758

@@ -172,7 +183,10 @@ func TestAlertStore_DeleteAlertConfig(t *testing.T) {
172183
func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, b *mockBucket, client interface{})) {
173184
bucketClient := objstore.NewInMemBucket()
174185
mBucketClient := &mockBucket{Bucket: bucketClient}
175-
bucketStore := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())
186+
usersScannerConfig := cortextsdb.UsersScannerConfig{Strategy: cortextsdb.UserScanStrategyList}
187+
reg := prometheus.NewPedanticRegistry()
188+
bucketStore, err := bucketclient.NewBucketAlertStore(mBucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
189+
assert.NoError(t, err)
176190

177191
stores := map[string]struct {
178192
store AlertStore
@@ -212,7 +226,10 @@ func makeTestFullState(content string) alertspb.FullStateDesc {
212226
func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
213227
bucket := objstore.NewInMemBucket()
214228
mBucketClient := &mockBucket{Bucket: bucket}
215-
store := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())
229+
usersScannerConfig := cortextsdb.UsersScannerConfig{Strategy: cortextsdb.UserScanStrategyList}
230+
reg := prometheus.NewPedanticRegistry()
231+
store, err := bucketclient.NewBucketAlertStore(mBucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
232+
assert.NoError(t, err)
216233
ctx := context.Background()
217234

218235
state1 := makeTestFullState("one")
@@ -297,6 +314,14 @@ type mockBucket struct {
297314
err error
298315
}
299316

317+
func (m *mockBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket {
318+
return m
319+
}
320+
321+
func (m *mockBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
322+
return m
323+
}
324+
300325
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
301326
if m.err != nil {
302327
return nil, m.err

pkg/ruler/rulestore/bucketclient/bucket_client.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@ import (
1313
"github.com/go-kit/log/level"
1414
"github.com/gogo/protobuf/proto"
1515
"github.com/pkg/errors"
16+
"github.com/prometheus/client_golang/prometheus"
1617
"github.com/thanos-io/objstore"
18+
"github.com/thanos-io/thanos/pkg/extprom"
1719
"golang.org/x/sync/errgroup"
1820

1921
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
2022
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
2123
"github.com/cortexproject/cortex/pkg/storage/bucket"
24+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
25+
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
2226
"github.com/cortexproject/cortex/pkg/util/multierror"
2327
)
2428

@@ -42,14 +46,24 @@ type BucketRuleStore struct {
4246
bucket objstore.Bucket
4347
cfgProvider bucket.TenantConfigProvider
4448
logger log.Logger
49+
50+
usersScanner users.Scanner
4551
}
4652

47-
func NewBucketRuleStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *BucketRuleStore {
48-
return &BucketRuleStore{
49-
bucket: bucket.NewPrefixedBucketClient(bkt, rulesPrefix),
50-
cfgProvider: cfgProvider,
51-
logger: logger,
53+
func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg tsdb.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketRuleStore, error) {
54+
rulesBucket := bucket.NewPrefixedBucketClient(bkt, rulesPrefix)
55+
56+
usersScanner, err := users.NewScanner(userScannerCfg, rulesBucket, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "ruler"}, reg))
57+
if err != nil {
58+
return nil, errors.Wrap(err, "unable to initialize ruler users scanner")
5259
}
60+
61+
return &BucketRuleStore{
62+
bucket: rulesBucket,
63+
cfgProvider: cfgProvider,
64+
logger: logger,
65+
usersScanner: usersScanner,
66+
}, nil
5367
}
5468

5569
// getRuleGroup loads and returns a rule group. If an existing rule group is supplied, it is Reset and reused. If nil, a new RuleGroupDesc is allocated.
@@ -94,16 +108,14 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g
94108

95109
// ListAllUsers implements rules.RuleStore.
96110
func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
97-
var users []string
98-
err := b.bucket.Iter(ctx, "", func(user string) error {
99-
users = append(users, strings.TrimSuffix(user, objstore.DirDelim))
100-
return nil
101-
})
111+
active, deleting, _, err := b.usersScanner.ScanUsers(ctx)
102112
if err != nil {
103113
return nil, fmt.Errorf("unable to list users in rule store bucket: %w", err)
104114
}
105-
106-
return users, nil
115+
userIDs := make([]string, 0, len(active)+len(deleting))
116+
userIDs = append(userIDs, active...)
117+
userIDs = append(userIDs, deleting...)
118+
return userIDs, nil
107119
}
108120

109121
// ListAllRuleGroups implements rules.RuleStore.

pkg/ruler/rulestore/bucketclient/bucket_client_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/go-kit/log"
1212
"github.com/pkg/errors"
13+
"github.com/prometheus/client_golang/prometheus"
1314
"github.com/prometheus/common/model"
1415
"github.com/prometheus/prometheus/model/rulefmt"
1516
"github.com/stretchr/testify/assert"
@@ -19,6 +20,7 @@ import (
1920
"github.com/cortexproject/cortex/pkg/cortexpb"
2021
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
2122
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
23+
cortextsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2224
"github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
2325
)
2426

@@ -105,7 +107,10 @@ func TestListRules(t *testing.T) {
105107
func TestLoadPartialRules(t *testing.T) {
106108
bucketClient := objstore.NewInMemBucket()
107109
mockedBucketClient := &testutil.MockBucketFailure{Bucket: bucketClient, GetFailures: map[string]error{}}
108-
bucketStore := NewBucketRuleStore(mockedBucketClient, nil, log.NewNopLogger())
110+
usersScannerConfig := cortextsdb.UsersScannerConfig{Strategy: cortextsdb.UserScanStrategyList}
111+
reg := prometheus.NewPedanticRegistry()
112+
bucketStore, err := NewBucketRuleStore(mockedBucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
113+
require.NoError(t, err)
109114

110115
groups := []testGroup{
111116
{user: "user1", namespace: "hello", ruleGroup: rulefmt.RuleGroup{Name: "second testGroup", Interval: model.Duration(2 * time.Minute)}},
@@ -263,7 +268,10 @@ func TestDelete(t *testing.T) {
263268

264269
func runForEachRuleStore(t *testing.T, testFn func(t *testing.T, store rulestore.RuleStore, bucketClient interface{})) {
265270
bucketClient := objstore.NewInMemBucket()
266-
bucketStore := NewBucketRuleStore(bucketClient, nil, log.NewNopLogger())
271+
reg := prometheus.NewPedanticRegistry()
272+
usersScannerConfig := cortextsdb.UsersScannerConfig{Strategy: cortextsdb.UserScanStrategyList}
273+
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
274+
assert.NoError(t, err)
267275

268276
stores := map[string]struct {
269277
store rulestore.RuleStore
@@ -426,8 +434,11 @@ func TestListAllRuleGroupsWithNoNamespaceOrGroup(t *testing.T) {
426434
"rules/user3/bnM=/Z3JvdXAx", // namespace "ns", group "group1"
427435
},
428436
}
437+
usersScannerConfig := cortextsdb.UsersScannerConfig{Strategy: cortextsdb.UserScanStrategyList}
438+
reg := prometheus.NewPedanticRegistry()
429439

430-
s := NewBucketRuleStore(obj, nil, log.NewNopLogger())
440+
s, err := NewBucketRuleStore(obj, usersScannerConfig, nil, log.NewNopLogger(), reg)
441+
require.NoError(t, err)
431442
out, err := s.ListAllRuleGroups(context.Background())
432443
require.NoError(t, err)
433444

pkg/ruler/rulestore/config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ import (
88
"github.com/cortexproject/cortex/pkg/ruler/rulestore/configdb"
99
"github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
1010
"github.com/cortexproject/cortex/pkg/storage/bucket"
11+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1112
"github.com/cortexproject/cortex/pkg/util/flagext"
1213
)
1314

1415
// Config configures a rule store.
1516
type Config struct {
1617
bucket.Config `yaml:",inline"`
17-
ConfigDB client.Config `yaml:"configdb"`
18-
Local local.Config `yaml:"local"`
18+
ConfigDB client.Config `yaml:"configdb"`
19+
Local local.Config `yaml:"local"`
20+
UsersScanner tsdb.UsersScannerConfig `yaml:"users_scanner"`
1921
}
2022

2123
// RegisterFlags registers the backend storage config.
@@ -26,6 +28,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2628
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
2729
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
2830
cfg.RegisterFlagsWithPrefix(prefix, f)
31+
cfg.UsersScanner.RegisterFlagsWithPrefix(prefix, f)
2932
}
3033

3134
// IsDefaults returns true if the storage options have not been set.

pkg/ruler/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,5 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.
3636
return nil, err
3737
}
3838

39-
return bucketclient.NewBucketRuleStore(bucketClient, cfgProvider, logger), nil
39+
return bucketclient.NewBucketRuleStore(bucketClient, cfg.UsersScanner, cfgProvider, logger, reg)
4040
}

0 commit comments

Comments
 (0)