Skip to content

Commit 6f6040b

Browse files
ringbuffer: simplify interface (#636)
The Len() and PushIntoLast() methods of the ringbuffer interface were redundant. Len() was only used to check if the buffer is empty, we did just implement a convenience function that implements this logic using the already present MaxT method and we adjusted the scanner action to address the extlookback logic. Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent 775bde4 commit 6f6040b

File tree

4 files changed

+30
-38
lines changed

4 files changed

+30
-38
lines changed

execution/scan/subquery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,14 +216,14 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
216216
}
217217
for i, s := range v.Samples {
218218
buffer := o.buffers[v.SampleIDs[i]]
219-
if buffer.Len() > 0 && v.T <= buffer.MaxT() {
219+
if !ringbuffer.Empty(buffer) && v.T <= buffer.MaxT() {
220220
continue
221221
}
222222
buffer.Push(v.T, ringbuffer.Value{F: s})
223223
}
224224
for i, s := range v.Histograms {
225225
buffer := o.buffers[v.HistogramIDs[i]]
226-
if buffer.Len() > 0 && v.T < buffer.MaxT() {
226+
if !ringbuffer.Empty(buffer) && v.T < buffer.MaxT() {
227227
continue
228228
}
229229
// Set any "NotCounterReset" and "CounterReset" hints in native

ringbuffer/generic.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ import (
1212
"github.com/prometheus/prometheus/model/histogram"
1313
)
1414

15+
type Buffer interface {
16+
MaxT() int64
17+
Push(t int64, v Value)
18+
Reset(mint int64, evalt int64)
19+
Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error)
20+
SampleCount() int
21+
}
22+
23+
func Empty(b Buffer) bool { return b.MaxT() != math.MinInt64 }
24+
1525
type Value struct {
1626
F float64
1727
H *histogram.FloatHistogram
@@ -49,8 +59,6 @@ func NewWithExtLookback(ctx context.Context, size int, selectRange, offset, extL
4959
}
5060
}
5161

52-
func (r *GenericRingBuffer) Len() int { return len(r.items) }
53-
5462
func (r *GenericRingBuffer) SampleCount() int {
5563
c := 0
5664
for _, s := range r.items {
@@ -72,11 +80,6 @@ func (r *GenericRingBuffer) MaxT() int64 {
7280
return r.items[len(r.items)-1].T
7381
}
7482

75-
// ReadIntoLast reads a sample into the last slot in the buffer, replacing the existing sample.
76-
func (r *GenericRingBuffer) ReadIntoLast(f func(*Sample)) {
77-
f(&r.items[len(r.items)-1])
78-
}
79-
8083
// Push adds a new sample to the buffer.
8184
func (r *GenericRingBuffer) Push(t int64, v Value) {
8285
n := len(r.items)

ringbuffer/rate.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,6 @@ import (
1414
"github.com/prometheus/prometheus/model/histogram"
1515
)
1616

17-
type Buffer interface {
18-
Len() int
19-
MaxT() int64
20-
Push(t int64, v Value)
21-
Reset(mint int64, evalt int64)
22-
Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error)
23-
ReadIntoLast(f func(*Sample))
24-
SampleCount() int
25-
}
26-
2717
// RateBuffer is a Buffer which can calculate rate, increase and delta for a
2818
// series in a streaming manner, calculating the value incrementally for each
2919
// step where the sample is used.
@@ -95,8 +85,6 @@ func NewRateBuffer(ctx context.Context, opts query.Options, isCounter, isRate bo
9585
}
9686
}
9787

98-
func (r *RateBuffer) Len() int { return r.stepRanges[0].numSamples }
99-
10088
func (r *RateBuffer) SampleCount() int {
10189
return r.stepRanges[0].sampleCount
10290
}
@@ -198,8 +186,6 @@ func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ *int64) (float64,
198186
return extrapolatedRate(ctx, r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset)
199187
}
200188

201-
func (r *RateBuffer) ReadIntoLast(func(*Sample)) {}
202-
203189
func querySteps(o query.Options) int64 {
204190
// Instant evaluation is executed as a range evaluation with one step.
205191
if o.Step.Milliseconds() == 0 {

storage/prometheus/matrix_selector.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
183183
for currStep := 0; currStep < o.numSteps && seriesTs <= o.maxt; currStep++ {
184184
maxt := seriesTs - o.offset
185185
mint := maxt - o.selectRange
186-
187-
if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.isExtFunction); err != nil {
186+
if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.extLookbackDelta, o.isExtFunction); err != nil {
188187
return nil, err
189188
}
190189
// TODO(saswatamcode): Handle multi-arg functions for matrixSelectors.
@@ -307,6 +306,7 @@ func (o *matrixSelector) String() string {
307306
func (m *matrixScanner) selectPoints(
308307
mint, maxt, evalt int64,
309308
fh *histogram.FloatHistogram,
309+
extLookbackDelta int64,
310310
isExtFunction bool,
311311
) error {
312312
m.buffer.Reset(mint, evalt)
@@ -324,7 +324,12 @@ func (m *matrixScanner) selectPoints(
324324
mint = maxInt64(mint, m.buffer.MaxT()+1)
325325
}
326326

327-
appendedPointBeforeMint := m.buffer.Len() > 0
327+
var (
328+
// The sample that we add for x-functions, -1 is a canary value for the situation
329+
// where we have no sample in the extended lookback delta
330+
extSample = ringbuffer.Sample{T: -1}
331+
)
332+
328333
for valType := m.iterator.Next(); valType != chunkenc.ValNone; valType = m.iterator.Next() {
329334
switch valType {
330335
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
@@ -361,19 +366,17 @@ func (m *matrixScanner) selectPoints(
361366
m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = t, v, nil
362367
return nil
363368
}
364-
if isExtFunction {
365-
if t > mint || !appendedPointBeforeMint {
366-
m.buffer.Push(t, ringbuffer.Value{F: v})
367-
appendedPointBeforeMint = true
368-
} else {
369-
m.buffer.ReadIntoLast(func(s *ringbuffer.Sample) {
370-
s.T, s.V.F, s.V.H = t, v, nil
371-
})
372-
}
373-
} else {
374-
if t > mint {
375-
m.buffer.Push(t, ringbuffer.Value{F: v})
369+
if t > mint {
370+
if extSample.T != -1 && isExtFunction {
371+
m.buffer.Push(extSample.T, ringbuffer.Value{F: extSample.V.F})
372+
extSample.T = -1
376373
}
374+
m.buffer.Push(t, ringbuffer.Value{F: v})
375+
continue
376+
}
377+
if isExtFunction && t > mint-extLookbackDelta {
378+
extSample.T = t
379+
extSample.V.F = v
377380
}
378381
}
379382
}

0 commit comments

Comments
 (0)