Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* [BUGFIX] Ruler: Prevent counting 2xx and 4XX responses as failed writes. #6785
* [BUGFIX] Ingester: Allow shipper to skip corrupted blocks. #6786
* [BUGFIX] Compactor: Delete the prefix `blocks_meta` from the metadata fetcher metrics. #6832
* [BUGFIX] Store Gateway: Avoid race condition by checking for duplicate entries in bucket stores user scan. #6863

## 1.19.0 2025-02-27

Expand Down
18 changes: 18 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,27 @@ func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
users = append(users, activeUsers...)
users = append(users, deletingUsers...)

if err = checkDuplicateUsers(users); err != nil {
return nil, err
}

return users, err
}

func checkDuplicateUsers(users []string) error {
seen := make(map[string]struct{}, len(users))

for _, user := range users {
if _, ok := seen[user]; ok {
return fmt.Errorf("duplicate user scanned: %s", user)
} else {
seen[user] = struct{}{}
}
}

return nil
}

func (u *BucketStores) getStore(userID string) *store.BucketStore {
u.storesMu.RLock()
defer u.storesMu.RUnlock()
Expand Down
52 changes: 52 additions & 0 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,50 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
}
}

func TestBucketStores_scanUsers(t *testing.T) {
t.Parallel()

tests := map[string]struct {
scanner *mockScanner
expectedRes []string
expectedErr error
}{
"should succeed": {
scanner: &mockScanner{
res: []string{"user-1", "user-2", "user-3"},
},
expectedRes: []string{"user-1", "user-2", "user-3"},
},
"should return error if duplicate users are returned": {
scanner: &mockScanner{
res: []string{"user-1", "user-2", "user-1"},
},
expectedErr: fmt.Errorf("duplicate user scanned: user-1"),
},
}

for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
t.Parallel()

stores := &BucketStores{
userScanner: testData.scanner,
}

users, err := stores.scanUsers(context.Background())

if testData.expectedErr != nil {
assert.ErrorContains(t, err, testData.expectedErr.Error())
assert.Empty(t, users)
} else {
assert.NoError(t, err)
assert.ElementsMatch(t, testData.expectedRes, users)
}
})
}
}

func TestBucketStores_Series_ShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *testing.T) {
for _, lazyLoadingEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("lazy loading enabled = %v", lazyLoadingEnabled), func(t *testing.T) {
Expand Down Expand Up @@ -996,3 +1040,11 @@ func (f *failFirstGetBucket) Get(ctx context.Context, name string) (io.ReadClose

return f.Bucket.Get(ctx, name)
}

type mockScanner struct {
res []string
}

func (m *mockScanner) ScanUsers(_ context.Context) (active, deleting, deleted []string, err error) {
return m.res, nil, nil, nil
}
Loading