Skip to content

Commit dbcf9a8

Browse files
authored
Mimir query engine: add support for binary operations between instant vectors and scalars (#9140)
* Rename `BinaryOperation` to `VectorVectorBinaryOperation`
* Initial commit
* Enable newly supported upstream test cases
* Add benchmark
* Add changelog entry
* Address PR feedback: move comment to more sensible location, reduce some duplication
* Address PR feedback: don't reuse `opFunc` name
* Address PR feedback: add comment explaining why we call `GetValues` in `SeriesMetadata`
* Address PR feedback: add tests for other arithmetic operations
* Address PR feedback: add native histogram benchmark
1 parent e7706d5 commit dbcf9a8

14 files changed

+536
-224
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* [CHANGE] Querier: allow wrapping errors with context errors only when the former actually correspond to `context.Canceled` and `context.DeadlineExceeded`. #9175
3333
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
3434
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
35-
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9145 #9191 #9194
35+
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9194
3636
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
3737
* What it is:
3838
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.

pkg/streamingpromql/benchmarks/benchmarks.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,12 @@ func TestCases(metricSizes []int) []BenchCase {
148148
{
149149
Expr: "nh_X / a_X",
150150
},
151+
{
152+
Expr: "2 * a_X",
153+
},
154+
{
155+
Expr: "nh_X / 2",
156+
},
151157
// Test the case where one side of a binary operation has many more series than the other.
152158
{
153159
Expr: `a_100{l=~"[13579]."} - b_100`,

pkg/streamingpromql/engine_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
4242
// different cases and make sure we produce a reasonable error message when these cases are encountered.
4343
unsupportedExpressions := map[string]string{
4444
"1 + 2": "binary expression between two scalars",
45-
"1 + metric{}": "binary expression between scalar and instant vector",
46-
"metric{} + 1": "binary expression between scalar and instant vector",
4745
"metric{} < other_metric{}": "binary expression with '<'",
4846
"metric{} or other_metric{}": "binary expression with many-to-many matching",
4947
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",

pkg/streamingpromql/operators/operator_buffer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/grafana/mimir/pkg/streamingpromql/types"
1010
)
1111

12-
// InstantVectorOperatorBuffer buffers series data until it is needed by BinaryOperation.
12+
// InstantVectorOperatorBuffer buffers series data until it is needed by an operator.
1313
//
1414
// For example, if this buffer is being used for a binary operation and the source operator produces series in order A, B, C,
1515
// but their corresponding output series from the binary operation are in order B, A, C, InstantVectorOperatorBuffer
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
3+
package operators
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
"github.com/prometheus/prometheus/model/histogram"
10+
"github.com/prometheus/prometheus/promql"
11+
"github.com/prometheus/prometheus/promql/parser"
12+
"github.com/prometheus/prometheus/promql/parser/posrange"
13+
"github.com/prometheus/prometheus/util/annotations"
14+
15+
"github.com/grafana/mimir/pkg/streamingpromql/compat"
16+
"github.com/grafana/mimir/pkg/streamingpromql/functions"
17+
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
18+
"github.com/grafana/mimir/pkg/streamingpromql/types"
19+
)
20+
21+
// VectorScalarBinaryOperation represents a binary operation between an instant vector and a scalar such as "<expr> + 2" or "3 * <expr>".
type VectorScalarBinaryOperation struct {
	Scalar           types.ScalarOperator        // Produces the scalar operand's value at every step.
	Vector           types.InstantVectorOperator // Produces the instant vector operand's series.
	ScalarIsLeftSide bool                        // True when the expression is "<scalar> <op> <vector>"; fixes the operand order bound into opFunc.
	Op               parser.ItemType             // The binary operator being applied.

	MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

	start     int64 // Milliseconds since Unix epoch
	end       int64 // Milliseconds since Unix epoch
	interval  int64 // In milliseconds
	stepCount int   // Number of steps in the query range (computed via stepCount(start, end, interval)).

	// opFunc applies Op with the operands already in the correct order, so NextSeries doesn't
	// need to check ScalarIsLeftSide for every sample. Set in NewVectorScalarBinaryOperation.
	opFunc vectorScalarBinaryOperationFunc

	expressionPosition posrange.PositionRange
	emitAnnotation     functions.EmitAnnotationFunc          // Adds an annotation at this expression's position.
	scalarData         types.ScalarData                      // Scalar values for every step, retrieved once in SeriesMetadata.
	vectorIterator     types.InstantVectorSeriesDataIterator // Iterates over the current series' float and histogram points in timestamp order.
}
41+
42+
// vectorScalarBinaryOperationFunc applies a binary operation to a scalar value and a single vector
// sample (a float vectorF, or a histogram vectorH when non-nil). It returns the resulting float or
// histogram, whether a sample should be emitted at all, and any error encountered.
type vectorScalarBinaryOperationFunc func(scalar float64, vectorF float64, vectorH *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error)
43+
44+
func NewVectorScalarBinaryOperation(
45+
scalar types.ScalarOperator,
46+
vector types.InstantVectorOperator,
47+
scalarIsLeftSide bool,
48+
op parser.ItemType,
49+
start int64,
50+
end int64,
51+
interval int64,
52+
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
53+
annotations *annotations.Annotations,
54+
expressionPosition posrange.PositionRange,
55+
) (*VectorScalarBinaryOperation, error) {
56+
f := arithmeticOperationFuncs[op]
57+
if f == nil {
58+
return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
59+
}
60+
61+
b := &VectorScalarBinaryOperation{
62+
Scalar: scalar,
63+
Vector: vector,
64+
ScalarIsLeftSide: scalarIsLeftSide,
65+
Op: op,
66+
MemoryConsumptionTracker: memoryConsumptionTracker,
67+
68+
start: start,
69+
end: end,
70+
interval: interval,
71+
stepCount: stepCount(start, end, interval),
72+
73+
expressionPosition: expressionPosition,
74+
}
75+
76+
b.emitAnnotation = func(generator functions.AnnotationGenerator) {
77+
annotations.Add(generator("", expressionPosition))
78+
}
79+
80+
if b.ScalarIsLeftSide {
81+
b.opFunc = func(scalar float64, vectorF float64, vectorH *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
82+
return f(scalar, vectorF, nil, vectorH)
83+
}
84+
} else {
85+
b.opFunc = func(scalar float64, vectorF float64, vectorH *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
86+
return f(vectorF, scalar, vectorH, nil)
87+
}
88+
}
89+
90+
return b, nil
91+
}
92+
93+
func (v *VectorScalarBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
94+
// Get the scalar values once, now, rather than having to do this later in NextSeries.
95+
var err error
96+
v.scalarData, err = v.Scalar.GetValues(ctx)
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
metadata, err := v.Vector.SeriesMetadata(ctx)
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
// We don't need to do deduplication and merging of series in this operator: we expect that this operator
107+
// is wrapped in a DeduplicateAndMerge.
108+
metadata, err = functions.DropSeriesName(metadata, v.MemoryConsumptionTracker)
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
return metadata, nil
114+
}
115+
116+
// NextSeries computes the binary operation for the next series produced by the input vector and
// returns the resulting series data.
//
// Where safe, it reuses the input series' point slices for the output to avoid allocations; any
// input slice that is not reused is returned to the corresponding pool before returning.
func (v *VectorScalarBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
	series, err := v.Vector.NextSeries(ctx)
	if err != nil {
		return types.InstantVectorSeriesData{}, err
	}

	// Track whether each input slice still needs returning to its pool (ie. was not reused below).
	returnInputFPointSlice := true
	returnInputHPointSlice := true

	// We cannot re-use any slices when the series contains a mix of floats and histograms.
	// For example, imagine the series has [histogram, histogram, float, float, histogram] and we're performing the
	// operation "2 + series".
	// Histograms are treated as 0, so each input point produces a float.
	// If we reuse the input series' FPoint slice, we'll overwrite the later float points while processing the earlier
	// histogram points.
	// This shouldn't happen often, so we don't mind the cost of allocating a new slice in this case.
	// It should be pretty uncommon that metric contains both histograms and floats, so we will accept the cost of a new
	// slice.
	haveMixedFloatsAndHistograms := len(series.Histograms) > 0 && len(series.Floats) > 0
	pointCount := len(series.Histograms) + len(series.Floats)

	var fPoints []promql.FPoint
	var hPoints []promql.HPoint

	// prepareFPointSlice lazily sets up fPoints, reusing the input float slice when that is safe.
	prepareFPointSlice := func() error {
		if haveMixedFloatsAndHistograms || cap(series.Floats) < pointCount {
			// We have to get a new slice.
			var err error
			fPoints, err = types.FPointSlicePool.Get(pointCount, v.MemoryConsumptionTracker)
			return err
		}

		// We can reuse the existing slice.
		returnInputFPointSlice = false
		fPoints = series.Floats[:0]
		return nil
	}

	// prepareHPointSlice lazily sets up hPoints, reusing the input histogram slice when that is safe.
	prepareHPointSlice := func() error {
		if haveMixedFloatsAndHistograms || cap(series.Histograms) < pointCount {
			// We have to get a new slice.
			var err error
			hPoints, err = types.HPointSlicePool.Get(pointCount, v.MemoryConsumptionTracker)
			return err
		}

		// We can reuse the existing slice.
		returnInputHPointSlice = false
		hPoints = series.Histograms[:0]
		return nil
	}

	v.vectorIterator.Reset(series)

	for {
		t, vectorF, vectorH, ok := v.vectorIterator.Next()

		if !ok {
			// We are done.
			break
		}

		scalarIdx := (t - v.start) / v.interval // Scalars always have a value at every step, so we can just compute the index of the corresponding scalar value from the timestamp.
		scalarValue := v.scalarData.Samples[scalarIdx].F

		f, h, ok, err := v.opFunc(scalarValue, vectorF, vectorH)
		if err != nil {
			err = functions.NativeHistogramErrorToAnnotation(err, v.emitAnnotation)
			if err == nil {
				// Error was converted to an annotation, continue without emitting a sample here.
				ok = false
			} else {
				return types.InstantVectorSeriesData{}, err
			}
		}

		if ok {
			if h != nil {
				if hPoints == nil {
					// First histogram for this series, get a slice for it.
					if err := prepareHPointSlice(); err != nil {
						return types.InstantVectorSeriesData{}, err
					}
				}

				hPoints = append(hPoints, promql.HPoint{T: t, H: h})
			} else {
				// We have a float value.
				if fPoints == nil {
					// First float for this series, get a slice for it.
					if err := prepareFPointSlice(); err != nil {
						return types.InstantVectorSeriesData{}, err
					}
				}

				fPoints = append(fPoints, promql.FPoint{T: t, F: f})
			}
		}
	}

	// Return any input slices that were not reused for the output to the pools.
	if returnInputFPointSlice {
		types.FPointSlicePool.Put(series.Floats, v.MemoryConsumptionTracker)
	}

	if returnInputHPointSlice {
		types.HPointSlicePool.Put(series.Histograms, v.MemoryConsumptionTracker)
	}

	return types.InstantVectorSeriesData{
		Floats:     fPoints,
		Histograms: hPoints,
	}, nil
}
229+
230+
// ExpressionPosition returns the position of this operation in the original PromQL expression.
func (v *VectorScalarBinaryOperation) ExpressionPosition() posrange.PositionRange {
	return v.expressionPosition
}
233+
234+
// Close releases the resources held by this operation: it closes both operands and returns the
// buffered scalar samples (populated in SeriesMetadata, if it was called) to the pool.
func (v *VectorScalarBinaryOperation) Close() {
	v.Scalar.Close()
	v.Vector.Close()
	types.FPointSlicePool.Put(v.scalarData.Samples, v.MemoryConsumptionTracker)
}
239+
240+
var _ types.InstantVectorOperator = &VectorScalarBinaryOperation{}

0 commit comments

Comments
 (0)