Skip to content

Commit ab31d53

Browse files
committed
rename to quorum
Signed-off-by: Yi Jin <[email protected]>
1 parent 77347ce commit ab31d53

File tree

3 files changed

+18
-19
lines changed

3 files changed

+18
-19
lines changed

pkg/dedup/iter.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,7 @@ func (s *dedupSeriesSet) At() storage.Series {
169169
copy(repl, s.replicas)
170170
if s.deduplicationFunc == AlgorithmQuorum {
171171
// merge all samples which are ingested via receiver, no skips.
172-
// feed the merged series into dedup series which apply counter adjustment
173-
return NewMergedSeries(s.lset, repl, s.f)
172+
return NewQuorumSeries(s.lset, repl, s.f)
174173
}
175174
if s.deduplicationFunc == AlgorithmChain {
176175
return seriesWithLabels{Series: storage.ChainedSeriesMerge(repl...), lset: s.lset}

pkg/dedup/quorum_iter.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,28 @@ import (
1212
"github.com/prometheus/prometheus/tsdb/chunkenc"
1313
)
1414

15-
// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
16-
// when replicas has conflict values at the same timestamp, the first replica will be selected.
17-
type mergedSeries struct {
15+
// quorumSeries is a storage.Series that implements quorum algorithm.
16+
// when replicas has conflict values at the same timestamp, the value in majority replica will be selected.
17+
type quorumSeries struct {
1818
lset labels.Labels
1919
replicas []storage.Series
2020

2121
isCounter bool
2222
}
2323

24-
func NewMergedSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
25-
return &mergedSeries{
24+
func NewQuorumSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
25+
return &quorumSeries{
2626
lset: lset,
2727
replicas: replicas,
2828

2929
isCounter: isCounter(f),
3030
}
3131
}
3232

33-
func (m *mergedSeries) Labels() labels.Labels {
33+
func (m *quorumSeries) Labels() labels.Labels {
3434
return m.lset
3535
}
36-
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
36+
func (m *quorumSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
3737
iters := make([]adjustableSeriesIterator, 0, len(m.replicas))
3838
oks := make([]bool, 0, len(m.replicas))
3939
for _, r := range m.replicas {
@@ -47,7 +47,7 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
4747
iters = append(iters, it)
4848
oks = append(oks, ok)
4949
}
50-
return &mergedSeriesIterator{
50+
return &quorumSeriesIterator{
5151
iters: iters,
5252
oks: oks,
5353
lastT: math.MinInt64,
@@ -82,7 +82,7 @@ func (q *quorumValuePicker) addValue(v float64) bool {
8282
return false
8383
}
8484

85-
type mergedSeriesIterator struct {
85+
type quorumSeriesIterator struct {
8686
iters []adjustableSeriesIterator
8787
oks []bool
8888

@@ -91,7 +91,7 @@ type mergedSeriesIterator struct {
9191
lastIter adjustableSeriesIterator
9292
}
9393

94-
func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
94+
func (m *quorumSeriesIterator) Next() chunkenc.ValueType {
9595
// m.lastIter points to the last iterator that has the latest timestamp.
9696
// m.lastT always aligns with m.lastIter unless when m.lastIter is nil.
9797
// m.lastIter is nil only in the following cases:
@@ -131,7 +131,7 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
131131
return chunkenc.ValFloat
132132
}
133133

134-
func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
134+
func (m *quorumSeriesIterator) Seek(t int64) chunkenc.ValueType {
135135
// Don't use underlying Seek, but iterate over next to not miss gaps.
136136
for m.lastT < t && m.Next() != chunkenc.ValNone {
137137
}
@@ -142,25 +142,25 @@ func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
142142
return chunkenc.ValFloat
143143
}
144144

145-
func (m *mergedSeriesIterator) At() (t int64, v float64) {
145+
func (m *quorumSeriesIterator) At() (t int64, v float64) {
146146
return m.lastIter.At()
147147
}
148148

149-
func (m *mergedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
149+
func (m *quorumSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
150150
return m.lastIter.AtHistogram(h)
151151
}
152152

153-
func (m *mergedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
153+
func (m *quorumSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
154154
return m.lastIter.AtFloatHistogram(fh)
155155
}
156156

157-
func (m *mergedSeriesIterator) AtT() int64 {
157+
func (m *quorumSeriesIterator) AtT() int64 {
158158
return m.lastT
159159
}
160160

161161
// Err All At() funcs should panic if called after Next() or Seek() return ValNone.
162162
// Only Err() should return nil even after Next() or Seek() return ValNone.
163-
func (m *mergedSeriesIterator) Err() error {
163+
func (m *quorumSeriesIterator) Err() error {
164164
if m.lastIter == nil {
165165
return nil
166166
}

pkg/dedup/quorum_iter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func TestIteratorEdgeCases(t *testing.T) {
17-
ms := NewMergedSeries(labels.Labels{}, []storage.Series{}, "")
17+
ms := NewQuorumSeries(labels.Labels{}, []storage.Series{}, "")
1818
it := ms.Iterator(nil)
1919
testutil.Ok(t, it.Err())
2020
testutil.Equals(t, int64(math.MinInt64), it.AtT())

0 commit comments

Comments
 (0)