Skip to content

Commit 759d333

Browse files
authored
Simplify merge iterator imp (#66)
2 parents 6d36733 + c881437 commit 759d333

File tree

2 files changed

+64
-29
lines changed

2 files changed

+64
-29
lines changed

pkg/dedup/merge_iter.go

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,34 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
4848
}
4949
}
5050

51+
type quorumValuePicker struct {
52+
currentValue int64
53+
cnt int
54+
}
55+
56+
func NewQuorumValuePicker(v float64) *quorumValuePicker {
57+
return &quorumValuePicker{
58+
currentValue: int64(v),
59+
cnt: 1,
60+
}
61+
}
62+
63+
// Return true if this is the new majority value.
64+
func (q *quorumValuePicker) addValue(v float64) bool {
65+
iv := int64(v)
66+
if q.currentValue == iv {
67+
q.cnt++
68+
} else {
69+
q.cnt--
70+
if q.cnt == 0 {
71+
q.currentValue = iv
72+
q.cnt = 1
73+
return true
74+
}
75+
}
76+
return false
77+
}
78+
5179
type mergedSeriesIterator struct {
5280
iters []chunkenc.Iterator
5381
oks []bool
@@ -57,45 +85,53 @@ type mergedSeriesIterator struct {
5785
}
5886

5987
func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
60-
return m.Seek(m.lastT + initialPenalty) // apply penalty to avoid selecting samples too close
61-
}
62-
63-
func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
64-
if len(m.iters) == 0 {
65-
return chunkenc.ValNone
66-
}
67-
68-
picked := int64(math.MaxInt64)
88+
// m.lastIter points to the last iterator that has the latest timestamp.
89+
// m.lastT always aligns with m.lastIter unless when m.lastIter is nil.
90+
// m.lastIter is nil only in the following cases:
91+
// 1. Next()/Seek() is never called. m.lastT is math.MinInt64 in this case.
92+
// 2. The iterator runs out of values. m.lastT is the last timestamp in this case.
93+
minT := int64(math.MaxInt64)
94+
var lastIter chunkenc.Iterator
95+
quoramValue := NewQuorumValuePicker(0)
6996
for i, it := range m.iters {
7097
if !m.oks[i] {
7198
continue
7299
}
73-
if it == m.lastIter || it.AtT() <= m.lastT {
74-
m.oks[i] = it.Seek(t) != chunkenc.ValNone // move forward for last iterator.
75-
if !m.oks[i] {
76-
continue
77-
}
78-
}
79-
currT := it.AtT()
80-
if currT >= t {
81-
if currT < picked {
82-
picked = currT
83-
m.lastIter = it
84-
} else if currT == picked {
85-
_, currV := it.At()
86-
_, pickedV := m.lastIter.At()
87-
if currV < pickedV {
88-
m.lastIter = it
100+
// apply penalty to avoid selecting samples too close
101+
m.oks[i] = it.Seek(m.lastT+initialPenalty) != chunkenc.ValNone
102+
// The it.Seek() call above should guarantee that it.AtT() > m.lastT.
103+
if m.oks[i] {
104+
t, v := it.At()
105+
if t < minT {
106+
minT = t
107+
lastIter = it
108+
quoramValue = NewQuorumValuePicker(v)
109+
} else if t == minT {
110+
if quoramValue.addValue(v) {
111+
lastIter = it
89112
}
90113
}
91114
}
92115
}
93-
if picked == math.MaxInt64 {
116+
m.lastIter = lastIter
117+
if m.lastIter == nil {
94118
return chunkenc.ValNone
95119
}
96-
m.lastT = picked
120+
m.lastT = minT
97121
return chunkenc.ValFloat
98122
}
123+
124+
func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
125+
// Don't use underlying Seek, but iterate over next to not miss gaps.
126+
for m.lastT < t && m.Next() != chunkenc.ValNone {
127+
}
128+
// Don't call m.Next() again!
129+
if m.lastIter == nil {
130+
return chunkenc.ValNone
131+
}
132+
return chunkenc.ValFloat
133+
}
134+
99135
func (m *mergedSeriesIterator) At() (t int64, v float64) {
100136
return m.lastIter.At()
101137
}

pkg/dedup/merge_iter_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,7 @@ func TestMergedSeriesIterator(t *testing.T) {
187187
}, {
188188
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
189189
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
190-
},
191-
{
190+
}, {
192191
lset: labels.Labels{{Name: "b", Value: "5"}, {Name: "c", Value: "6"}},
193192
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
194193
}, {

0 commit comments

Comments
 (0)