Skip to content

Commit 0168fd1

Browse files
committed
[ES-1292925] fix reusable counter resets
Signed-off-by: Yi Jin <[email protected]>
1 parent b0aff72 commit 0168fd1

File tree

6 files changed

+151
-40
lines changed

6 files changed

+151
-40
lines changed

cmd/thanos/query.go

Lines changed: 10 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -28,13 +28,14 @@ import (
2828
"github.com/prometheus/prometheus/discovery/targetgroup"
2929
"github.com/prometheus/prometheus/model/labels"
3030
"github.com/prometheus/prometheus/promql"
31-
"github.com/thanos-io/promql-engine/api"
3231

32+
"github.com/thanos-io/promql-engine/api"
3333
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
3434
"github.com/thanos-io/thanos/pkg/api/query/querypb"
3535
"github.com/thanos-io/thanos/pkg/block"
3636
"github.com/thanos-io/thanos/pkg/compact/downsample"
3737
"github.com/thanos-io/thanos/pkg/component"
38+
"github.com/thanos-io/thanos/pkg/dedup"
3839
"github.com/thanos-io/thanos/pkg/discovery/cache"
3940
"github.com/thanos-io/thanos/pkg/discovery/dns"
4041
"github.com/thanos-io/thanos/pkg/exemplars"
@@ -128,10 +129,10 @@ func registerQuery(app *extkingpin.App) {
128129
Strings()
129130
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()
130131

131-
enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
132-
Default("false").Bool()
133-
enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
134-
Default("false").Bool()
132+
queryDeduplicationFunc := cmd.Flag("query.deduplication.func", "Experimental. Deduplication algorithm for merging overlapping series. "+
133+
"Possible values are: \"penalty\", \"chain\". If no value is specified, penalty based deduplication algorithm will be used. "+
134+
"When set to chain, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. At least one replica label has to be set via --query.replica-label flag.").
135+
Default(dedup.AlgorithmPenalty).Enum(dedup.AlgorithmPenalty, dedup.AlgorithmChain, dedup.AlgorithmQuorum)
135136

136137
instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
137138

@@ -338,6 +339,7 @@ func registerQuery(app *extkingpin.App) {
338339
*queryConnMetricLabels,
339340
*queryReplicaLabels,
340341
*queryPartitionLabels,
342+
*queryDeduplicationFunc,
341343
selectorLset,
342344
getFlagsMap(cmd.Flags()),
343345
*endpoints,
@@ -381,8 +383,6 @@ func registerQuery(app *extkingpin.App) {
381383
*enforceTenancy,
382384
*tenantLabel,
383385
*enableGroupReplicaPartialStrategy,
384-
*enableDedupMerge,
385-
*enableQuorumChunkDedup,
386386
)
387387
})
388388
}
@@ -423,6 +423,7 @@ func runQuery(
423423
queryConnMetricLabels []string,
424424
queryReplicaLabels []string,
425425
queryPartitionLabels []string,
426+
queryDeduplicationFunc string,
426427
selectorLset labels.Labels,
427428
flagsMap map[string]string,
428429
endpointAddrs []string,
@@ -466,8 +467,6 @@ func runQuery(
466467
enforceTenancy bool,
467468
tenantLabel string,
468469
groupReplicaPartialResponseStrategy bool,
469-
enableDedupMerge bool,
470-
enableQuorumChunkDedup bool,
471470
) error {
472471
comp := component.Query
473472
if alertQueryURL == "" {
@@ -554,7 +553,7 @@ func runQuery(
554553
options := []store.ProxyStoreOption{
555554
store.WithTSDBSelector(tsdbSelector),
556555
store.WithProxyStoreDebugLogging(debugLogging),
557-
store.WithQuorumChunkDedup(enableQuorumChunkDedup),
556+
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
558557
}
559558

560559
// Parse and sanitize the provided replica labels flags.
@@ -596,7 +595,7 @@ func runQuery(
596595
)
597596
opts := query.Options{
598597
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
599-
EnableDedupMerge: enableDedupMerge,
598+
DeduplicationFunc: queryDeduplicationFunc,
600599
}
601600
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
602601
queryableCreator = query.NewQueryableCreatorWithOptions(

pkg/dedup/iter.go

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,12 @@ import (
2323
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
2424
const initialPenalty = 5000
2525

26+
const (
27+
AlgorithmPenalty = "penalty"
28+
AlgorithmChain = "chain"
29+
AlgorithmQuorum = "quorum"
30+
)
31+
2632
type dedupSeriesSet struct {
2733
set storage.SeriesSet
2834

@@ -32,7 +38,8 @@ type dedupSeriesSet struct {
3238
peek storage.Series
3339
ok bool
3440

35-
f string
41+
f string
42+
deduplicationFunc string
3643
}
3744

3845
// isCounter deduces whether a counter metric has been passed. There must be
@@ -109,8 +116,8 @@ func (o *overlapSplitSet) Err() error {
109116

110117
// NewSeriesSet returns seriesSet that deduplicates the same series.
111118
// The series in series set are expected be sorted by all labels.
112-
func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
113-
s := &dedupSeriesSet{set: set, f: f}
119+
func NewSeriesSet(set storage.SeriesSet, f string, deduplicationFunc string) storage.SeriesSet {
120+
s := &dedupSeriesSet{set: set, f: f, deduplicationFunc: deduplicationFunc}
114121
s.ok = s.set.Next()
115122
if s.ok {
116123
s.peek = s.set.At()
@@ -160,9 +167,12 @@ func (s *dedupSeriesSet) At() storage.Series {
160167
// Clients may store the series, so we must make a copy of the slice before advancing.
161168
repl := make([]storage.Series, len(s.replicas))
162169
copy(repl, s.replicas)
163-
if s.f == UseMergedSeries {
170+
if s.deduplicationFunc == AlgorithmChain {
171+
return storage.ChainedSeriesMerge(repl...)
172+
} else if s.deduplicationFunc == AlgorithmQuorum {
164173
// merge all samples which are ingested via receiver, no skips.
165-
return NewMergedSeries(s.lset, repl)
174+
// feed the merged series into dedup series which apply counter adjustment
175+
return NewMergedSeries(s.lset, repl, s.f)
166176
}
167177
return newDedupSeries(s.lset, repl, s.f)
168178
}

pkg/dedup/iter_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -538,7 +538,7 @@ func TestDedupSeriesSet(t *testing.T) {
538538
if tcase.isCounter {
539539
f = "rate"
540540
}
541-
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f)
541+
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmPenalty)
542542
var ats []storage.Series
543543
for dedupSet.Next() {
544544
ats = append(ats, dedupSet.At())

pkg/dedup/merge_iter.go

Lines changed: 23 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -12,30 +12,37 @@ import (
1212
"github.com/prometheus/prometheus/tsdb/chunkenc"
1313
)
1414

15-
const UseMergedSeries = "use_merged_series"
16-
1715
// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
1816
// when replicas has conflict values at the same timestamp, the first replica will be selected.
1917
type mergedSeries struct {
2018
lset labels.Labels
2119
replicas []storage.Series
20+
21+
isCounter bool
2222
}
2323

24-
func NewMergedSeries(lset labels.Labels, replicas []storage.Series) storage.Series {
24+
func NewMergedSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
2525
return &mergedSeries{
2626
lset: lset,
2727
replicas: replicas,
28+
29+
isCounter: isCounter(f),
2830
}
2931
}
3032

3133
func (m *mergedSeries) Labels() labels.Labels {
3234
return m.lset
3335
}
3436
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
35-
iters := make([]chunkenc.Iterator, 0, len(m.replicas))
37+
iters := make([]adjustableSeriesIterator, 0, len(m.replicas))
3638
oks := make([]bool, 0, len(m.replicas))
3739
for _, r := range m.replicas {
38-
it := r.Iterator(nil)
40+
var it adjustableSeriesIterator
41+
if m.isCounter {
42+
it = &counterErrAdjustSeriesIterator{Iterator: r.Iterator(nil)}
43+
} else {
44+
it = &noopAdjustableSeriesIterator{Iterator: r.Iterator(nil)}
45+
}
3946
ok := it.Next() != chunkenc.ValNone // iterate to the first value.
4047
iters = append(iters, it)
4148
oks = append(oks, ok)
@@ -49,26 +56,25 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
4956
}
5057

5158
type quorumValuePicker struct {
52-
currentValue int64
59+
currentValue float64
5360
cnt int
5461
}
5562

5663
func NewQuorumValuePicker(v float64) *quorumValuePicker {
5764
return &quorumValuePicker{
58-
currentValue: int64(v),
65+
currentValue: v,
5966
cnt: 1,
6067
}
6168
}
6269

6370
// Return true if this is the new majority value.
6471
func (q *quorumValuePicker) addValue(v float64) bool {
65-
iv := int64(v)
66-
if q.currentValue == iv {
72+
if q.currentValue == v {
6773
q.cnt++
6874
} else {
6975
q.cnt--
7076
if q.cnt == 0 {
71-
q.currentValue = iv
77+
q.currentValue = v
7278
q.cnt = 1
7379
return true
7480
}
@@ -77,11 +83,12 @@ func (q *quorumValuePicker) addValue(v float64) bool {
7783
}
7884

7985
type mergedSeriesIterator struct {
80-
iters []chunkenc.Iterator
86+
iters []adjustableSeriesIterator
8187
oks []bool
8288

8389
lastT int64
84-
lastIter chunkenc.Iterator
90+
lastV float64
91+
lastIter adjustableSeriesIterator
8592
}
8693

8794
func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
@@ -91,8 +98,8 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
9198
// 1. Next()/Seek() is never called. m.lastT is math.MinInt64 in this case.
9299
// 2. The iterator runs out of values. m.lastT is the last timestamp in this case.
93100
minT := int64(math.MaxInt64)
94-
var lastIter chunkenc.Iterator
95-
quoramValue := NewQuorumValuePicker(0)
101+
var lastIter adjustableSeriesIterator
102+
quoramValue := NewQuorumValuePicker(0.0)
96103
for i, it := range m.iters {
97104
if !m.oks[i] {
98105
continue
@@ -117,6 +124,8 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
117124
if m.lastIter == nil {
118125
return chunkenc.ValNone
119126
}
127+
m.lastIter.adjustAtValue(m.lastV)
128+
_, m.lastV = m.lastIter.At()
120129
m.lastT = minT
121130
return chunkenc.ValFloat
122131
}

pkg/dedup/merge_iter_test.go

Lines changed: 98 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func TestIteratorEdgeCases(t *testing.T) {
17-
ms := NewMergedSeries(labels.Labels{}, []storage.Series{})
17+
ms := NewMergedSeries(labels.Labels{}, []storage.Series{}, "")
1818
it := ms.Iterator(nil)
1919
testutil.Ok(t, it.Err())
2020
testutil.Equals(t, int64(math.MinInt64), it.AtT())
@@ -244,10 +244,106 @@ func TestMergedSeriesIterator(t *testing.T) {
244244
},
245245
},
246246
},
247+
{
248+
// Regression test against https://github.com/thanos-io/thanos/issues/2401.
249+
// Two counter series, when one (initially chosen) series is having hiccup (few dropped samples), while second is live.
250+
// This also happens when 2 replicas scrape in different time (they usually do) and one sees later counter value then the other.
251+
// Now, depending on what replica we look, we can see totally different counter value in total where total means
252+
// after accounting for counter resets. We account for that in downsample.CounterSeriesIterator, mainly because
253+
// we handle downsample Counter Aggregations specially (for detecting resets between chunks).
254+
name: "Regression test against 2401",
255+
isCounter: true,
256+
input: []series{
257+
{
258+
lset: labels.FromStrings("a", "1"),
259+
samples: []sample{
260+
{10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0.
261+
{20000, 9.0}, // Same. CurrValue = 9.0.
262+
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
263+
{50001, 9 + 1.0}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
264+
{60000, 9 + 2.0},
265+
{70000, 9 + 3.0},
266+
{80000, 9 + 4.0},
267+
{90000, 9 + 5.0}, // This should be now taken, and we expect 14 to be correct value now.
268+
{100000, 9 + 6.0},
269+
},
270+
}, {
271+
lset: labels.FromStrings("a", "1"),
272+
samples: []sample{
273+
{10001, 8.0}, // Penalty 5000 will be added.
274+
// 20001 was app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. Penalty 2 * (20000 - 10000) will be added.
275+
// 30001 no sample. Within penalty, ignored.
276+
{45001, 8 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
277+
{55001, 8 + 1.5},
278+
{65001, 8 + 2.5},
279+
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
280+
},
281+
},
282+
},
283+
exp: []series{
284+
{
285+
lset: labels.FromStrings("a", "1"),
286+
samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {t: 50001, f: 10}, {55001, 10}, {65001, 11}, {t: 80000, f: 13}, {90000, 14}, {100000, 15}},
287+
},
288+
},
289+
},
290+
{
291+
// Same thing but not for counter should not adjust anything.
292+
name: "Regression test with no counter adjustment",
293+
isCounter: false,
294+
input: []series{
295+
{
296+
lset: labels.FromStrings("a", "1"),
297+
samples: []sample{
298+
{10000, 8.0}, {20000, 9.0}, {50001, 9 + 1.0}, {60000, 9 + 2.0}, {70000, 9 + 3.0}, {80000, 9 + 4.0}, {90000, 9 + 5.0}, {100000, 9 + 6.0},
299+
},
300+
}, {
301+
lset: labels.FromStrings("a", "1"),
302+
samples: []sample{
303+
{10001, 8.0}, {45001, 8 + 0.5}, {55001, 8 + 1.5}, {65001, 8 + 2.5},
304+
},
305+
},
306+
},
307+
exp: []series{
308+
{
309+
lset: labels.FromStrings("a", "1"),
310+
samples: []sample{{10000, 8}, {20000, 9}, {45001, 8.5}, {t: 50001, f: 10}, {55001, 9.5}, {65001, 10.5}, {t: 80000, f: 13}, {90000, 14}, {100000, 15}},
311+
},
312+
},
313+
},
314+
//{
315+
// name: "Reusable counter with resets and large gaps",
316+
// isCounter: true,
317+
// input: []series{
318+
// {
319+
// lset: labels.FromStrings("a", "1"),
320+
// samples: []sample{
321+
// {10000, 8.0}, {20000, 9.0}, {1050001, 1.0}, {1060001, 5.0}, {2060001, 3.0},
322+
// },
323+
// },
324+
// {
325+
// lset: labels.FromStrings("a", "1"),
326+
// samples: []sample{
327+
// {10000, 8.0}, {20000, 9.0}, {1050001, 1.0}, {1060001, 5.0}, {2060001, 3.0},
328+
// },
329+
// },
330+
// },
331+
// exp: []series{
332+
// {
333+
// lset: labels.FromStrings("a", "1"),
334+
// samples: []sample{{10000, 8.0}, {20000, 9.0}, {1050001, 10.0}, {1060001, 16.0}, {2060001, 19.0}},
335+
// },
336+
// },
337+
//},
247338
} {
248339
t.Run(tcase.name, func(t *testing.T) {
249340
// If it is a counter then pass a function which expects a counter.
250-
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, UseMergedSeries)
341+
// If it is a counter then pass a function which expects a counter.
342+
f := ""
343+
if tcase.isCounter {
344+
f = "rate"
345+
}
346+
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmQuorum)
251347
var ats []storage.Series
252348
for dedupSet.Next() {
253349
ats = append(ats, dedupSet.At())

0 commit comments

Comments (0)