Skip to content

Commit 2fe6ad3

Browse files
committed
Move user related files to pkg/util/users
Signed-off-by: SungJin1212 <[email protected]>
1 parent 2e2d39b commit 2fe6ad3

File tree

126 files changed

+1418
-1186
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

126 files changed

+1418
-1186
lines changed

CHANGELOG.md

Lines changed: 524 additions & 523 deletions
Large diffs are not rendered by default.

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@ package bucketclient
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
78
"strings"
89
"sync"
910

1011
"github.com/go-kit/log"
1112
"github.com/gogo/protobuf/proto"
1213
"github.com/pkg/errors"
14+
"github.com/prometheus/client_golang/prometheus"
1315
"github.com/thanos-io/objstore"
14-
15-
"github.com/cortexproject/cortex/pkg/storage/tsdb"
16+
"github.com/thanos-io/thanos/pkg/extprom"
1617

1718
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1819
"github.com/cortexproject/cortex/pkg/storage/bucket"
1920
"github.com/cortexproject/cortex/pkg/util/concurrency"
2021
"github.com/cortexproject/cortex/pkg/util/runutil"
22+
"github.com/cortexproject/cortex/pkg/util/users"
2123
)
2224

2325
const (
@@ -45,27 +47,54 @@ type BucketAlertStore struct {
4547
amBucket objstore.Bucket
4648
cfgProvider bucket.TenantConfigProvider
4749
logger log.Logger
50+
51+
usersScanner users.Scanner
52+
userIndexUpdater *users.UserIndexUpdater
4853
}
4954

50-
func NewBucketAlertStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *BucketAlertStore {
51-
return &BucketAlertStore{
52-
alertsBucket: bucket.NewPrefixedBucketClient(bkt, alertsPrefix),
53-
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
54-
cfgProvider: cfgProvider,
55-
logger: logger,
55+
func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketAlertStore, error) {
56+
alertBucket := bucket.NewPrefixedBucketClient(bkt, alertsPrefix)
57+
58+
regWithComponent := extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg)
59+
usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, regWithComponent)
60+
if err != nil {
61+
return nil, errors.Wrap(err, "unable to initialize alertmanager users scanner")
5662
}
63+
64+
var userIndexUpdater *users.UserIndexUpdater
65+
if userScannerCfg.Strategy == users.UserScanStrategyUserIndex {
66+
// We hardcode strategy to be list so can ignore error.
67+
baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
68+
Strategy: users.UserScanStrategyList,
69+
}, alertBucket, logger, regWithComponent)
70+
userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
71+
}
72+
73+
return &BucketAlertStore{
74+
alertsBucket: alertBucket,
75+
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
76+
cfgProvider: cfgProvider,
77+
logger: logger,
78+
usersScanner: usersScanner,
79+
userIndexUpdater: userIndexUpdater,
80+
}, nil
81+
}
82+
83+
// GetUserIndexUpdater implements alertstore.AlertStore.
84+
func (s *BucketAlertStore) GetUserIndexUpdater() *users.UserIndexUpdater {
85+
return s.userIndexUpdater
5786
}
5887

5988
// ListAllUsers implements alertstore.AlertStore.
6089
func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
61-
var userIDs []string
62-
63-
err := s.alertsBucket.Iter(ctx, "", func(key string) error {
64-
userIDs = append(userIDs, key)
65-
return nil
66-
})
67-
68-
return userIDs, err
90+
active, deleting, _, err := s.usersScanner.ScanUsers(ctx)
91+
if err != nil {
92+
return nil, fmt.Errorf("unable to list users in alertmanager store bucket: %w", err)
93+
}
94+
userIDs := make([]string, 0, len(active)+len(deleting))
95+
userIDs = append(userIDs, active...)
96+
userIDs = append(userIDs, deleting...)
97+
return userIDs, nil
6998
}
7099

71100
// GetAlertConfigs implements alertstore.AlertStore.
@@ -217,5 +246,5 @@ func (s *BucketAlertStore) getUserBucket(userID string) objstore.Bucket {
217246

218247
func (s *BucketAlertStore) getAlertmanagerUserBucket(userID string) objstore.Bucket {
219248
uBucket := bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider)
220-
return uBucket.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
249+
return uBucket.WithExpectedErrs(bucket.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
221250
}

pkg/alertmanager/alertstore/config.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ import (
88
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/local"
99
"github.com/cortexproject/cortex/pkg/configs/client"
1010
"github.com/cortexproject/cortex/pkg/storage/bucket"
11+
"github.com/cortexproject/cortex/pkg/util/users"
1112
)
1213

13-
// Config configures a the alertmanager storage backend.
14+
// Config configures the alertmanager storage backend.
1415
type Config struct {
1516
bucket.Config `yaml:",inline"`
16-
ConfigDB client.Config `yaml:"configdb"`
17-
Local local.StoreConfig `yaml:"local"`
17+
ConfigDB client.Config `yaml:"configdb"`
18+
Local local.StoreConfig `yaml:"local"`
19+
UsersScanner users.UsersScannerConfig `yaml:"users_scanner"`
1820
}
1921

2022
// RegisterFlags registers the backend storage config.
@@ -25,6 +27,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2527
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
2628
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
2729
cfg.RegisterFlagsWithPrefix(prefix, f)
30+
cfg.UsersScanner.RegisterFlagsWithPrefix(prefix, f)
2831
}
2932

3033
// IsFullStateSupported returns if the given configuration supports access to FullState objects.

pkg/alertmanager/alertstore/configdb/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
88
"github.com/cortexproject/cortex/pkg/configs/client"
99
"github.com/cortexproject/cortex/pkg/configs/userconfig"
10+
"github.com/cortexproject/cortex/pkg/util/users"
1011
)
1112

1213
const (
@@ -34,6 +35,11 @@ func NewStore(c client.Client) *Store {
3435
}
3536
}
3637

38+
// GetUserIndexUpdater implements alertstore.AlertStore.
39+
func (c *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
40+
return nil
41+
}
42+
3743
// ListAllUsers implements alertstore.AlertStore.
3844
func (c *Store) ListAllUsers(ctx context.Context) ([]string, error) {
3945
configs, err := c.reloadConfigs(ctx)

pkg/alertmanager/alertstore/local/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/prometheus/alertmanager/config"
1212

1313
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
14+
"github.com/cortexproject/cortex/pkg/util/users"
1415
)
1516

1617
const (
@@ -43,6 +44,11 @@ func NewStore(cfg StoreConfig) (*Store, error) {
4344
return &Store{cfg}, nil
4445
}
4546

47+
// GetUserIndexUpdater implements alertstore.AlertStore.
48+
func (f *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
49+
return nil
50+
}
51+
4652
// ListAllUsers implements alertstore.AlertStore.
4753
func (f *Store) ListAllUsers(_ context.Context) ([]string, error) {
4854
configs, err := f.reloadConfigs()

pkg/alertmanager/alertstore/store.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,24 @@ package alertstore
22

33
import (
44
"context"
5+
"fmt"
6+
"io"
57

68
"github.com/go-kit/log"
79
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/thanos-io/objstore"
811

912
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1013
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
1114
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/configdb"
1215
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/local"
1316
"github.com/cortexproject/cortex/pkg/configs/client"
1417
"github.com/cortexproject/cortex/pkg/storage/bucket"
18+
"github.com/cortexproject/cortex/pkg/util/users"
19+
)
20+
21+
var (
22+
errAccessDenied = fmt.Errorf("access denied")
1523
)
1624

1725
// AlertStore stores and configures users rule configs
@@ -46,6 +54,9 @@ type AlertStore interface {
4654
// DeleteFullState deletes the alertmanager state for an user.
4755
// If state for the user doesn't exist, no error is reported.
4856
DeleteFullState(ctx context.Context, user string) error
57+
58+
// GetUserIndexUpdater is getter for UserIndexUpdater
59+
GetUserIndexUpdater() *users.UserIndexUpdater
4960
}
5061

5162
// NewAlertStore returns a alertmanager store backend client based on the provided cfg.
@@ -67,5 +78,29 @@ func NewAlertStore(ctx context.Context, cfg Config, cfgProvider bucket.TenantCon
6778
return nil, err
6879
}
6980

70-
return bucketclient.NewBucketAlertStore(bucketClient, cfgProvider, logger), nil
81+
return bucketclient.NewBucketAlertStore(bucketClient, cfg.UsersScanner, cfgProvider, logger, reg)
82+
}
83+
84+
type MockBucket struct {
85+
objstore.Bucket
86+
err error
87+
}
88+
89+
func (m *MockBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket {
90+
return m
91+
}
92+
93+
func (m *MockBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
94+
return m
95+
}
96+
97+
func (m *MockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
98+
if m.err != nil {
99+
return nil, m.err
100+
}
101+
return m.Bucket.Get(ctx, name)
102+
}
103+
104+
func (m *MockBucket) IsAccessDeniedErr(err error) bool {
105+
return err == errAccessDenied
71106
}

pkg/alertmanager/alertstore/store_test.go

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,22 @@ package alertstore
22

33
import (
44
"context"
5-
"fmt"
6-
"io"
75
"testing"
86

97
"github.com/go-kit/log"
108
"github.com/prometheus/alertmanager/cluster/clusterpb"
9+
"github.com/prometheus/client_golang/prometheus"
1110
"github.com/stretchr/testify/assert"
1211
"github.com/stretchr/testify/require"
1312
"github.com/thanos-io/objstore"
1413

1514
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1615
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
17-
)
18-
19-
var (
20-
errAccessDenied = fmt.Errorf("access denied")
16+
"github.com/cortexproject/cortex/pkg/util/users"
2117
)
2218

2319
func TestAlertStore_ListAllUsers(t *testing.T) {
24-
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client any) {
20+
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *MockBucket, client any) {
2521
ctx := context.Background()
2622
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
2723
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
@@ -42,11 +38,20 @@ func TestAlertStore_ListAllUsers(t *testing.T) {
4238
require.NoError(t, err)
4339
assert.ElementsMatch(t, []string{"user-1", "user-2"}, users)
4440
}
41+
42+
{
43+
// delete user-1 alertmanager config
44+
require.NoError(t, store.DeleteAlertConfig(ctx, "user-1"))
45+
46+
users, err := store.ListAllUsers(ctx)
47+
require.NoError(t, err)
48+
assert.ElementsMatch(t, []string{"user-2"}, users)
49+
}
4550
})
4651
}
4752

4853
func TestAlertStore_SetAndGetAlertConfig(t *testing.T) {
49-
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client any) {
54+
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *MockBucket, client any) {
5055
ctx := context.Background()
5156
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
5257
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
@@ -84,7 +89,7 @@ func TestAlertStore_SetAndGetAlertConfig(t *testing.T) {
8489
}
8590

8691
func TestStore_GetAlertConfigs(t *testing.T) {
87-
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client any) {
92+
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *MockBucket, client any) {
8893
ctx := context.Background()
8994
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
9095
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
@@ -129,7 +134,7 @@ func TestStore_GetAlertConfigs(t *testing.T) {
129134
}
130135

131136
func TestAlertStore_DeleteAlertConfig(t *testing.T) {
132-
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client any) {
137+
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *MockBucket, client any) {
133138
ctx := context.Background()
134139
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
135140
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
@@ -169,10 +174,13 @@ func TestAlertStore_DeleteAlertConfig(t *testing.T) {
169174
})
170175
}
171176

172-
func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, b *mockBucket, client any)) {
177+
func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, b *MockBucket, client any)) {
173178
bucketClient := objstore.NewInMemBucket()
174-
mBucketClient := &mockBucket{Bucket: bucketClient}
175-
bucketStore := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())
179+
mBucketClient := &MockBucket{Bucket: bucketClient}
180+
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
181+
reg := prometheus.NewPedanticRegistry()
182+
bucketStore, err := bucketclient.NewBucketAlertStore(mBucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
183+
assert.NoError(t, err)
176184

177185
stores := map[string]struct {
178186
store AlertStore
@@ -211,8 +219,11 @@ func makeTestFullState(content string) alertspb.FullStateDesc {
211219

212220
func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
213221
bucket := objstore.NewInMemBucket()
214-
mBucketClient := &mockBucket{Bucket: bucket}
215-
store := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())
222+
mBucketClient := &MockBucket{Bucket: bucket}
223+
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
224+
reg := prometheus.NewPedanticRegistry()
225+
store, err := bucketclient.NewBucketAlertStore(mBucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
226+
assert.NoError(t, err)
216227
ctx := context.Background()
217228

218229
state1 := makeTestFullState("one")
@@ -291,19 +302,3 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
291302
require.NoError(t, store.DeleteFullState(ctx, "user-1"))
292303
}
293304
}
294-
295-
type mockBucket struct {
296-
objstore.Bucket
297-
err error
298-
}
299-
300-
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
301-
if m.err != nil {
302-
return nil, m.err
303-
}
304-
return m.Bucket.Get(ctx, name)
305-
}
306-
307-
func (m *mockBucket) IsAccessDeniedErr(err error) bool {
308-
return err == errAccessDenied
309-
}

pkg/alertmanager/api.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ import (
1818
"gopkg.in/yaml.v2"
1919

2020
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
21-
"github.com/cortexproject/cortex/pkg/tenant"
2221
"github.com/cortexproject/cortex/pkg/util"
2322
"github.com/cortexproject/cortex/pkg/util/concurrency"
2423
util_log "github.com/cortexproject/cortex/pkg/util/log"
24+
"github.com/cortexproject/cortex/pkg/util/users"
2525
)
2626

2727
const (
@@ -71,7 +71,7 @@ type UserConfig struct {
7171
func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.Request) {
7272
logger := util_log.WithContext(r.Context(), am.logger)
7373

74-
userID, err := tenant.TenantID(r.Context())
74+
userID, err := users.TenantID(r.Context())
7575
if err != nil {
7676
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
7777
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
@@ -111,7 +111,7 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.
111111

112112
func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.Request) {
113113
logger := util_log.WithContext(r.Context(), am.logger)
114-
userID, err := tenant.TenantID(r.Context())
114+
userID, err := users.TenantID(r.Context())
115115
if err != nil {
116116
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
117117
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
@@ -171,7 +171,7 @@ func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.
171171
// Note that if no config exists for a user, StatusOK is returned.
172172
func (am *MultitenantAlertmanager) DeleteUserConfig(w http.ResponseWriter, r *http.Request) {
173173
logger := util_log.WithContext(r.Context(), am.logger)
174-
userID, err := tenant.TenantID(r.Context())
174+
userID, err := users.TenantID(r.Context())
175175
if err != nil {
176176
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
177177
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)

0 commit comments

Comments
 (0)