Skip to content

Commit 67f5336

Browse files
authored
[ES-1292925] Fix metrics with reusable counter resets (#107)
2 parents b0aff72 + ab31d53 commit 67f5336

File tree

6 files changed

+224
-55
lines changed

6 files changed

+224
-55
lines changed

cmd/thanos/query.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@ import (
2828
"github.com/prometheus/prometheus/discovery/targetgroup"
2929
"github.com/prometheus/prometheus/model/labels"
3030
"github.com/prometheus/prometheus/promql"
31-
"github.com/thanos-io/promql-engine/api"
3231

32+
"github.com/thanos-io/promql-engine/api"
3333
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
3434
"github.com/thanos-io/thanos/pkg/api/query/querypb"
3535
"github.com/thanos-io/thanos/pkg/block"
3636
"github.com/thanos-io/thanos/pkg/compact/downsample"
3737
"github.com/thanos-io/thanos/pkg/component"
38+
"github.com/thanos-io/thanos/pkg/dedup"
3839
"github.com/thanos-io/thanos/pkg/discovery/cache"
3940
"github.com/thanos-io/thanos/pkg/discovery/dns"
4041
"github.com/thanos-io/thanos/pkg/exemplars"
@@ -128,10 +129,11 @@ func registerQuery(app *extkingpin.App) {
128129
Strings()
129130
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()
130131

131-
enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
132-
Default("false").Bool()
133-
enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
134-
Default("false").Bool()
132+
queryDeduplicationFunc := cmd.Flag("query.deduplication.func", "Experimental. Deduplication algorithm for merging overlapping series. "+
133+
"Possible values are: \"penalty\", \"chain\", \"quorum\". If no value is specified, penalty based deduplication algorithm will be used. "+
134+
"When set to chain, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. At least one replica label has to be set via --query.replica-label flag."+
135+
"When set to quorum, the databricks deduplication algorithm is used, it is suitable for metrics ingested via receivers.").
136+
Default(dedup.AlgorithmPenalty).Enum(dedup.AlgorithmPenalty, dedup.AlgorithmChain, dedup.AlgorithmQuorum)
135137

136138
instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
137139

@@ -338,6 +340,7 @@ func registerQuery(app *extkingpin.App) {
338340
*queryConnMetricLabels,
339341
*queryReplicaLabels,
340342
*queryPartitionLabels,
343+
*queryDeduplicationFunc,
341344
selectorLset,
342345
getFlagsMap(cmd.Flags()),
343346
*endpoints,
@@ -381,8 +384,6 @@ func registerQuery(app *extkingpin.App) {
381384
*enforceTenancy,
382385
*tenantLabel,
383386
*enableGroupReplicaPartialStrategy,
384-
*enableDedupMerge,
385-
*enableQuorumChunkDedup,
386387
)
387388
})
388389
}
@@ -423,6 +424,7 @@ func runQuery(
423424
queryConnMetricLabels []string,
424425
queryReplicaLabels []string,
425426
queryPartitionLabels []string,
427+
queryDeduplicationFunc string,
426428
selectorLset labels.Labels,
427429
flagsMap map[string]string,
428430
endpointAddrs []string,
@@ -466,8 +468,6 @@ func runQuery(
466468
enforceTenancy bool,
467469
tenantLabel string,
468470
groupReplicaPartialResponseStrategy bool,
469-
enableDedupMerge bool,
470-
enableQuorumChunkDedup bool,
471471
) error {
472472
comp := component.Query
473473
if alertQueryURL == "" {
@@ -554,7 +554,7 @@ func runQuery(
554554
options := []store.ProxyStoreOption{
555555
store.WithTSDBSelector(tsdbSelector),
556556
store.WithProxyStoreDebugLogging(debugLogging),
557-
store.WithQuorumChunkDedup(enableQuorumChunkDedup),
557+
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
558558
}
559559

560560
// Parse and sanitize the provided replica labels flags.
@@ -596,7 +596,7 @@ func runQuery(
596596
)
597597
opts := query.Options{
598598
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
599-
EnableDedupMerge: enableDedupMerge,
599+
DeduplicationFunc: queryDeduplicationFunc,
600600
}
601601
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
602602
queryableCreator = query.NewQueryableCreatorWithOptions(

pkg/dedup/iter.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ import (
2323
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
2424
const initialPenalty = 5000
2525

26+
const (
27+
AlgorithmPenalty = "penalty"
28+
AlgorithmChain = "chain"
29+
AlgorithmQuorum = "quorum"
30+
)
31+
2632
type dedupSeriesSet struct {
2733
set storage.SeriesSet
2834

@@ -32,7 +38,8 @@ type dedupSeriesSet struct {
3238
peek storage.Series
3339
ok bool
3440

35-
f string
41+
f string
42+
deduplicationFunc string
3643
}
3744

3845
// isCounter deduces whether a counter metric has been passed. There must be
@@ -109,8 +116,8 @@ func (o *overlapSplitSet) Err() error {
109116

110117
// NewSeriesSet returns seriesSet that deduplicates the same series.
111118
// The series in series set are expected be sorted by all labels.
112-
func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
113-
s := &dedupSeriesSet{set: set, f: f}
119+
func NewSeriesSet(set storage.SeriesSet, f string, deduplicationFunc string) storage.SeriesSet {
120+
s := &dedupSeriesSet{set: set, f: f, deduplicationFunc: deduplicationFunc}
114121
s.ok = s.set.Next()
115122
if s.ok {
116123
s.peek = s.set.At()
@@ -160,9 +167,12 @@ func (s *dedupSeriesSet) At() storage.Series {
160167
// Clients may store the series, so we must make a copy of the slice before advancing.
161168
repl := make([]storage.Series, len(s.replicas))
162169
copy(repl, s.replicas)
163-
if s.f == UseMergedSeries {
170+
if s.deduplicationFunc == AlgorithmQuorum {
164171
// merge all samples which are ingested via receiver, no skips.
165-
return NewMergedSeries(s.lset, repl)
172+
return NewQuorumSeries(s.lset, repl, s.f)
173+
}
174+
if s.deduplicationFunc == AlgorithmChain {
175+
return seriesWithLabels{Series: storage.ChainedSeriesMerge(repl...), lset: s.lset}
166176
}
167177
return newDedupSeries(s.lset, repl, s.f)
168178
}

pkg/dedup/iter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func TestDedupSeriesSet(t *testing.T) {
538538
if tcase.isCounter {
539539
f = "rate"
540540
}
541-
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f)
541+
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmPenalty)
542542
var ats []storage.Series
543543
for dedupSet.Next() {
544544
ats = append(ats, dedupSet.At())
Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,42 @@ import (
1212
"github.com/prometheus/prometheus/tsdb/chunkenc"
1313
)
1414

15-
const UseMergedSeries = "use_merged_series"
16-
17-
// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
18-
// when replicas has conflict values at the same timestamp, the first replica will be selected.
19-
type mergedSeries struct {
15+
// quorumSeries is a storage.Series that implements quorum algorithm.
16+
// when replicas has conflict values at the same timestamp, the value in majority replica will be selected.
17+
type quorumSeries struct {
2018
lset labels.Labels
2119
replicas []storage.Series
20+
21+
isCounter bool
2222
}
2323

24-
func NewMergedSeries(lset labels.Labels, replicas []storage.Series) storage.Series {
25-
return &mergedSeries{
24+
func NewQuorumSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
25+
return &quorumSeries{
2626
lset: lset,
2727
replicas: replicas,
28+
29+
isCounter: isCounter(f),
2830
}
2931
}
3032

31-
func (m *mergedSeries) Labels() labels.Labels {
33+
func (m *quorumSeries) Labels() labels.Labels {
3234
return m.lset
3335
}
34-
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
35-
iters := make([]chunkenc.Iterator, 0, len(m.replicas))
36+
func (m *quorumSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
37+
iters := make([]adjustableSeriesIterator, 0, len(m.replicas))
3638
oks := make([]bool, 0, len(m.replicas))
3739
for _, r := range m.replicas {
38-
it := r.Iterator(nil)
40+
var it adjustableSeriesIterator
41+
if m.isCounter {
42+
it = &counterErrAdjustSeriesIterator{Iterator: r.Iterator(nil)}
43+
} else {
44+
it = &noopAdjustableSeriesIterator{Iterator: r.Iterator(nil)}
45+
}
3946
ok := it.Next() != chunkenc.ValNone // iterate to the first value.
4047
iters = append(iters, it)
4148
oks = append(oks, ok)
4249
}
43-
return &mergedSeriesIterator{
50+
return &quorumSeriesIterator{
4451
iters: iters,
4552
oks: oks,
4653
lastT: math.MinInt64,
@@ -49,50 +56,50 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
4956
}
5057

5158
type quorumValuePicker struct {
52-
currentValue int64
59+
currentValue float64
5360
cnt int
5461
}
5562

5663
func NewQuorumValuePicker(v float64) *quorumValuePicker {
5764
return &quorumValuePicker{
58-
currentValue: int64(v),
65+
currentValue: v,
5966
cnt: 1,
6067
}
6168
}
6269

6370
// Return true if this is the new majority value.
6471
func (q *quorumValuePicker) addValue(v float64) bool {
65-
iv := int64(v)
66-
if q.currentValue == iv {
72+
if q.currentValue == v {
6773
q.cnt++
6874
} else {
6975
q.cnt--
7076
if q.cnt == 0 {
71-
q.currentValue = iv
77+
q.currentValue = v
7278
q.cnt = 1
7379
return true
7480
}
7581
}
7682
return false
7783
}
7884

79-
type mergedSeriesIterator struct {
80-
iters []chunkenc.Iterator
85+
type quorumSeriesIterator struct {
86+
iters []adjustableSeriesIterator
8187
oks []bool
8288

8389
lastT int64
84-
lastIter chunkenc.Iterator
90+
lastV float64
91+
lastIter adjustableSeriesIterator
8592
}
8693

87-
func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
94+
func (m *quorumSeriesIterator) Next() chunkenc.ValueType {
8895
// m.lastIter points to the last iterator that has the latest timestamp.
8996
// m.lastT always aligns with m.lastIter unless when m.lastIter is nil.
9097
// m.lastIter is nil only in the following cases:
9198
// 1. Next()/Seek() is never called. m.lastT is math.MinInt64 in this case.
9299
// 2. The iterator runs out of values. m.lastT is the last timestamp in this case.
93100
minT := int64(math.MaxInt64)
94-
var lastIter chunkenc.Iterator
95-
quoramValue := NewQuorumValuePicker(0)
101+
var lastIter adjustableSeriesIterator
102+
quoramValue := NewQuorumValuePicker(0.0)
96103
for i, it := range m.iters {
97104
if !m.oks[i] {
98105
continue
@@ -101,6 +108,8 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
101108
m.oks[i] = it.Seek(m.lastT+initialPenalty) != chunkenc.ValNone
102109
// The it.Seek() call above should guarantee that it.AtT() > m.lastT.
103110
if m.oks[i] {
111+
// adjust the current value for counter functions to avoid unexpected resets
112+
it.adjustAtValue(m.lastV)
104113
t, v := it.At()
105114
if t < minT {
106115
minT = t
@@ -117,11 +126,12 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
117126
if m.lastIter == nil {
118127
return chunkenc.ValNone
119128
}
129+
m.lastV = quoramValue.currentValue
120130
m.lastT = minT
121131
return chunkenc.ValFloat
122132
}
123133

124-
func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
134+
func (m *quorumSeriesIterator) Seek(t int64) chunkenc.ValueType {
125135
// Don't use underlying Seek, but iterate over next to not miss gaps.
126136
for m.lastT < t && m.Next() != chunkenc.ValNone {
127137
}
@@ -132,25 +142,25 @@ func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
132142
return chunkenc.ValFloat
133143
}
134144

135-
func (m *mergedSeriesIterator) At() (t int64, v float64) {
145+
func (m *quorumSeriesIterator) At() (t int64, v float64) {
136146
return m.lastIter.At()
137147
}
138148

139-
func (m *mergedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
149+
func (m *quorumSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
140150
return m.lastIter.AtHistogram(h)
141151
}
142152

143-
func (m *mergedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
153+
func (m *quorumSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
144154
return m.lastIter.AtFloatHistogram(fh)
145155
}
146156

147-
func (m *mergedSeriesIterator) AtT() int64 {
157+
func (m *quorumSeriesIterator) AtT() int64 {
148158
return m.lastT
149159
}
150160

151161
// Err All At() funcs should panic if called after Next() or Seek() return ValNone.
152162
// Only Err() should return nil even after Next() or Seek() return ValNone.
153-
func (m *mergedSeriesIterator) Err() error {
163+
func (m *quorumSeriesIterator) Err() error {
154164
if m.lastIter == nil {
155165
return nil
156166
}

0 commit comments

Comments
 (0)