Skip to content

Commit ed15b62

Browse files
authored
thanos querier aggregation label rewrite design v2 (#154)
2 parents 7a35f32 + 08b1526 commit ed15b62

File tree

4 files changed

+88
-15
lines changed

4 files changed

+88
-15
lines changed

cmd/thanos/query.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ func registerQuery(app *extkingpin.App) {
245245
enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
246246
tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()
247247

248-
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
248+
rewriteAggregationLabelStrategy := cmd.Flag("query.aggregation-label-strategy", "The strategy to use when rewriting aggregation labels. Used during aggregator migration only.").Default(string(query.NoopLabelRewriter)).Hidden().Enum(string(query.NoopLabelRewriter), string(query.UpsertLabelRewriter), string(query.InsertOnlyLabelRewriter))
249+
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for aggregation label. If set to x, all queries on aggregated metrics will have a `__agg_rule_type__=x` matcher. If empty, this behavior is disabled. Default is empty.").Hidden().Default("").String()
249250

250251
var storeRateLimits store.SeriesSelectLimits
251252
storeRateLimits.RegisterFlags(cmd)
@@ -386,6 +387,7 @@ func registerQuery(app *extkingpin.App) {
386387
*enforceTenancy,
387388
*tenantLabel,
388389
*enableGroupReplicaPartialStrategy,
390+
*rewriteAggregationLabelStrategy,
389391
*rewriteAggregationLabelTo,
390392
)
391393
})
@@ -471,6 +473,7 @@ func runQuery(
471473
enforceTenancy bool,
472474
tenantLabel string,
473475
groupReplicaPartialResponseStrategy bool,
476+
rewriteAggregationLabelStrategy string,
474477
rewriteAggregationLabelTo string,
475478
) error {
476479
comp := component.Query
@@ -601,6 +604,7 @@ func runQuery(
601604
opts := query.Options{
602605
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
603606
DeduplicationFunc: queryDeduplicationFunc,
607+
RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy,
604608
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
605609
}
606610
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))

pkg/query/aggregation_label_rewriter.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,26 @@ import (
1414
"github.com/prometheus/prometheus/model/labels"
1515
)
1616

17+
// RewriterStrategy defines the strategy used by the AggregationLabelRewriter.
18+
type RewriterStrategy string
19+
1720
const (
18-
aggregationLabelName = "__rollup__"
21+
// NoopLabelRewriter is a no-op strategy that basically disables the rewriter.
22+
NoopLabelRewriter RewriterStrategy = "noop"
23+
// UpsertLabelRewriter is a strategy that upserts the aggregation label.
24+
UpsertLabelRewriter RewriterStrategy = "upsert"
25+
// InsertOnlyLabelRewriter is a strategy that only inserts the aggregation label if it does not exist.
26+
InsertOnlyLabelRewriter RewriterStrategy = "insert-only"
1927
)
2028

21-
type AggregationLabelRewriter struct {
22-
logger log.Logger
23-
metrics *aggregationLabelRewriterMetrics
29+
const (
30+
aggregationLabelName = "__agg_rule_type__"
31+
)
2432

25-
enabled bool
33+
type AggregationLabelRewriter struct {
34+
logger log.Logger
35+
metrics *aggregationLabelRewriterMetrics
36+
strategy RewriterStrategy
2637
desiredLabelValue string
2738
}
2839

@@ -70,24 +81,27 @@ func newAggregationLabelRewriterMetrics(reg prometheus.Registerer, desiredLabelV
7081

7182
func NewNopAggregationLabelRewriter() *AggregationLabelRewriter {
7283
return &AggregationLabelRewriter{
73-
enabled: false,
84+
strategy: NoopLabelRewriter,
7485
}
7586
}
7687

77-
func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, desiredLabelValue string) *AggregationLabelRewriter {
88+
func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, strategy RewriterStrategy, desiredLabelValue string) *AggregationLabelRewriter {
7889
if logger == nil {
7990
logger = log.NewNopLogger()
8091
}
92+
if desiredLabelValue == "" {
93+
strategy = NoopLabelRewriter
94+
}
8195
return &AggregationLabelRewriter{
82-
enabled: desiredLabelValue != "",
96+
strategy: strategy,
8397
logger: logger,
8498
metrics: newAggregationLabelRewriterMetrics(reg, desiredLabelValue),
8599
desiredLabelValue: desiredLabelValue,
86100
}
87101
}
88102

89103
func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Matcher {
90-
if !a.enabled {
104+
if a.strategy == NoopLabelRewriter {
91105
return ms
92106
}
93107

@@ -123,12 +137,18 @@ func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Match
123137
aggregationLabelIndex = i
124138
}
125139
}
140+
141+
if aggregationLabelMatcher != nil && a.strategy == InsertOnlyLabelRewriter {
142+
needsRewrite = false
143+
skipReason = "insert-only"
144+
}
145+
126146
// After the for loop, if needsRewrite is false, no need to do anything
127147
// but if it is true, we either append or modify an aggregation label
128148
if needsRewrite {
129149
newMatcher := &labels.Matcher{
130150
Name: aggregationLabelName,
131-
Type: labels.MatchEqual,
151+
Type: labels.MatchRegexp,
132152
Value: a.desiredLabelValue,
133153
}
134154
if aggregationLabelMatcher != nil {

pkg/query/aggregation_label_rewriter_test.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
1717
for _, tc := range []struct {
1818
name string
1919
desiredLabelValue string // Empty means disabled
20+
strategy RewriterStrategy
2021
inputMatchers []*labels.Matcher
2122
expectedMatchers []*labels.Matcher
2223
expectedSkipCount float64
@@ -25,7 +26,19 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
2526
}{
2627
{
2728
name: "disabled rewriter should not modify label matchers",
29+
desiredLabelValue: "v1",
30+
strategy: NoopLabelRewriter,
31+
inputMatchers: []*labels.Matcher{
32+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
33+
},
34+
expectedMatchers: []*labels.Matcher{
35+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
36+
},
37+
},
38+
{
39+
name: "no desired label value makes a disabled rewriter and should not modify label matchers",
2840
desiredLabelValue: "",
41+
strategy: UpsertLabelRewriter,
2942
inputMatchers: []*labels.Matcher{
3043
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
3144
},
@@ -36,31 +49,48 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
3649
{
3750
name: "should add label for aggregated metric if no existing aggregation label",
3851
desiredLabelValue: "5m",
52+
strategy: UpsertLabelRewriter,
3953
inputMatchers: []*labels.Matcher{
4054
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
4155
},
4256
expectedMatchers: []*labels.Matcher{
4357
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
44-
labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "5m"),
58+
labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "5m"),
4559
},
4660
expectedAddCount: 1,
4761
},
4862
{
49-
name: "should rewrite existing aggregation label for aggregated metric",
63+
name: "should rewrite existing equal aggregation label for aggregated metric",
64+
desiredLabelValue: "5m",
65+
strategy: UpsertLabelRewriter,
66+
inputMatchers: []*labels.Matcher{
67+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
68+
labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"),
69+
},
70+
expectedMatchers: []*labels.Matcher{
71+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
72+
labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "5m"),
73+
},
74+
expectedRewriteMap: map[string]float64{"1h": 1},
75+
},
76+
{
77+
name: "should rewrite existing regex aggregation label for aggregated metric",
5078
desiredLabelValue: "5m",
79+
strategy: UpsertLabelRewriter,
5180
inputMatchers: []*labels.Matcher{
5281
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
53-
labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "1h"),
82+
labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "1h"),
5483
},
5584
expectedMatchers: []*labels.Matcher{
5685
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
57-
labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "5m"),
86+
labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "5m"),
5887
},
5988
expectedRewriteMap: map[string]float64{"1h": 1},
6089
},
6190
{
6291
name: "should skip non-aggregated metric",
6392
desiredLabelValue: "5m",
93+
strategy: UpsertLabelRewriter,
6494
inputMatchers: []*labels.Matcher{
6595
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"),
6696
},
@@ -72,6 +102,7 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
72102
{
73103
name: "should skip non-equal name matcher",
74104
desiredLabelValue: "5m",
105+
strategy: UpsertLabelRewriter,
75106
inputMatchers: []*labels.Matcher{
76107
labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test:sum"),
77108
},
@@ -83,6 +114,7 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
83114
{
84115
name: "should skip when no name matcher",
85116
desiredLabelValue: "5m",
117+
strategy: UpsertLabelRewriter,
86118
inputMatchers: []*labels.Matcher{
87119
labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
88120
},
@@ -91,12 +123,27 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
91123
},
92124
expectedSkipCount: 1,
93125
},
126+
{
127+
name: "if insert only, should NOT rewrite existing aggregation label for aggregated metric",
128+
desiredLabelValue: "5m",
129+
strategy: InsertOnlyLabelRewriter,
130+
inputMatchers: []*labels.Matcher{
131+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
132+
labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"),
133+
},
134+
expectedMatchers: []*labels.Matcher{
135+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
136+
labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"),
137+
},
138+
expectedSkipCount: 1,
139+
},
94140
} {
95141
t.Run(tc.name, func(t *testing.T) {
96142
reg := prometheus.NewRegistry()
97143
rewriter := NewAggregationLabelRewriter(
98144
nil,
99145
reg,
146+
tc.strategy,
100147
tc.desiredLabelValue,
101148
)
102149

pkg/query/querier.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type QueryableCreator func(
6161
type Options struct {
6262
GroupReplicaPartialResponseStrategy bool
6363
DeduplicationFunc string
64+
RewriteAggregationLabelStrategy string
6465
RewriteAggregationLabelTo string
6566
}
6667

@@ -94,6 +95,7 @@ func NewQueryableCreatorWithOptions(
9495
aggregationLabelRewriter := NewAggregationLabelRewriter(
9596
logger,
9697
extprom.WrapRegistererWithPrefix("aggregation_label_rewriter_", reg),
98+
RewriterStrategy(opts.RewriteAggregationLabelStrategy),
9799
opts.RewriteAggregationLabelTo,
98100
)
99101
return func(

0 commit comments

Comments
 (0)