Skip to content

Commit 1a4cb60

Browse files
authored
Mimir query engine: add support for unary negation (#9196)
* Add failing tests * Add feature toggle * Add support for unary negation of instant vectors # Conflicts: # pkg/streamingpromql/query.go * Add support for unary negation of scalars * Add changelog entry * Address PR feedback: add further test cases
1 parent 291f010 commit 1a4cb60

File tree

12 files changed

+173
-2
lines changed

12 files changed

+173
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* [CHANGE] Querier: allow wrapping errors with context errors only when the former actually correspond to `context.Canceled` and `context.DeadlineExceeded`. #9175
3333
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
3434
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
35-
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9194
35+
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9194 #9196
3636
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
3737
* What it is:
3838
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.

cmd/mimir/config-descriptor.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,6 +2043,17 @@
20432043
"fieldFlag": "querier.mimir-query-engine.enable-scalars",
20442044
"fieldType": "boolean",
20452045
"fieldCategory": "experimental"
2046+
},
2047+
{
2048+
"kind": "field",
2049+
"name": "enable_unary_negation",
2050+
"required": false,
2051+
"desc": "Enable support for unary negation in Mimir's query engine. Only applies if the Mimir query engine is in use.",
2052+
"fieldValue": null,
2053+
"fieldDefaultValue": true,
2054+
"fieldFlag": "querier.mimir-query-engine.enable-unary-negation",
2055+
"fieldType": "boolean",
2056+
"fieldCategory": "experimental"
20462057
}
20472058
],
20482059
"fieldValue": null,

cmd/mimir/help-all.txt.tmpl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1915,6 +1915,8 @@ Usage of ./cmd/mimir/mimir:
19151915
[experimental] Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
19161916
-querier.mimir-query-engine.enable-scalars
19171917
[experimental] Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
1918+
-querier.mimir-query-engine.enable-unary-negation
1919+
[experimental] Enable support for unary negation in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
19181920
-querier.minimize-ingester-requests
19191921
If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true)
19201922
-querier.minimize-ingester-requests-hedging-delay duration

docs/sources/mimir/configure/configuration-parameters/index.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,6 +1537,11 @@ mimir_query_engine:
15371537
# applies if the Mimir query engine is in use.
15381538
# CLI flag: -querier.mimir-query-engine.enable-scalars
15391539
[enable_scalars: <boolean> | default = true]
1540+
1541+
# (experimental) Enable support for unary negation in Mimir's query engine.
1542+
# Only applies if the Mimir query engine is in use.
1543+
# CLI flag: -querier.mimir-query-engine.enable-unary-negation
1544+
[enable_unary_negation: <boolean> | default = true]
15401545
```
15411546

15421547
### frontend

pkg/streamingpromql/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type FeatureToggles struct {
2323
EnableOffsetModifier bool `yaml:"enable_offset_modifier" category:"experimental"`
2424
EnableOverTimeFunctions bool `yaml:"enable_over_time_functions" category:"experimental"`
2525
EnableScalars bool `yaml:"enable_scalars" category:"experimental"`
26+
EnableUnaryNegation bool `yaml:"enable_unary_negation" category:"experimental"`
2627
}
2728

2829
var overTimeFunctionNames = []string{
@@ -43,6 +44,7 @@ var EnableAllFeatures = FeatureToggles{
4344
true,
4445
true,
4546
true,
47+
true,
4648
}
4749

4850
func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
@@ -51,4 +53,5 @@ func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
5153
f.BoolVar(&t.EnableOffsetModifier, "querier.mimir-query-engine.enable-offset-modifier", true, "Enable support for offset modifier in Mimir's query engine. Only applies if the Mimir query engine is in use.")
5254
f.BoolVar(&t.EnableOverTimeFunctions, "querier.mimir-query-engine.enable-over-time-functions", true, "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.")
5355
f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.")
56+
f.BoolVar(&t.EnableUnaryNegation, "querier.mimir-query-engine.enable-unary-negation", true, "Enable support for unary negation in Mimir's query engine. Only applies if the Mimir query engine is in use.")
5457
}

pkg/streamingpromql/engine_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
5151
`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
5252
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr for range vectors",
5353
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
54-
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr for instant vectors",
5554
}
5655

5756
for expression, expectedError := range unsupportedExpressions {
@@ -118,6 +117,17 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {
118117
requireRangeQueryIsUnsupported(t, featureToggles, "2", "scalar values")
119118
requireInstantQueryIsUnsupported(t, featureToggles, "2", "scalar values")
120119
})
120+
121+
t.Run("unary negation", func(t *testing.T) {
122+
featureToggles := EnableAllFeatures
123+
featureToggles.EnableUnaryNegation = false
124+
125+
requireRangeQueryIsUnsupported(t, featureToggles, "-sum(metric{})", "unary negation of instant vectors")
126+
requireInstantQueryIsUnsupported(t, featureToggles, "-sum(metric{})", "unary negation of instant vectors")
127+
128+
requireRangeQueryIsUnsupported(t, featureToggles, "-(1)", "unary negation of scalars")
129+
requireInstantQueryIsUnsupported(t, featureToggles, "-(1)", "unary negation of scalars")
130+
})
121131
}
122132

123133
func requireRangeQueryIsUnsupported(t *testing.T, featureToggles FeatureToggles, expression string, expectedError string) {

pkg/streamingpromql/functions.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,14 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti
229229

230230
return operators.NewInstantVectorToScalar(inner, start, end, interval, memoryConsumptionTracker, expressionPosition), nil
231231
}
232+
233+
func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator {
234+
f := functions.FunctionOverInstantVector{
235+
SeriesDataFunc: functions.UnaryNegation,
236+
SeriesMetadataFunc: functions.DropSeriesName,
237+
NeedsSeriesDeduplication: true,
238+
}
239+
240+
o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)
241+
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
242+
}

pkg/streamingpromql/functions/math.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package functions
44

55
import (
66
"math"
7+
8+
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
9+
"github.com/grafana/mimir/pkg/streamingpromql/types"
710
)
811

912
var Abs = FloatTransformationDropHistogramsFunc(math.Abs)
@@ -48,3 +51,14 @@ var Sgn = FloatTransformationDropHistogramsFunc(func(f float64) float64 {
4851
// Otherwise, if the value is 0, we should return 0.
4952
return f
5053
})
54+
55+
var UnaryNegation InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
56+
for i := range seriesData.Floats {
57+
seriesData.Floats[i].F = -seriesData.Floats[i].F
58+
}
59+
60+
// Prometheus' engine currently leaves histograms as-is for unary negation, so we do the same.
61+
// See https://github.com/prometheus/prometheus/pull/14821 for more discussion of this.
62+
63+
return seriesData, nil
64+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
3+
package operators
4+
5+
import (
6+
"context"
7+
8+
"github.com/prometheus/prometheus/promql/parser/posrange"
9+
10+
"github.com/grafana/mimir/pkg/streamingpromql/types"
11+
)
12+
13+
type UnaryNegationOfScalar struct {
14+
Inner types.ScalarOperator
15+
expressionPosition posrange.PositionRange
16+
}
17+
18+
var _ types.ScalarOperator = &UnaryNegationOfScalar{}
19+
20+
func NewUnaryNegationOfScalar(inner types.ScalarOperator, expressionPosition posrange.PositionRange) *UnaryNegationOfScalar {
21+
return &UnaryNegationOfScalar{
22+
Inner: inner,
23+
expressionPosition: expressionPosition,
24+
}
25+
}
26+
27+
func (u *UnaryNegationOfScalar) GetValues(ctx context.Context) (types.ScalarData, error) {
28+
values, err := u.Inner.GetValues(ctx)
29+
if err != nil {
30+
return types.ScalarData{}, err
31+
}
32+
33+
for i := range values.Samples {
34+
values.Samples[i].F = -values.Samples[i].F
35+
}
36+
37+
return values, nil
38+
}
39+
40+
func (u *UnaryNegationOfScalar) ExpressionPosition() posrange.PositionRange {
41+
return u.expressionPosition
42+
}
43+
44+
func (u *UnaryNegationOfScalar) Close() {
45+
u.Inner.Close()
46+
}

pkg/streamingpromql/query.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,21 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV
244244
}
245245

246246
return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, q.memoryConsumptionTracker, q.annotations, e.PositionRange())
247+
case *parser.UnaryExpr:
248+
if e.Op != parser.SUB {
249+
return nil, compat.NewNotSupportedError(fmt.Sprintf("unary expression with '%s'", e.Op))
250+
}
251+
252+
if !q.engine.featureToggles.EnableUnaryNegation {
253+
return nil, compat.NewNotSupportedError("unary negation of instant vectors")
254+
}
255+
256+
inner, err := q.convertToInstantVectorOperator(e.Expr)
257+
if err != nil {
258+
return nil, err
259+
}
260+
261+
return unaryNegationOfInstantVectorOperatorFactory(inner, q.memoryConsumptionTracker, e.PositionRange()), nil
247262

248263
case *parser.StepInvariantExpr:
249264
// One day, we'll do something smarter here.
@@ -338,6 +353,23 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator,
338353

339354
case *parser.Call:
340355
return q.convertFunctionCallToScalarOperator(e)
356+
357+
case *parser.UnaryExpr:
358+
if e.Op != parser.SUB {
359+
return nil, compat.NewNotSupportedError(fmt.Sprintf("unary expression with '%s'", e.Op))
360+
}
361+
362+
if !q.engine.featureToggles.EnableUnaryNegation {
363+
return nil, compat.NewNotSupportedError("unary negation of scalars")
364+
}
365+
366+
inner, err := q.convertToScalarOperator(e.Expr)
367+
if err != nil {
368+
return nil, err
369+
}
370+
371+
return operators.NewUnaryNegationOfScalar(inner, e.PositionRange()), nil
372+
341373
case *parser.StepInvariantExpr:
342374
// One day, we'll do something smarter here.
343375
return q.convertToScalarOperator(e.Expr)

0 commit comments

Comments
 (0)