Skip to content

Commit e3153e5

Browse files
authored
Fix ruler query failure reporting (#4335)
* This patch tries to fix a problem with user-errors being reported as internal errors, and adds an integration test for it. Signed-off-by: Peter Štibraný <[email protected]> * Allow passing a custom error-wrapping function to ErrorTranslateQueryable. Signed-off-by: Peter Štibraný <[email protected]> * Wrap errors returned by Queryable in a custom wrapper. This allows us to distinguish between those errors and errors returned by the PromQL engine, and react appropriately. Signed-off-by: Peter Štibraný <[email protected]> * Improve ruler test to check for more scenarios. Signed-off-by: Peter Štibraný <[email protected]> * CHANGELOG.md Signed-off-by: Peter Štibraný <[email protected]>
1 parent 728cfbe commit e3153e5

File tree

9 files changed

+277
-36
lines changed

9 files changed

+277
-36
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
2626
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
2727
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
28+
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
2829

2930
## 1.10.0-rc.0 / 2021-06-28
3031

integration/e2e/db/db.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,22 @@ const (
1414
)
1515

1616
// NewMinio returns minio server, used as a local replacement for S3.
17-
func NewMinio(port int, bktName string) *e2e.HTTPService {
17+
func NewMinio(port int, bktNames ...string) *e2e.HTTPService {
1818
minioKESGithubContent := "https://raw.githubusercontent.com/minio/kes/master"
1919
commands := []string{
20-
"curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'",
21-
"mkdir -p /data/%s && minio server --address :%v --quiet /data",
20+
fmt.Sprintf("curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", minioKESGithubContent, minioKESGithubContent),
2221
}
2322

23+
for _, bkt := range bktNames {
24+
commands = append(commands, fmt.Sprintf("mkdir -p /data/%s", bkt))
25+
}
26+
commands = append(commands, fmt.Sprintf("minio server --address :%v --quiet /data", port))
27+
2428
m := e2e.NewHTTPService(
2529
fmt.Sprintf("minio-%v", port),
2630
images.Minio,
2731
// Create the requested buckets before starting minio
28-
e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf(strings.Join(commands, " && "), minioKESGithubContent, minioKESGithubContent, bktName, port)),
32+
e2e.NewCommandWithoutEntrypoint("sh", "-c", strings.Join(commands, " && ")),
2933
e2e.NewHTTPReadinessProbe(port, "/minio/health/ready", 200, 200),
3034
port,
3135
)

integration/e2ecortex/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,11 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
318318
}
319319

320320
defer res.Body.Close()
321+
322+
if res.StatusCode != 202 {
323+
return fmt.Errorf("unexpected status code: %d", res.StatusCode)
324+
}
325+
321326
return nil
322327
}
323328

integration/ruler_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,166 @@ func TestRulerAlertmanagerTLS(t *testing.T) {
523523
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_notifications_alertmanagers_discovered"}, e2e.WaitMissingMetrics))
524524
}
525525

526+
func TestRulerMetricsForInvalidQueries(t *testing.T) {
527+
s, err := e2e.NewScenario(networkName)
528+
require.NoError(t, err)
529+
defer s.Close()
530+
531+
// Start dependencies.
532+
consul := e2edb.NewConsul()
533+
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
534+
require.NoError(t, s.StartAndWaitReady(consul, minio))
535+
536+
// Configure the ruler.
537+
flags := mergeFlags(
538+
BlocksStorageFlags(),
539+
RulerFlags(false),
540+
map[string]string{
541+
// Since we're not going to run any rule (our only rule is invalid), we don't need the
542+
// store-gateway to be configured to a valid address.
543+
"-querier.store-gateway-addresses": "localhost:12345",
544+
// Enable the bucket index so we can skip the initial bucket scan.
545+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
546+
// Evaluate rules often, so that we don't need to wait for metrics to show up.
547+
"-ruler.evaluation-interval": "2s",
548+
"-ruler.poll-interval": "2s",
549+
// No delay
550+
"-ruler.evaluation-delay-duration": "0",
551+
552+
"-blocks-storage.tsdb.block-ranges-period": "1h",
553+
"-blocks-storage.bucket-store.sync-interval": "1s",
554+
"-blocks-storage.tsdb.retention-period": "2h",
555+
556+
// We run single ingester only, no replication.
557+
"-distributor.replication-factor": "1",
558+
559+
// Very low limit so that ruler hits it.
560+
"-querier.max-fetched-chunks-per-query": "5",
561+
// We need this to make limit work.
562+
"-ingester.stream-chunks-when-using-blocks": "true",
563+
},
564+
)
565+
566+
const namespace = "test"
567+
const user = "user"
568+
569+
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
570+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
571+
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
572+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))
573+
574+
// Wait until both the distributor and ruler have updated the ring. The querier will also watch
575+
// the store-gateway ring if blocks sharding is enabled.
576+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
577+
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
578+
579+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
580+
require.NoError(t, err)
581+
582+
// Push some series to Cortex -- enough so that we can hit some limits.
583+
for i := 0; i < 10; i++ {
584+
series, _ := generateSeries("metric", time.Now(), prompb.Label{Name: "foo", Value: fmt.Sprintf("%d", i)})
585+
586+
res, err := c.Push(series)
587+
require.NoError(t, err)
588+
require.Equal(t, 200, res.StatusCode)
589+
}
590+
591+
totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
592+
require.NoError(t, err)
593+
594+
// Verify that user-failures don't increase cortex_ruler_queries_failed_total
595+
for groupName, expression := range map[string]string{
596+
// Syntactically correct expression (passes check in ruler), but failing because of invalid regex. This fails in PromQL engine.
597+
"invalid_group": `label_replace(metric, "foo", "$1", "service", "[")`,
598+
599+
// This one fails in querier code, because of limits.
600+
"too_many_chunks_group": `sum(metric)`,
601+
} {
602+
t.Run(groupName, func(t *testing.T) {
603+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace))
604+
m := ruleGroupMatcher(user, namespace, groupName)
605+
606+
// Wait until ruler has loaded the group.
607+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
608+
609+
// Wait until rule group has tried to evaluate the rule.
610+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
611+
612+
// Verify that evaluation of the rule failed.
613+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
614+
615+
// But these failures were not reported as "failed queries"
616+
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
617+
require.NoError(t, err)
618+
require.Equal(t, float64(0), sum[0])
619+
620+
// Delete rule before checking "cortex_ruler_queries_total", as we want to reuse value for next test.
621+
require.NoError(t, c.DeleteRuleGroup(namespace, groupName))
622+
623+
// Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears).
624+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))
625+
626+
// Check that cortex_ruler_queries_total went up since last test.
627+
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
628+
require.NoError(t, err)
629+
require.Greater(t, newTotalQueries[0], totalQueries[0])
630+
631+
// Remember totalQueries for next test.
632+
totalQueries = newTotalQueries
633+
})
634+
}
635+
636+
// Now let's upload a non-failing rule, and make sure that it works.
637+
t.Run("real_error", func(t *testing.T) {
638+
const groupName = "good_rule"
639+
const expression = `sum(metric{foo=~"1|2"})`
640+
641+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace))
642+
m := ruleGroupMatcher(user, namespace, groupName)
643+
644+
// Wait until ruler has loaded the group.
645+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
646+
647+
// Wait until rule group has tried to evaluate the rule, and succeeded.
648+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
649+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
650+
651+
// Still no failures.
652+
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
653+
require.NoError(t, err)
654+
require.Equal(t, float64(0), sum[0])
655+
656+
// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures.
657+
require.NoError(t, s.Stop(ingester))
658+
659+
// We should start getting "real" failures now.
660+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
661+
})
662+
}
663+
664+
func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
665+
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
666+
}
667+
668+
func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
669+
// Prepare rule group with a single recording rule that uses the given expression.
670+
var recordNode = yaml.Node{}
671+
var exprNode = yaml.Node{}
672+
673+
recordNode.SetString(ruleName)
674+
exprNode.SetString(expression)
675+
676+
return rulefmt.RuleGroup{
677+
Name: groupName,
678+
Interval: 10,
679+
Rules: []rulefmt.RuleNode{{
680+
Record: recordNode,
681+
Expr: exprNode,
682+
}},
683+
}
684+
}
685+
526686
func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
527687
t.Helper()
528688

pkg/api/handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func NewQuerierHandler(
194194

195195
api := v1.NewAPI(
196196
engine,
197-
querier.NewErrorTranslateQueryable(queryable), // Translate errors to errors expected by API.
197+
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
198198
nil, // No remote write support.
199199
exemplarQueryable,
200200
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },

pkg/querier/error_translate_queryable.go

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,72 +69,103 @@ func TranslateToPromqlAPIError(err error) error {
6969
}
7070
}
7171

72-
func NewErrorTranslateQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable {
73-
return errorTranslateQueryable{q}
72+
// ErrTranslateFn is used to translate or wrap error before returning it by functions in
73+
// storage.SampleAndChunkQueryable interface.
74+
// Input error may be nil.
75+
type ErrTranslateFn func(err error) error
76+
77+
func NewErrorTranslateQueryable(q storage.Queryable) storage.Queryable {
78+
return NewErrorTranslateQueryableWithFn(q, TranslateToPromqlAPIError)
79+
}
80+
81+
func NewErrorTranslateQueryableWithFn(q storage.Queryable, fn ErrTranslateFn) storage.Queryable {
82+
return errorTranslateQueryable{q: q, fn: fn}
83+
}
84+
85+
func NewErrorTranslateSampleAndChunkQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable {
86+
return NewErrorTranslateSampleAndChunkQueryableWithFn(q, TranslateToPromqlAPIError)
87+
}
88+
89+
func NewErrorTranslateSampleAndChunkQueryableWithFn(q storage.SampleAndChunkQueryable, fn ErrTranslateFn) storage.SampleAndChunkQueryable {
90+
return errorTranslateSampleAndChunkQueryable{q: q, fn: fn}
7491
}
7592

7693
type errorTranslateQueryable struct {
77-
q storage.SampleAndChunkQueryable
94+
q storage.Queryable
95+
fn ErrTranslateFn
7896
}
7997

8098
func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
8199
q, err := e.q.Querier(ctx, mint, maxt)
82-
return errorTranslateQuerier{q: q}, TranslateToPromqlAPIError(err)
100+
return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err)
101+
}
102+
103+
type errorTranslateSampleAndChunkQueryable struct {
104+
q storage.SampleAndChunkQueryable
105+
fn ErrTranslateFn
106+
}
107+
108+
func (e errorTranslateSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
109+
q, err := e.q.Querier(ctx, mint, maxt)
110+
return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err)
83111
}
84112

85-
func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
113+
func (e errorTranslateSampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
86114
q, err := e.q.ChunkQuerier(ctx, mint, maxt)
87-
return errorTranslateChunkQuerier{q: q}, TranslateToPromqlAPIError(err)
115+
return errorTranslateChunkQuerier{q: q, fn: e.fn}, e.fn(err)
88116
}
89117

90118
type errorTranslateQuerier struct {
91-
q storage.Querier
119+
q storage.Querier
120+
fn ErrTranslateFn
92121
}
93122

94123
func (e errorTranslateQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
95124
values, warnings, err := e.q.LabelValues(name, matchers...)
96-
return values, warnings, TranslateToPromqlAPIError(err)
125+
return values, warnings, e.fn(err)
97126
}
98127

99128
func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) {
100129
values, warnings, err := e.q.LabelNames()
101-
return values, warnings, TranslateToPromqlAPIError(err)
130+
return values, warnings, e.fn(err)
102131
}
103132

104133
func (e errorTranslateQuerier) Close() error {
105-
return TranslateToPromqlAPIError(e.q.Close())
134+
return e.fn(e.q.Close())
106135
}
107136

108137
func (e errorTranslateQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
109138
s := e.q.Select(sortSeries, hints, matchers...)
110-
return errorTranslateSeriesSet{s}
139+
return errorTranslateSeriesSet{s: s, fn: e.fn}
111140
}
112141

113142
type errorTranslateChunkQuerier struct {
114-
q storage.ChunkQuerier
143+
q storage.ChunkQuerier
144+
fn ErrTranslateFn
115145
}
116146

117147
func (e errorTranslateChunkQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
118148
values, warnings, err := e.q.LabelValues(name, matchers...)
119-
return values, warnings, TranslateToPromqlAPIError(err)
149+
return values, warnings, e.fn(err)
120150
}
121151

122152
func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) {
123153
values, warnings, err := e.q.LabelNames()
124-
return values, warnings, TranslateToPromqlAPIError(err)
154+
return values, warnings, e.fn(err)
125155
}
126156

127157
func (e errorTranslateChunkQuerier) Close() error {
128-
return TranslateToPromqlAPIError(e.q.Close())
158+
return e.fn(e.q.Close())
129159
}
130160

131161
func (e errorTranslateChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
132162
s := e.q.Select(sortSeries, hints, matchers...)
133-
return errorTranslateChunkSeriesSet{s}
163+
return errorTranslateChunkSeriesSet{s: s, fn: e.fn}
134164
}
135165

136166
type errorTranslateSeriesSet struct {
137-
s storage.SeriesSet
167+
s storage.SeriesSet
168+
fn ErrTranslateFn
138169
}
139170

140171
func (e errorTranslateSeriesSet) Next() bool {
@@ -146,15 +177,16 @@ func (e errorTranslateSeriesSet) At() storage.Series {
146177
}
147178

148179
func (e errorTranslateSeriesSet) Err() error {
149-
return TranslateToPromqlAPIError(e.s.Err())
180+
return e.fn(e.s.Err())
150181
}
151182

152183
func (e errorTranslateSeriesSet) Warnings() storage.Warnings {
153184
return e.s.Warnings()
154185
}
155186

156187
type errorTranslateChunkSeriesSet struct {
157-
s storage.ChunkSeriesSet
188+
s storage.ChunkSeriesSet
189+
fn ErrTranslateFn
158190
}
159191

160192
func (e errorTranslateChunkSeriesSet) Next() bool {
@@ -166,7 +198,7 @@ func (e errorTranslateChunkSeriesSet) At() storage.ChunkSeries {
166198
}
167199

168200
func (e errorTranslateChunkSeriesSet) Err() error {
169-
return TranslateToPromqlAPIError(e.s.Err())
201+
return e.fn(e.s.Err())
170202
}
171203

172204
func (e errorTranslateChunkSeriesSet) Warnings() storage.Warnings {

pkg/querier/error_translate_queryable_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestApiStatusCodes(t *testing.T) {
113113
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
114114
} {
115115
t.Run(fmt.Sprintf("%s/%d", k, ix), func(t *testing.T) {
116-
r := createPrometheusAPI(errorTranslateQueryable{q: q})
116+
r := createPrometheusAPI(NewErrorTranslateSampleAndChunkQueryable(q))
117117
rec := httptest.NewRecorder()
118118

119119
req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil)

0 commit comments

Comments
 (0)