1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@
 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
 * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
+* [ENHANCEMENT] Ruler: Emit an error message when the rule synchronization fails. #6902
 * [ENHANCEMENT] Querier: Support snappy and zstd response compression for `-querier.response-compression` flag. #6848
 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
25 changes: 19 additions & 6 deletions pkg/ruler/ruler.go
@@ -693,29 +693,40 @@ func (r *Ruler) run(ctx context.Context) error {
 		ringTickerChan = ringTicker.C
 	}

-	r.syncRules(ctx, rulerSyncReasonInitial)
+	syncRuleErrMsg := func(syncRulesErr error) {
+		level.Error(r.logger).Log("msg", "failed to sync rules", "err", syncRulesErr)
+	}
+
+	initialSyncErr := r.syncRules(ctx, rulerSyncReasonInitial)
+	if initialSyncErr != nil {
+		syncRuleErrMsg(initialSyncErr)
+	}
 	for {
+		var syncRulesErr error
 		select {
 		case <-ctx.Done():
 			return nil
 		case <-tick.C:
-			r.syncRules(ctx, rulerSyncReasonPeriodic)
+			syncRulesErr = r.syncRules(ctx, rulerSyncReasonPeriodic)
 		case <-ringTickerChan:
 			// We ignore the error because in case of error it will return an empty
 			// replication set which we use to compare with the previous state.
 			currRingState, _ := r.ring.GetAllHealthy(RingOp)

 			if ring.HasReplicationSetChanged(ringLastState, currRingState) {
 				ringLastState = currRingState
-				r.syncRules(ctx, rulerSyncReasonRingChange)
+				syncRulesErr = r.syncRules(ctx, rulerSyncReasonRingChange)
 			}
 		case err := <-r.subservicesWatcher.Chan():
 			return errors.Wrap(err, "ruler subservice failed")
 		}
+		if syncRulesErr != nil {
+			syncRuleErrMsg(syncRulesErr)
+		}
 	}
 }

-func (r *Ruler) syncRules(ctx context.Context, reason string) {
+func (r *Ruler) syncRules(ctx context.Context, reason string) error {
 	level.Info(r.logger).Log("msg", "syncing rules", "reason", reason)
 	r.rulerSync.WithLabelValues(reason).Inc()
 	timer := prometheus.NewTimer(nil)
@@ -727,19 +738,21 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {

 	loadedConfigs, backupConfigs, err := r.loadRuleGroups(ctx)
 	if err != nil {
-		return
+		return err
 	}

 	if ctx.Err() != nil {
 		level.Info(r.logger).Log("msg", "context is canceled. not syncing rules")
-		return
+		return err
 	}
 	// This will also delete local group files for users that are no longer in 'configs' map.
 	r.manager.SyncRuleGroups(ctx, loadedConfigs)

 	if r.cfg.RulesBackupEnabled() {
 		r.manager.BackUpRuleGroups(ctx, backupConfigs)
 	}
+
+	return nil
 }

 func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) {
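Before the test changes below, here is a minimal, self-contained sketch of the error-propagation pattern the change above adopts: the sync function reports its error to the caller, and the run loop funnels every failure through one shared logging helper. This is not the Cortex code itself; the names (runLoop, syncFn, failing), the ticker interval, and the standalone main are illustrative assumptions, and it uses github.com/go-kit/log, which the ruler's level.Error(...).Log calls suggest.

package main

import (
	"context"
	"errors"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// syncFn stands in for a sync routine that now reports failures to the caller
// instead of only logging them internally.
type syncFn func(ctx context.Context, reason string) error

// runLoop mirrors the shape of the changed run loop: every sync path assigns
// into a single error variable, and one shared helper logs it.
func runLoop(ctx context.Context, logger log.Logger, sync syncFn) {
	syncErrMsg := func(err error) {
		level.Error(logger).Log("msg", "failed to sync rules", "err", err)
	}

	// Log the initial sync failure, if any, without aborting the loop.
	if err := sync(ctx, "initial"); err != nil {
		syncErrMsg(err)
	}

	ticker := time.NewTicker(100 * time.Millisecond) // illustrative interval
	defer ticker.Stop()

	for {
		var syncErr error
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			syncErr = sync(ctx, "periodic")
		}
		if syncErr != nil {
			syncErrMsg(syncErr)
		}
	}
}

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	// A sync that always fails, so the error path is exercised.
	failing := func(_ context.Context, reason string) error {
		return errors.New("sync failed: " + reason)
	}
	runLoop(ctx, logger, failing)
}

Collecting the error into one variable per iteration keeps a single logging point for all select branches, which is the same design choice the diff makes with syncRuleErrMsg.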
21 changes: 14 additions & 7 deletions pkg/ruler/ruler_test.go
@@ -1342,7 +1342,8 @@ func TestGetRules(t *testing.T) {

 			// Sync Rules
 			forEachRuler(func(_ string, r *Ruler) {
-				r.syncRules(context.Background(), rulerSyncReasonInitial)
+				err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+				require.NoError(t, err)
 			})

 			if tc.sharding {
@@ -1572,7 +1573,8 @@ func TestGetRulesFromBackup(t *testing.T) {

 	// Sync Rules
 	forEachRuler(func(_ string, r *Ruler) {
-		r.syncRules(context.Background(), rulerSyncReasonInitial)
+		err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+		require.NoError(t, err)
 	})

 	// update the State of the rulers in the ring based on tc.rulerStateMap
@@ -1788,7 +1790,8 @@ func getRulesHATest(replicationFactor int) func(t *testing.T) {

 		// Sync Rules
 		forEachRuler(func(_ string, r *Ruler) {
-			r.syncRules(context.Background(), rulerSyncReasonInitial)
+			err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+			require.NoError(t, err)
 		})

 		// update the State of the rulers in the ring based on tc.rulerStateMap
@@ -1811,8 +1814,10 @@ func getRulesHATest(replicationFactor int) func(t *testing.T) {
 			t.Errorf("ruler %s was not terminated with error %s", "ruler1", err.Error())
 		}

-		rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
-		rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+		err = rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+		require.NoError(t, err)
+		err = rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+		require.NoError(t, err)

 		requireGroupStateEqual := func(a *GroupStateDesc, b *GroupStateDesc) {
 			require.Equal(t, a.Group.Interval, b.Group.Interval)
@@ -2800,7 +2805,8 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
 	evalFunc := func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {}

 	r, _ := buildRulerWithIterFunc(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil, evalFunc)
-	r.syncRules(context.Background(), rulerSyncReasonInitial)
+	err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+	require.NoError(t, err)

 	// assert initial state of rule group
 	ruleGroup := r.manager.GetRules("user1")[0]
@@ -3265,7 +3271,8 @@ func TestGetShardSizeForUser(t *testing.T) {

 			// Sync Rules
 			forEachRuler(func(_ string, r *Ruler) {
-				r.syncRules(context.Background(), rulerSyncReasonInitial)
+				err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+				require.NoError(t, err)
 			})

 			result := testRuler.getShardSizeForUser(tc.userID)