Skip to content

Commit d10c948

Browse files
committed
impl aggr label rewrite capability
1 parent cdd22dc commit d10c948

File tree

2 files changed

+57
-0
lines changed

2 files changed

+57
-0
lines changed

cmd/thanos/query.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ func registerQuery(app *extkingpin.App) {
245245
enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
246246
tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()
247247

248+
rewriteAggregationLabelTo := cmd.Flag("query.rewrite-aggregation-label-to", "Rewrite the aggregation label to the provided value for metric with name ending in standard aggregation suffixes.").Default("").String()
249+
248250
var storeRateLimits store.SeriesSelectLimits
249251
storeRateLimits.RegisterFlags(cmd)
250252

@@ -384,6 +386,7 @@ func registerQuery(app *extkingpin.App) {
384386
*enforceTenancy,
385387
*tenantLabel,
386388
*enableGroupReplicaPartialStrategy,
389+
*rewriteAggregationLabelTo,
387390
)
388391
})
389392
}
@@ -468,6 +471,7 @@ func runQuery(
468471
enforceTenancy bool,
469472
tenantLabel string,
470473
groupReplicaPartialResponseStrategy bool,
474+
rewriteAggregationLabelTo string,
471475
) error {
472476
comp := component.Query
473477
if alertQueryURL == "" {
@@ -597,6 +601,7 @@ func runQuery(
597601
opts := query.Options{
598602
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
599603
DeduplicationFunc: queryDeduplicationFunc,
604+
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
600605
}
601606
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
602607
queryableCreator = query.NewQueryableCreatorWithOptions(

pkg/query/querier.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type QueryableCreator func(
6161
type Options struct {
6262
GroupReplicaPartialResponseStrategy bool
6363
DeduplicationFunc string
64+
RewriteAggregationLabelTo string
6465
}
6566

6667
// NewQueryableCreator creates QueryableCreator.
@@ -159,6 +160,11 @@ type querier struct {
159160
selectTimeout time.Duration
160161
shardInfo *storepb.ShardInfo
161162
seriesStatsReporter seriesStatsReporter
163+
164+
enablePreAggregationLabelRewrite bool
165+
aggregationStandardSuffixes []string
166+
aggregationLabelName string
167+
rewriteAggregationLabelTo string
162168
}
163169

164170
// newQuerier creates implementation of storage.Querier that fetches data from the proxy
@@ -232,6 +238,11 @@ func newQuerierWithOpts(
232238
skipChunks: skipChunks,
233239
shardInfo: shardInfo,
234240
seriesStatsReporter: seriesStatsReporter,
241+
242+
enablePreAggregationLabelRewrite: opts.RewriteAggregationLabelTo != "",
243+
aggregationStandardSuffixes: []string{":aggr", ":count", ":sum", ":min", ":max", ":avg"},
244+
aggregationLabelName: "__rollup__",
245+
rewriteAggregationLabelTo: opts.RewriteAggregationLabelTo,
235246
}
236247
}
237248

@@ -365,6 +376,47 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
365376
}
366377

367378
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) {
379+
if q.enablePreAggregationLabelRewrite {
380+
hasNameLabel, needLabelRewrite := false, false
381+
for _, m := range ms {
382+
if m.Name == labels.MetricName {
383+
hasNameLabel = true
384+
if m.Type == labels.MatchEqual {
385+
for _, suffix := range q.aggregationStandardSuffixes {
386+
if strings.HasSuffix(m.Value, suffix) {
387+
needLabelRewrite = true
388+
break
389+
}
390+
}
391+
break
392+
} else {
393+
level.Info(q.logger).Log("msg", "Skipping aggregation label rewrite attempt", "reason", "non-equal name matcher", "name_matcher", m, "labels", ms)
394+
}
395+
}
396+
}
397+
if !hasNameLabel {
398+
level.Info(q.logger).Log("msg", "Skipping aggregation label rewrite", "reason", "no metric name matcher found", "labels", ms)
399+
}
400+
if needLabelRewrite {
401+
hasAggregationLabel := false
402+
for i, m := range ms {
403+
if m.Name == q.aggregationLabelName {
404+
hasAggregationLabel = true
405+
ms[i].Type = labels.MatchEqual
406+
ms[i].Value = q.rewriteAggregationLabelTo
407+
break
408+
}
409+
}
410+
if !hasAggregationLabel {
411+
ms = append(ms, &labels.Matcher{
412+
Name: q.aggregationLabelName,
413+
Type: labels.MatchEqual,
414+
Value: q.rewriteAggregationLabelTo,
415+
})
416+
}
417+
}
418+
}
419+
368420
sms, err := storepb.PromMatchersToMatchers(ms...)
369421
if err != nil {
370422
return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers")

0 commit comments

Comments
 (0)