Skip to content

Commit 8ba4041

Browse files
committed
complete isUserOwned function in am/ruler
Signed-off-by: SungJin1212 <[email protected]>
1 parent 20db2ea commit 8ba4041

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

pkg/alertmanager/multitenant.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -801,10 +801,6 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)
801801

802802
// Filter out users not owned by this shard.
803803
for _, userID := range allUserIDs {
804-
if !am.allowedTenants.IsAllowed(userID) {
805-
level.Debug(am.logger).Log("msg", "ignoring alertmanager for user, not allowed", "user", userID)
806-
continue
807-
}
808804
if am.isUserOwned(userID) {
809805
ownedUserIDs = append(ownedUserIDs, userID)
810806
}
@@ -823,6 +819,10 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)
823819
}
824820

825821
func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
822+
if !am.allowedTenants.IsAllowed(userID) {
823+
return false
824+
}
825+
826826
// If sharding is disabled, any alertmanager instance owns all users.
827827
if !am.cfg.ShardingEnabled {
828828
return true

pkg/ruler/ruler.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,11 @@ func (r *Ruler) userIndexUpdateLoop(ctx context.Context) {
747747
level.Error(r.logger).Log("msg", "context timeout, exit user index update loop", "err", ctx.Err())
748748
return
749749
case <-ticker.C:
750-
owned := r.isUserOwned(userID)
750+
owned, err := r.isUserOwned(userID)
751+
if err != nil {
752+
level.Error(r.logger).Log("msg", "failed to check if compactor owns updating user index", "err", err)
753+
continue
754+
}
751755
if !owned {
752756
continue
753757
}
@@ -760,20 +764,38 @@ func (r *Ruler) userIndexUpdateLoop(ctx context.Context) {
760764
}
761765
}
762766

763-
func (r *Ruler) isUserOwned(userID string) bool {
767+
func (r *Ruler) isUserOwned(userID string) (bool, error) {
768+
if !r.allowedTenants.IsAllowed(userID) {
769+
return false, nil
770+
}
771+
764772
// If sharding is disabled, any ruler instance owns all users.
765773
if !r.cfg.EnableSharding {
766-
return true
774+
return true, nil
775+
}
776+
777+
if r.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
778+
shardSize := r.getShardSizeForUser(userID)
779+
subRing := r.ring.ShuffleShard(userID, shardSize)
780+
781+
rs, err := subRing.GetAllHealthy(RingOp)
782+
if err != nil {
783+
r.ringCheckErrors.Inc()
784+
level.Error(r.logger).Log("msg", "failed to get rulers from ring", "user", userID, "err", err)
785+
return false, err
786+
}
787+
788+
return rs.Includes(r.lifecycler.GetInstanceAddr()), nil
767789
}
768790

769791
rulers, err := r.ring.Get(users.ShardByUser(userID), RingOp, nil, nil, nil)
770792
if err != nil {
771793
r.ringCheckErrors.Inc()
772794
level.Error(r.logger).Log("msg", "failed to get rulers from ring", "user", userID, "err", err)
773-
return false
795+
return false, err
774796
}
775797

776-
return rulers.Includes(r.lifecycler.GetInstanceAddr())
798+
return rulers.Includes(r.lifecycler.GetInstanceAddr()), nil
777799
}
778800

779801
func (r *Ruler) syncRules(ctx context.Context, reason string) error {

0 commit comments

Comments
 (0)