-
Notifications
You must be signed in to change notification settings - Fork 3.8k
feat(engine): basic math expressions #19407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…nto spiridonov-rate-aggregate
…nto spiridonov-rate-aggregate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would appreciate if you could split the PR into the binop expression implementation and the rate() implementation.
…nto spiridonov-rate-aggregate
@chaudum I removed |
var vecAggType types.VectorAggregationType | ||
switch e.Operation { | ||
//case syntax.OpTypeCount: | ||
// vecAggType = types.VectorAggregationTypeCount | ||
case syntax.OpTypeSum: | ||
vecAggType = types.VectorAggregationTypeSum | ||
//case syntax.OpTypeMax: | ||
// vecAggType = types.VectorAggregationTypeMax | ||
//case syntax.OpTypeMin: | ||
// vecAggType = types.VectorAggregationTypeMin | ||
default: | ||
return nil, errUnimplemented | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that you did not change anything here, but could we follow the pattern that you introduced and move this into a separate function convertVectorAggType(op string) types.VectorAggregationType
?
Same for the RangeAggregationType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
end: 7200, | ||
interval: 5 * time.Minute, | ||
} | ||
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`, func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: make name descriptive
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`, func(t *testing.T) { | |
t.Run("simple metric query", func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
t.Logf("\n%s\n", sb.String()) | ||
}) | ||
|
||
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"}[5m]) / 300)`, func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: make name descriptive
t.Run(`sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"}[5m]) / 300)`, func(t *testing.T) { | |
t.Run("binop metric query", func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
%7 = LT builtin.timestamp 1970-01-01T02:00:00Z | ||
%8 = SELECT %6 [predicate=%7] | ||
%9 = RANGE_AGGREGATION %8 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] | ||
%10 = DIV %9 300 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I wonder whether we should change the string representation of a literal to something like
%10 = DIV %9 300 | |
%10 = DIV %9 LITERAL(300) |
or
%10 = DIV %9 300 | |
%10 = DIV %9 INT64(300) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, overall. But I would make that change in pkg/engine/internal/types/literal.go
for each literal type, and that will cause a lot of diffs in all tests where literals are used (mostly in predicates). This is too much for this PR and kinda unrelated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's just a small thing regarding representation. Does not need to be addressed in this PR
var SupportedVectorAggregationTypes = []VectorAggregationType{ | ||
VectorAggregationTypeSum, VectorAggregationTypeMax, VectorAggregationTypeMin, VectorAggregationTypeCount, | ||
VectorAggregationTypeSum, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this anywhere used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not anymore. fixed. also removed SupportedRangeAggregationTypes because now the name is confusing with its only usage.
fields = append(fields, arrow.Field{ | ||
Name: fmt.Sprintf("float64.generated.input_%d", i), | ||
Type: types.Arrow.Float, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi: You can use the semconv package to generate the field definition, either by using semconv.FieldFromFQN()
or semconv.FieldFromIdent()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
) | ||
|
||
func NewMathExpressionPipeline(expr *physical.MathExpression, inputs []Pipeline, evaluator expressionEvaluator) *GenericPipeline { | ||
return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think your implementation of the pipeline is correct.
What this pipeline is doing is returning a new arrow.Record
with fields float64.generated.input_0
and timestamp_ns.builtin.timestamp
, instead of preserving the existing column names.
While this works for a specific subset of queries that have a vector aggregation without grouping, it does not work when there is a vector aggregation with grouping, e.g. sum by (cluster, namespace) (count_over_time({env="prod"} [1m])) / 100)
.


So instead of generating a new schema, you "only" need to apply the math function on the float64.generated.value
(semconv.ColumnIdentValue
) column.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pipeline takes its inputs (just 1 for now, but multiple in general) and makes input_0
, input_1
etc columns to evaluate the given math expression. The math expression is in the form of input_0 / 90 * 199 + input_1
regardless of what exactly those input pipelines are. After evaluation is done this pipeline returns two columns value
and ts
. This is a bug that I miss other columns (for groupings) in the output, and I will fix this. But it does not return input_0
in any case.
type genericFloat64Function[E arrayType[T], T comparable] struct { | ||
eval func(a, b T) (float64, error) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be expressed as a generic function
type genericFloat64Function[E arrayType[T], T comparable] struct { | |
eval func(a, b T) (float64, error) | |
} | |
type genericFunction[E arrayType[T], T comparable, R any] struct { | |
eval func(a, b T) (R, error) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have not found a clean way to do that in a generic way. There are some pretty specific type coercions based on the result type, plus pow
and mod
are implemented totally differently.
lhsArr, ok := lhs.ToArray().(E) | ||
if !ok { | ||
return nil, arrow.ErrType | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The array needs to be released
lhsArr, ok := lhs.ToArray().(E) | |
if !ok { | |
return nil, arrow.ErrType | |
} | |
lhsArr, ok := lhs.ToArray().(E) | |
if !ok { | |
return nil, arrow.ErrType | |
} | |
defer lhsArr.Release() |
Same for rhsArr
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I basically reverted #19496 and not it is not needed anymore.
defer valCol.Release() | ||
|
||
schema := batch.Schema() | ||
valueCol := semconv.NewIdentifier(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated, types.Loki.Float) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of valueCol
you can use semconv.ColumnIdentValue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
fields := make([]arrow.Field, 0, len(inputs)) | ||
for i := range inputs { | ||
fields = append(fields, semconv.FieldFromFQN(fmt.Sprintf("float64.generated.input_%d", i), false)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a bit confusing that for the fields
you iterate over the inputs, but for the cols
you do an explicit single append().
I think it would be clearer if you also only append a single field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
schema := arrow.NewSchema([]arrow.Field{ | ||
semconv.FieldFromFQN(colTs, false), | ||
semconv.FieldFromFQN(colVal, false), | ||
}, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you extend the test so it also has additional (grouping) columns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added two more labels into the test
…nto spiridonov-rate-aggregate
I'm having a hard time following the Also: it seems like MathExpression could be represented as a projection. Do we need MathExpression as a distinct concept? |
If an expression is, for example, I think there should be a separate concept for math expressions, especially when more complex cases are gonna be implemented (such as cc @rfratto |
Thanks, that makes sense if you're trying to prepare for supporting math over two vectors. I suspect computations over two vectors (in a way that's compatible with LogQL/PromQL) is going to be trickier than it seems. I believe, in relational algebra terms, they're expressed as a combination of an inner join and a projection. So, given a query like
A physical plan mapping literally to the algebra could be
This needs to explicitly be an inner join, since LogQL's metric queries require the sample to exist on both side of the expression (this matches PromQL's behaviour). ExampleFor example, the two inputs of OuterJoin
and
is joined into
and is projected to
I think it's probably okay if we wanted to have a node which combines the work of projection and inner joins, though I do think it's possible to represent these operations using projections, which we will have a separate node for, and separating them out may be easier to understand in the plans. All that said, I do wonder if math on two vectors is going to require a lot more thought. Would we be able to simplify the logic here if we descoped that from our consideration? |
What this PR does / why we need it:
In order to add support for
rate()
aggregation function, which will be implemented ascount_over_time/$interval
, I had to add support for math expressions. Binary expressions with only single input are supported for now. Things likesum_over_time/count_over_time
will be implemented later. There is an optimization to merge several math expression nodes into one in the physical plan.Minor:
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Diff for
pkg/engine/internal/planner/logical/planner.go
is ugly here, it is better to view the new file as the whole to understand it better. I basically split one large function into 3 pieces without changing much in that logic.Checklist
CONTRIBUTING.md
guide (required)feat
PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.docs/sources/setup/upgrade/_index.md
deprecated-config.yaml
anddeleted-config.yaml
files respectively in thetools/deprecated-config-checker
directory. Example PR