Skip to content

Commit 6e67e34

Browse files
committed
redo
1 parent cb92a78 commit 6e67e34

File tree

4 files changed

+143
-55
lines changed

4 files changed

+143
-55
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ require (
140140
github.com/elastic/go-licenser v0.3.1 // indirect
141141
github.com/go-ini/ini v1.67.0 // indirect
142142
github.com/go-openapi/runtime v0.27.1 // indirect
143+
github.com/gobwas/glob v0.2.3 // indirect
143144
github.com/goccy/go-json v0.10.3 // indirect
144145
github.com/godbus/dbus/v5 v5.0.4 // indirect
145146
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,6 +1670,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
16701670
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
16711671
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
16721672
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
1673+
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
1674+
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
16731675
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
16741676
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
16751677
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package query
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-kit/log"
7+
"github.com/go-kit/log/level"
8+
"github.com/gobwas/glob"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
"github.com/prometheus/prometheus/model/labels"
12+
)
13+
14+
const (
15+
aggregationLabelName = "__rollup__"
16+
aggregatedMetricNameGlobPattern = "*:(aggr|sum|count|avg|min|max)"
17+
)
18+
19+
type AggregationLabelRewriter struct {
20+
logger log.Logger
21+
metrics *aggregationLabelRewriterMetrics
22+
23+
enabled bool
24+
desiredLabelValue string
25+
26+
aggregatedMetricNameGlob glob.Glob
27+
}
28+
29+
type aggregationLabelRewriterMetrics struct {
30+
aggregationLabelRewriteSkippedCount *prometheus.CounterVec
31+
aggregationLabelRewrittenCount *prometheus.CounterVec
32+
aggregationLabelAddedCount prometheus.Counter
33+
aggregationLabelRewriterRuntimeSeconds *prometheus.HistogramVec
34+
}
35+
36+
func newAggregationLabelRewriterMetrics(reg prometheus.Registerer, desiredLabelValue string) *aggregationLabelRewriterMetrics {
37+
m := &aggregationLabelRewriterMetrics{}
38+
39+
m.aggregationLabelRewriteSkippedCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
40+
Name: "skipped_total",
41+
Help: "Total number of times aggregation-label-rewriter was enabled but skipped",
42+
}, []string{"reason"})
43+
m.aggregationLabelRewrittenCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
44+
Name: "label_rewritten_total",
45+
Help: "Total number of times aggregation-label-rewriter rewrote the aggregation label",
46+
ConstLabels: prometheus.Labels{"new_value": desiredLabelValue},
47+
}, []string{"old_value"})
48+
m.aggregationLabelAddedCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
49+
Name: "label_added_total",
50+
Help: "Total number of times aggregation-label-rewriter added the aggregation label",
51+
ConstLabels: prometheus.Labels{"new_value": desiredLabelValue},
52+
})
53+
m.aggregationLabelRewriterRuntimeSeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
54+
Name: "total_runtime_seconds",
55+
Help: "Runtime of aggregation-label-rewriter in seconds",
56+
Buckets: prometheus.DefBuckets,
57+
}, []string{"is_rewritten"})
58+
59+
return m
60+
}
61+
62+
func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, desiredLabelValue string) *AggregationLabelRewriter {
63+
g := glob.MustCompile(aggregatedMetricNameGlobPattern)
64+
return &AggregationLabelRewriter{
65+
enabled: desiredLabelValue != "",
66+
logger: logger,
67+
metrics: newAggregationLabelRewriterMetrics(reg, desiredLabelValue),
68+
desiredLabelValue: desiredLabelValue,
69+
aggregatedMetricNameGlob: g,
70+
}
71+
}
72+
73+
func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Matcher {
74+
if a.enabled {
75+
startOfRun := time.Now()
76+
needsRewrite := false
77+
skipReason := "no-name-matcher"
78+
var aggregationLabelMatcher *labels.Matcher
79+
for i := 0; i < len(ms); i++ {
80+
m := ms[i]
81+
if !needsRewrite && m.Name == labels.MetricName {
82+
if m.Type == labels.MatchEqual {
83+
if a.aggregatedMetricNameGlob.Match(m.Value) {
84+
needsRewrite = true
85+
} else {
86+
skipReason = "not-aggregated-metric"
87+
}
88+
} else {
89+
skipReason = "not-using-equal-name-match"
90+
level.Debug(a.logger).Log("msg", "skipped due to not using equal in name matcher", "matcher", m)
91+
break
92+
}
93+
} else if needsRewrite && aggregationLabelMatcher != nil {
94+
break
95+
} else if m.Name == aggregationLabelName {
96+
aggregationLabelMatcher = m
97+
}
98+
}
99+
if needsRewrite {
100+
if aggregationLabelMatcher != nil {
101+
a.metrics.aggregationLabelRewrittenCount.WithLabelValues(aggregationLabelMatcher.Value).Inc()
102+
aggregationLabelMatcher.Type = labels.MatchEqual
103+
aggregationLabelMatcher.Value = a.desiredLabelValue
104+
} else {
105+
a.metrics.aggregationLabelAddedCount.Inc()
106+
newMatcher := &labels.Matcher{
107+
Name: aggregationLabelName,
108+
Type: labels.MatchEqual,
109+
Value: a.desiredLabelValue,
110+
}
111+
ms = append(ms, newMatcher)
112+
}
113+
a.metrics.aggregationLabelRewriterRuntimeSeconds.WithLabelValues("true").Observe(
114+
time.Since(startOfRun).Seconds(),
115+
)
116+
} else {
117+
a.metrics.aggregationLabelRewriteSkippedCount.WithLabelValues(skipReason).Inc()
118+
a.metrics.aggregationLabelRewriterRuntimeSeconds.WithLabelValues("false").Observe(
119+
time.Since(startOfRun).Seconds(),
120+
)
121+
}
122+
123+
}
124+
return ms
125+
}

pkg/query/querier.go

Lines changed: 15 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func NewQueryableCreatorWithOptions(
104104
) storage.Queryable {
105105
return &queryable{
106106
logger: logger,
107+
reg: reg,
107108
replicaLabels: replicaLabels,
108109
storeDebugMatchers: storeDebugMatchers,
109110
proxy: proxy,
@@ -125,6 +126,7 @@ func NewQueryableCreatorWithOptions(
125126

126127
type queryable struct {
127128
logger log.Logger
129+
reg prometheus.Registerer
128130
replicaLabels []string
129131
storeDebugMatchers [][]*labels.Matcher
130132
proxy storepb.StoreServer
@@ -142,11 +144,12 @@ type queryable struct {
142144

143145
// Querier returns a new storage querier against the underlying proxy store API.
144146
func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) {
145-
return newQuerierWithOpts(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter, q.opts), nil
147+
return newQuerierWithOpts(q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter, q.opts), nil
146148
}
147149

148150
type querier struct {
149151
logger log.Logger
152+
reg prometheus.Registerer
150153
mint, maxt int64
151154
deduplicationFunc string
152155
replicaLabels []string
@@ -161,17 +164,15 @@ type querier struct {
161164
shardInfo *storepb.ShardInfo
162165
seriesStatsReporter seriesStatsReporter
163166

164-
enablePreAggregationLabelRewrite bool
165-
aggregationStandardSuffixes []string
166-
aggregationLabelName string
167-
rewriteAggregationLabelTo string
167+
aggregationLabelRewriter *AggregationLabelRewriter
168168
}
169169

170170
// newQuerier creates implementation of storage.Querier that fetches data from the proxy
171171
// store API endpoints.
172172
// nolint:unparam
173173
func newQuerier(
174174
logger log.Logger,
175+
reg prometheus.Registerer,
175176
mint,
176177
maxt int64,
177178
replicaLabels []string,
@@ -186,11 +187,12 @@ func newQuerier(
186187
shardInfo *storepb.ShardInfo,
187188
seriesStatsReporter seriesStatsReporter,
188189
) *querier {
189-
return newQuerierWithOpts(logger, mint, maxt, replicaLabels, storeDebugMatchers, proxy, deduplicate, maxResolutionMillis, partialResponse, skipChunks, selectGate, selectTimeout, shardInfo, seriesStatsReporter, Options{})
190+
return newQuerierWithOpts(logger, reg, mint, maxt, replicaLabels, storeDebugMatchers, proxy, deduplicate, maxResolutionMillis, partialResponse, skipChunks, selectGate, selectTimeout, shardInfo, seriesStatsReporter, Options{})
190191
}
191192

192193
func newQuerierWithOpts(
193194
logger log.Logger,
195+
reg prometheus.Registerer,
194196
mint,
195197
maxt int64,
196198
replicaLabels []string,
@@ -223,6 +225,7 @@ func newQuerierWithOpts(
223225
}
224226
return &querier{
225227
logger: logger,
228+
reg: reg,
226229
selectGate: selectGate,
227230
selectTimeout: selectTimeout,
228231

@@ -239,10 +242,11 @@ func newQuerierWithOpts(
239242
shardInfo: shardInfo,
240243
seriesStatsReporter: seriesStatsReporter,
241244

242-
enablePreAggregationLabelRewrite: opts.RewriteAggregationLabelTo != "",
243-
aggregationStandardSuffixes: []string{":aggr", ":count", ":sum", ":min", ":max", ":avg"},
244-
aggregationLabelName: "__rollup__",
245-
rewriteAggregationLabelTo: opts.RewriteAggregationLabelTo,
245+
aggregationLabelRewriter: NewAggregationLabelRewriter(
246+
logger,
247+
extprom.WrapRegistererWithPrefix("aggregation_label_rewriter_", reg),
248+
opts.RewriteAggregationLabelTo,
249+
),
246250
}
247251
}
248252

@@ -376,51 +380,7 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
376380
}
377381

378382
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) {
379-
if q.enablePreAggregationLabelRewrite {
380-
hasNameLabel, needLabelRewrite := false, false
381-
var nameLabelMatcher *labels.Matcher
382-
for _, m := range ms {
383-
if m.Name == labels.MetricName {
384-
hasNameLabel = true
385-
if m.Type == labels.MatchEqual {
386-
for _, suffix := range q.aggregationStandardSuffixes {
387-
if strings.HasSuffix(m.Value, suffix) {
388-
needLabelRewrite = true
389-
nameLabelMatcher = m
390-
break
391-
}
392-
}
393-
break
394-
} else {
395-
level.Info(q.logger).Log("msg", "Skipping aggregation label rewrite attempt", "reason", "non-equal name matcher", "name_matcher", m)
396-
}
397-
}
398-
}
399-
if !hasNameLabel {
400-
level.Info(q.logger).Log("msg", "Skipping aggregation label rewrite", "reason", "no metric name matcher found")
401-
}
402-
if needLabelRewrite {
403-
hasAggregationLabel := false
404-
for i, m := range ms {
405-
if m.Name == q.aggregationLabelName {
406-
level.Info(q.logger).Log("msg", "Rewriting aggregation label", "original_matcher", m, "name_matcher", nameLabelMatcher, "new_value", q.rewriteAggregationLabelTo)
407-
hasAggregationLabel = true
408-
ms[i].Type = labels.MatchEqual
409-
ms[i].Value = q.rewriteAggregationLabelTo
410-
break
411-
}
412-
}
413-
if !hasAggregationLabel {
414-
newMatcher := &labels.Matcher{
415-
Name: q.aggregationLabelName,
416-
Type: labels.MatchEqual,
417-
Value: q.rewriteAggregationLabelTo,
418-
}
419-
level.Info(q.logger).Log("msg", "Adding aggregation label", "new_matcher", newMatcher, "name_matcher", nameLabelMatcher)
420-
ms = append(ms, newMatcher)
421-
}
422-
}
423-
}
383+
ms = q.aggregationLabelRewriter.Rewrite(ms)
424384

425385
sms, err := storepb.PromMatchersToMatchers(ms...)
426386
if err != nil {

0 commit comments

Comments
 (0)