Skip to content

Commit 72a3cb8

Browse files
authored
[PLAT-108920] adding another dedup iterator to merge samples instead of skipping them (#41)
2 parents 9299649 + 41f59c3 commit 72a3cb8

File tree

5 files changed

+424
-135
lines changed

5 files changed

+424
-135
lines changed

cmd/thanos/query.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ func registerQuery(app *extkingpin.App) {
127127
queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
128128
Strings()
129129

130+
enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
131+
Default("false").Bool()
132+
130133
instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
131134

132135
defaultMetadataTimeRange := cmd.Flag("query.metadata.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.").Default("0s").Duration()
@@ -374,6 +377,7 @@ func registerQuery(app *extkingpin.App) {
374377
*enforceTenancy,
375378
*tenantLabel,
376379
*enableGroupReplicaPartialStrategy,
380+
*enableDedupMerge,
377381
)
378382
})
379383
}
@@ -457,6 +461,7 @@ func runQuery(
457461
enforceTenancy bool,
458462
tenantLabel string,
459463
groupReplicaPartialResponseStrategy bool,
464+
enableDedupMerge bool,
460465
) error {
461466
if alertQueryURL == "" {
462467
lastColon := strings.LastIndex(httpBindAddr, ":")
@@ -566,24 +571,19 @@ func runQuery(
566571
exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset)
567572
queryableCreator query.QueryableCreator
568573
)
569-
if groupReplicaPartialResponseStrategy {
570-
level.Info(logger).Log("msg", "Enabled group-replica partial response strategy")
571-
queryableCreator = query.NewQueryableCreatorWithGroupReplicaPartialResponseStrategy(
572-
logger,
573-
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
574-
proxy,
575-
maxConcurrentSelects,
576-
queryTimeout,
577-
)
578-
} else {
579-
queryableCreator = query.NewQueryableCreator(
580-
logger,
581-
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
582-
proxy,
583-
maxConcurrentSelects,
584-
queryTimeout,
585-
)
574+
opts := query.Options{
575+
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
576+
EnableDedupMerge: enableDedupMerge,
586577
}
578+
level.Info(logger).Log("msg", "databricks querier features", "opts", opts)
579+
queryableCreator = query.NewQueryableCreatorWithOptions(
580+
logger,
581+
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
582+
proxy,
583+
maxConcurrentSelects,
584+
queryTimeout,
585+
opts,
586+
)
587587

588588
// Run File Service Discovery and update the store set when the files are modified.
589589
if fileSD != nil {

pkg/dedup/iter.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,20 @@ import (
1111
"github.com/prometheus/prometheus/storage"
1212
"github.com/prometheus/prometheus/tsdb/chunkenc"
1313
"github.com/prometheus/prometheus/util/annotations"
14-
1514
"github.com/thanos-io/thanos/pkg/store/storepb"
1615
)
1716

17+
// initialPenalty is the seek penalty (in milliseconds) used before a
// sample delta is known. For the series we didn't pick, a penalty twice
// as high as the delta of its last two samples is added to the next seek
// against it. This ensures that we don't pick a sample too close, which
// would increase the overall sample frequency. It also guards against
// clock drift and inaccuracies during timestamp assignment. If we don't
// know a delta yet, we pick 5000 as a constant, which is based on the
// knowledge that timestamps are in milliseconds and sampling frequencies
// are typically multiple seconds long.
const initialPenalty = 5000
25+
1826
type dedupSeriesSet struct {
19-
set storage.SeriesSet
20-
isCounter bool
27+
set storage.SeriesSet
2128

2229
replicas []storage.Series
2330

@@ -103,12 +110,12 @@ func (o *overlapSplitSet) Err() error {
103110
// NewSeriesSet returns seriesSet that deduplicates the same series.
104111
// The series in series set are expected be sorted by all labels.
105112
func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
106-
// TODO: remove dependency on knowing whether it is a counter.
107-
s := &dedupSeriesSet{set: set, isCounter: isCounter(f), f: f}
113+
s := &dedupSeriesSet{set: set, f: f}
108114
s.ok = s.set.Next()
109115
if s.ok {
110116
s.peek = s.set.At()
111117
}
118+
112119
return s
113120
}
114121

@@ -153,7 +160,10 @@ func (s *dedupSeriesSet) At() storage.Series {
153160
// Clients may store the series, so we must make a copy of the slice before advancing.
154161
repl := make([]storage.Series, len(s.replicas))
155162
copy(repl, s.replicas)
156-
163+
if s.f == UseMergedSeries {
164+
// merge all samples which are ingested via receiver, no skips.
165+
return NewMergedSeries(s.lset, repl)
166+
}
157167
return newDedupSeries(s.lset, repl, s.f)
158168
}
159169

@@ -337,15 +347,6 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
337347

338348
it.useA = ta <= tb
339349

340-
// For the series we didn't pick, add a penalty twice as high as the delta of the last two
341-
// samples to the next seek against it.
342-
// This ensures that we don't pick a sample too close, which would increase the overall
343-
// sample frequency. It also guards against clock drift and inaccuracies during
344-
// timestamp assignment.
345-
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
346-
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
347-
const initialPenalty = 5000
348-
349350
if it.useA {
350351
if it.lastT != math.MinInt64 {
351352
it.penB = 2 * (ta - it.lastT)

pkg/dedup/merge_iter.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package dedup
5+
6+
import (
7+
"math"
8+
9+
"github.com/prometheus/prometheus/model/histogram"
10+
"github.com/prometheus/prometheus/model/labels"
11+
"github.com/prometheus/prometheus/storage"
12+
"github.com/prometheus/prometheus/tsdb/chunkenc"
13+
)
14+
15+
const UseMergedSeries = "use_merged_series"
16+
17+
// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
18+
// when replicas has conflict values at the same timestamp, the first replica will be selected.
19+
type mergedSeries struct {
20+
lset labels.Labels
21+
replicas []storage.Series
22+
}
23+
24+
func NewMergedSeries(lset labels.Labels, replicas []storage.Series) storage.Series {
25+
return &mergedSeries{
26+
lset: lset,
27+
replicas: replicas,
28+
}
29+
}
30+
31+
func (m *mergedSeries) Labels() labels.Labels {
32+
return m.lset
33+
}
34+
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
35+
iters := make([]chunkenc.Iterator, 0, len(m.replicas))
36+
oks := make([]bool, 0, len(m.replicas))
37+
for _, r := range m.replicas {
38+
it := r.Iterator(nil)
39+
ok := it.Next() != chunkenc.ValNone // iterate to the first value.
40+
iters = append(iters, it)
41+
oks = append(oks, ok)
42+
}
43+
return &mergedSeriesIterator{
44+
iters: iters,
45+
oks: oks,
46+
lastT: math.MinInt64,
47+
lastIter: nil, // behavior is undefined if At() is called before Next(), here we panic if it happens.
48+
}
49+
}
50+
51+
type mergedSeriesIterator struct {
52+
iters []chunkenc.Iterator
53+
oks []bool
54+
55+
lastT int64
56+
lastIter chunkenc.Iterator
57+
}
58+
59+
func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
60+
return m.Seek(m.lastT + initialPenalty) // apply penalty to avoid selecting samples too close
61+
}
62+
63+
func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
64+
if len(m.iters) == 0 {
65+
return chunkenc.ValNone
66+
}
67+
68+
picked := int64(math.MaxInt64)
69+
for i, it := range m.iters {
70+
if !m.oks[i] {
71+
continue
72+
}
73+
if it == m.lastIter || it.AtT() <= m.lastT {
74+
m.oks[i] = it.Seek(t) != chunkenc.ValNone // move forward for last iterator.
75+
if !m.oks[i] {
76+
continue
77+
}
78+
}
79+
currT := it.AtT()
80+
if currT >= t && currT < picked {
81+
picked = currT
82+
m.lastIter = it
83+
}
84+
}
85+
if picked == math.MaxInt64 {
86+
return chunkenc.ValNone
87+
}
88+
m.lastT = picked
89+
return chunkenc.ValFloat
90+
}
91+
func (m *mergedSeriesIterator) At() (t int64, v float64) {
92+
return m.lastIter.At()
93+
}
94+
95+
func (it *mergedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
96+
return it.lastIter.AtHistogram(h)
97+
}
98+
99+
func (it *mergedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
100+
return it.lastIter.AtFloatHistogram(fh)
101+
}
102+
103+
func (it *mergedSeriesIterator) AtT() int64 {
104+
return it.lastT
105+
}
106+
107+
func (m *mergedSeriesIterator) Err() error {
108+
return m.lastIter.Err()
109+
}

0 commit comments

Comments
 (0)