Skip to content

Commit d83dae6

Browse files
authored
[querier] Impl aggr label rewrite capability (#127)
2 parents cdd22dc + 261d290 commit d83dae6

File tree

6 files changed

+339
-6
lines changed

6 files changed

+339
-6
lines changed

cmd/thanos/query.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ func registerQuery(app *extkingpin.App) {
245245
enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
246246
tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()
247247

248+
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
249+
248250
var storeRateLimits store.SeriesSelectLimits
249251
storeRateLimits.RegisterFlags(cmd)
250252

@@ -384,6 +386,7 @@ func registerQuery(app *extkingpin.App) {
384386
*enforceTenancy,
385387
*tenantLabel,
386388
*enableGroupReplicaPartialStrategy,
389+
*rewriteAggregationLabelTo,
387390
)
388391
})
389392
}
@@ -468,6 +471,7 @@ func runQuery(
468471
enforceTenancy bool,
469472
tenantLabel string,
470473
groupReplicaPartialResponseStrategy bool,
474+
rewriteAggregationLabelTo string,
471475
) error {
472476
comp := component.Query
473477
if alertQueryURL == "" {
@@ -597,6 +601,7 @@ func runQuery(
597601
opts := query.Options{
598602
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
599603
DeduplicationFunc: queryDeduplicationFunc,
604+
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
600605
}
601606
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
602607
queryableCreator = query.NewQueryableCreatorWithOptions(
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package query
5+
6+
import (
7+
"strings"
8+
"time"
9+
10+
"github.com/go-kit/log"
11+
"github.com/go-kit/log/level"
12+
"github.com/prometheus/client_golang/prometheus"
13+
"github.com/prometheus/client_golang/prometheus/promauto"
14+
"github.com/prometheus/prometheus/model/labels"
15+
)
16+
17+
// aggregationLabelName is the label that Rewrite looks for among the incoming
// matchers and sets on the matcher it adds or substitutes.
const (
18+
aggregationLabelName = "__rollup__"
19+
)
20+
21+
type AggregationLabelRewriter struct {
22+
logger log.Logger
23+
metrics *aggregationLabelRewriterMetrics
24+
25+
enabled bool
26+
desiredLabelValue string
27+
}
28+
29+
type aggregationLabelRewriterMetrics struct {
30+
aggregationLabelRewriteSkippedCount *prometheus.CounterVec
31+
aggregationLabelRewrittenCount *prometheus.CounterVec
32+
aggregationLabelAddedCount prometheus.Counter
33+
aggregationLabelRewriterRuntimeSeconds *prometheus.HistogramVec
34+
}
35+
36+
// isAggregatedMetric reports whether metricName ends in one of the aggregation
// suffixes ":aggr", ":avg", ":count", ":sum", ":min" or ":max".
//
// None of the suffix words contains a colon, so a name carries one of these
// suffixes exactly when the segment starting at its last ':' equals that
// suffix. This lets a single scan replace six separate suffix comparisons.
func isAggregatedMetric(metricName string) bool {
	lastColon := strings.LastIndexByte(metricName, ':')
	if lastColon < 0 {
		// No colon at all, so no aggregation suffix is possible.
		return false
	}
	switch metricName[lastColon:] {
	case ":aggr", ":avg", ":count", ":sum", ":min", ":max":
		return true
	default:
		return false
	}
}
44+
45+
func newAggregationLabelRewriterMetrics(reg prometheus.Registerer, desiredLabelValue string) *aggregationLabelRewriterMetrics {
46+
m := &aggregationLabelRewriterMetrics{}
47+
48+
m.aggregationLabelRewriteSkippedCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
49+
Name: "skipped_total",
50+
Help: "Total number of times aggregation-label-rewriter was enabled but skipped",
51+
}, []string{"reason"})
52+
m.aggregationLabelRewrittenCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
53+
Name: "label_rewritten_total",
54+
Help: "Total number of times aggregation-label-rewriter rewrote the aggregation label",
55+
ConstLabels: prometheus.Labels{"new_value": desiredLabelValue},
56+
}, []string{"old_value"})
57+
m.aggregationLabelAddedCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
58+
Name: "label_added_total",
59+
Help: "Total number of times aggregation-label-rewriter added the aggregation label",
60+
ConstLabels: prometheus.Labels{"new_value": desiredLabelValue},
61+
})
62+
m.aggregationLabelRewriterRuntimeSeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
63+
Name: "total_runtime_seconds",
64+
Help: "Runtime of aggregation-label-rewriter in seconds",
65+
Buckets: []float64{0.0001, 0.00025, 0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 10.0},
66+
}, []string{"is_rewritten"})
67+
68+
return m
69+
}
70+
71+
func NewNopAggregationLabelRewriter() *AggregationLabelRewriter {
72+
return &AggregationLabelRewriter{
73+
enabled: false,
74+
}
75+
}
76+
77+
func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, desiredLabelValue string) *AggregationLabelRewriter {
78+
if logger == nil {
79+
logger = log.NewNopLogger()
80+
}
81+
return &AggregationLabelRewriter{
82+
enabled: desiredLabelValue != "",
83+
logger: logger,
84+
metrics: newAggregationLabelRewriterMetrics(reg, desiredLabelValue),
85+
desiredLabelValue: desiredLabelValue,
86+
}
87+
}
88+
89+
func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Matcher {
90+
if !a.enabled {
91+
return ms
92+
}
93+
94+
startOfRun := time.Now()
95+
needsRewrite := false
96+
skipReason := "no-name-matcher"
97+
var aggregationLabelMatcher *labels.Matcher
98+
aggregationLabelIndex := -1
99+
for i := 0; i < len(ms); i++ {
100+
// If we already know we need to rewrite, and we found the aggregation label
101+
// we don't need to loop further, break
102+
if needsRewrite && aggregationLabelMatcher != nil {
103+
break
104+
}
105+
m := ms[i]
106+
// If we are not sure if we need rewrite, and see a name matcher
107+
// We check if this is an aggregated metric and decide needsRewrite
108+
if !needsRewrite && m.Name == labels.MetricName {
109+
if m.Type == labels.MatchEqual {
110+
if isAggregatedMetric(m.Value) {
111+
needsRewrite = true
112+
} else {
113+
skipReason = "not-aggregated-metric"
114+
}
115+
} else {
116+
skipReason = "not-using-equal-name-match"
117+
level.Debug(a.logger).Log("msg", "skipped due to not using equal in name matcher", "matcher", m)
118+
break
119+
}
120+
// In any case, if we see an aggregation label, we store that for later use
121+
} else if m.Name == aggregationLabelName {
122+
aggregationLabelMatcher = m
123+
aggregationLabelIndex = i
124+
}
125+
}
126+
// After the for loop, if needsRewrite is false, no need to do anything
127+
// but if it is true, we either append or modify an aggregation label
128+
if needsRewrite {
129+
newMatcher := &labels.Matcher{
130+
Name: aggregationLabelName,
131+
Type: labels.MatchEqual,
132+
Value: a.desiredLabelValue,
133+
}
134+
if aggregationLabelMatcher != nil {
135+
a.metrics.aggregationLabelRewrittenCount.WithLabelValues(aggregationLabelMatcher.Value).Inc()
136+
ms[aggregationLabelIndex] = newMatcher
137+
} else {
138+
a.metrics.aggregationLabelAddedCount.Inc()
139+
ms = append(ms, newMatcher)
140+
}
141+
a.metrics.aggregationLabelRewriterRuntimeSeconds.WithLabelValues("true").Observe(
142+
time.Since(startOfRun).Seconds(),
143+
)
144+
} else {
145+
a.metrics.aggregationLabelRewriteSkippedCount.WithLabelValues(skipReason).Inc()
146+
a.metrics.aggregationLabelRewriterRuntimeSeconds.WithLabelValues("false").Observe(
147+
time.Since(startOfRun).Seconds(),
148+
)
149+
}
150+
151+
return ms
152+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package query
5+
6+
import (
7+
"testing"
8+
9+
"github.com/efficientgo/core/testutil"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/prometheus/model/labels"
12+
)
13+
14+
func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
15+
t.Parallel()
16+
17+
for _, tc := range []struct {
18+
name string
19+
desiredLabelValue string // Empty means disabled
20+
inputMatchers []*labels.Matcher
21+
expectedMatchers []*labels.Matcher
22+
expectedSkipCount float64
23+
expectedAddCount float64
24+
expectedRewriteMap map[string]float64
25+
}{
26+
{
27+
name: "disabled rewriter should not modify label matchers",
28+
desiredLabelValue: "",
29+
inputMatchers: []*labels.Matcher{
30+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
31+
},
32+
expectedMatchers: []*labels.Matcher{
33+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
34+
},
35+
},
36+
{
37+
name: "should add label for aggregated metric if no existing aggregation label",
38+
desiredLabelValue: "5m",
39+
inputMatchers: []*labels.Matcher{
40+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
41+
},
42+
expectedMatchers: []*labels.Matcher{
43+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
44+
labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "5m"),
45+
},
46+
expectedAddCount: 1,
47+
},
48+
{
49+
name: "should rewrite existing aggregation label for aggregated metric",
50+
desiredLabelValue: "5m",
51+
inputMatchers: []*labels.Matcher{
52+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
53+
labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "1h"),
54+
},
55+
expectedMatchers: []*labels.Matcher{
56+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
57+
labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "5m"),
58+
},
59+
expectedRewriteMap: map[string]float64{"1h": 1},
60+
},
61+
{
62+
name: "should skip non-aggregated metric",
63+
desiredLabelValue: "5m",
64+
inputMatchers: []*labels.Matcher{
65+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"),
66+
},
67+
expectedMatchers: []*labels.Matcher{
68+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"),
69+
},
70+
expectedSkipCount: 1,
71+
},
72+
{
73+
name: "should skip non-equal name matcher",
74+
desiredLabelValue: "5m",
75+
inputMatchers: []*labels.Matcher{
76+
labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test:sum"),
77+
},
78+
expectedMatchers: []*labels.Matcher{
79+
labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test:sum"),
80+
},
81+
expectedSkipCount: 1,
82+
},
83+
{
84+
name: "should skip when no name matcher",
85+
desiredLabelValue: "5m",
86+
inputMatchers: []*labels.Matcher{
87+
labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
88+
},
89+
expectedMatchers: []*labels.Matcher{
90+
labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
91+
},
92+
expectedSkipCount: 1,
93+
},
94+
} {
95+
t.Run(tc.name, func(t *testing.T) {
96+
reg := prometheus.NewRegistry()
97+
rewriter := NewAggregationLabelRewriter(
98+
nil,
99+
reg,
100+
tc.desiredLabelValue,
101+
)
102+
103+
result := rewriter.Rewrite(tc.inputMatchers)
104+
testutil.Equals(t, len(tc.expectedMatchers), len(result))
105+
for i := range tc.inputMatchers {
106+
testutil.Equals(t, tc.expectedMatchers[i].Name, result[i].Name)
107+
testutil.Equals(t, tc.expectedMatchers[i].Type, result[i].Type)
108+
testutil.Equals(t, tc.expectedMatchers[i].Value, result[i].Value)
109+
}
110+
111+
metrics, err := reg.Gather()
112+
testutil.Ok(t, err)
113+
114+
if tc.expectedSkipCount > 0 {
115+
var skipCount float64
116+
for _, m := range metrics {
117+
if m.GetName() == "skipped_total" {
118+
skipCount += *m.Metric[0].Counter.Value
119+
}
120+
}
121+
testutil.Equals(t, tc.expectedSkipCount, skipCount)
122+
}
123+
124+
if tc.expectedAddCount > 0 {
125+
var addCount float64
126+
for _, m := range metrics {
127+
if m.GetName() == "label_added_total" {
128+
addCount += *m.Metric[0].Counter.Value
129+
}
130+
}
131+
testutil.Equals(t, tc.expectedAddCount, addCount)
132+
}
133+
134+
if len(tc.expectedRewriteMap) > 0 {
135+
for _, m := range metrics {
136+
if m.GetName() == "label_rewritten_total" {
137+
for _, metric := range m.Metric {
138+
oldValue := ""
139+
for _, label := range metric.Label {
140+
if *label.Name == "old_value" {
141+
oldValue = *label.Value
142+
break
143+
}
144+
}
145+
if count, ok := tc.expectedRewriteMap[oldValue]; ok {
146+
testutil.Equals(t, count, *metric.Counter.Value)
147+
}
148+
}
149+
}
150+
}
151+
}
152+
})
153+
}
154+
}

0 commit comments

Comments
 (0)