Skip to content

Commit 289f48c

Browse files
committed
design v2
1 parent 833c65b commit 289f48c

File tree

4 files changed

+47
-8
lines changed

4 files changed

+47
-8
lines changed

cmd/thanos/query.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,9 @@ func registerQuery(app *extkingpin.App) {
245245
enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
246246
tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()
247247

248-
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
248+
rewriteAggregationLabelName := cmd.Flag("query.aggregation-label-name", "The name for aggregation label. See query.aggregation-label-value-override for details. Default is empty.").Default("").String()
249+
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for aggregation label. If set to x, all queries on aggregated metrics will have a `rewriteAggregationLabelName=x` matcher. Leave either flag empty to disable this behavior. Default is empty.").Default("").String()
250+
rewriteAggregationInsertOnly := cmd.Flag("query.aggregation-label-insert-only", "If enabled, the above rewriter only insert missing rewriteAggregationLabelName=x and does not modify existing aggregation labels if any. Default is false.").Default("false").Bool()
249251

250252
var storeRateLimits store.SeriesSelectLimits
251253
storeRateLimits.RegisterFlags(cmd)
@@ -386,7 +388,9 @@ func registerQuery(app *extkingpin.App) {
386388
*enforceTenancy,
387389
*tenantLabel,
388390
*enableGroupReplicaPartialStrategy,
391+
*rewriteAggregationLabelName,
389392
*rewriteAggregationLabelTo,
393+
*rewriteAggregationInsertOnly,
390394
)
391395
})
392396
}
@@ -471,7 +475,9 @@ func runQuery(
471475
enforceTenancy bool,
472476
tenantLabel string,
473477
groupReplicaPartialResponseStrategy bool,
478+
rewriteAggregationLabelName string,
474479
rewriteAggregationLabelTo string,
480+
rewriteAggregationInsertOnly bool,
475481
) error {
476482
comp := component.Query
477483
if alertQueryURL == "" {
@@ -601,6 +607,8 @@ func runQuery(
601607
opts := query.Options{
602608
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
603609
DeduplicationFunc: queryDeduplicationFunc,
610+
RewriteAggregationInsertOnly: rewriteAggregationInsertOnly,
611+
RewriteAggregationLabelName: rewriteAggregationLabelName,
604612
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
605613
}
606614
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))

pkg/query/aggregation_label_rewriter.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@ import (
1414
"github.com/prometheus/prometheus/model/labels"
1515
)
1616

17-
const (
18-
aggregationLabelName = "__agg_rule_type__"
19-
)
20-
2117
type AggregationLabelRewriter struct {
2218
logger log.Logger
2319
metrics *aggregationLabelRewriterMetrics
2420

2521
enabled bool
22+
insertOnly bool
23+
labelKey string
2624
desiredLabelValue string
2725
}
2826

@@ -74,14 +72,16 @@ func NewNopAggregationLabelRewriter() *AggregationLabelRewriter {
7472
}
7573
}
7674

77-
func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, desiredLabelValue string) *AggregationLabelRewriter {
75+
func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, labelKey string, desiredLabelValue string, insertOnly bool) *AggregationLabelRewriter {
7876
if logger == nil {
7977
logger = log.NewNopLogger()
8078
}
8179
return &AggregationLabelRewriter{
8280
enabled: desiredLabelValue != "",
8381
logger: logger,
8482
metrics: newAggregationLabelRewriterMetrics(reg, desiredLabelValue),
83+
insertOnly: insertOnly,
84+
labelKey: labelKey,
8585
desiredLabelValue: desiredLabelValue,
8686
}
8787
}
@@ -118,16 +118,22 @@ func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Match
118118
break
119119
}
120120
// In any case, if we see an aggregation label, we store that for later use
121-
} else if m.Name == aggregationLabelName {
121+
} else if m.Name == a.labelKey {
122122
aggregationLabelMatcher = m
123123
aggregationLabelIndex = i
124124
}
125125
}
126+
127+
if aggregationLabelMatcher != nil && a.insertOnly {
128+
needsRewrite = false
129+
skipReason = "insert-only"
130+
}
131+
126132
// After the for loop, if needsRewrite is false, no need to do anything
127133
// but if it is true, we either append or modify an aggregation label
128134
if needsRewrite {
129135
newMatcher := &labels.Matcher{
130-
Name: aggregationLabelName,
136+
Name: a.labelKey,
131137
Type: labels.MatchRegexp,
132138
Value: a.desiredLabelValue,
133139
}

pkg/query/aggregation_label_rewriter_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@ import (
1111
"github.com/prometheus/prometheus/model/labels"
1212
)
1313

14+
const (
15+
aggregationLabelName = "__agg_rule_type__"
16+
)
17+
1418
func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
1519
t.Parallel()
1620

1721
for _, tc := range []struct {
1822
name string
1923
desiredLabelValue string // Empty means disabled
24+
insertOnly bool
2025
inputMatchers []*labels.Matcher
2126
expectedMatchers []*labels.Matcher
2227
expectedSkipCount float64
@@ -104,13 +109,29 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) {
104109
},
105110
expectedSkipCount: 1,
106111
},
112+
{
113+
name: "if insert only, should NOT rewrite existing aggregation label for aggregated metric",
114+
desiredLabelValue: "5m",
115+
insertOnly: true,
116+
inputMatchers: []*labels.Matcher{
117+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
118+
labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"),
119+
},
120+
expectedMatchers: []*labels.Matcher{
121+
labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"),
122+
labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"),
123+
},
124+
expectedSkipCount: 1,
125+
},
107126
} {
108127
t.Run(tc.name, func(t *testing.T) {
109128
reg := prometheus.NewRegistry()
110129
rewriter := NewAggregationLabelRewriter(
111130
nil,
112131
reg,
132+
aggregationLabelName,
113133
tc.desiredLabelValue,
134+
tc.insertOnly,
114135
)
115136

116137
result := rewriter.Rewrite(tc.inputMatchers)

pkg/query/querier.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ type QueryableCreator func(
6161
type Options struct {
6262
GroupReplicaPartialResponseStrategy bool
6363
DeduplicationFunc string
64+
RewriteAggregationLabelName string
6465
RewriteAggregationLabelTo string
66+
RewriteAggregationInsertOnly bool
6567
}
6668

6769
// NewQueryableCreator creates QueryableCreator.
@@ -94,7 +96,9 @@ func NewQueryableCreatorWithOptions(
9496
aggregationLabelRewriter := NewAggregationLabelRewriter(
9597
logger,
9698
extprom.WrapRegistererWithPrefix("aggregation_label_rewriter_", reg),
99+
opts.RewriteAggregationLabelName,
97100
opts.RewriteAggregationLabelTo,
101+
opts.RewriteAggregationInsertOnly,
98102
)
99103
return func(
100104
deduplicate bool,

0 commit comments

Comments
 (0)