diff --git a/pkg/engine/internal/executor/executor.go b/pkg/engine/internal/executor/executor.go index 6d2ee591c1ffc..a62c89e64bd32 100644 --- a/pkg/engine/internal/executor/executor.go +++ b/pkg/engine/internal/executor/executor.go @@ -91,6 +91,8 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline { return tracePipeline("physical.VectorAggregation", c.executeVectorAggregation(ctx, n, inputs)) case *physical.ParseNode: return tracePipeline("physical.ParseNode", c.executeParse(ctx, n, inputs)) + case *physical.MathExpression: + return tracePipeline("physical.MathExpression", c.executeMathExpression(ctx, n, inputs)) case *physical.ColumnCompat: return tracePipeline("physical.ColumnCompat", c.executeColumnCompat(ctx, n, inputs)) case *physical.Parallelize: @@ -376,6 +378,18 @@ func (c *Context) executeParse(ctx context.Context, parse *physical.ParseNode, i return NewParsePipeline(parse, inputs[0], allocator) } +func (c *Context) executeMathExpression(ctx context.Context, expr *physical.MathExpression, inputs []Pipeline) Pipeline { + if len(inputs) == 0 { + return emptyPipeline() + } + + if len(inputs) > 1 { + return errorPipeline(ctx, fmt.Errorf("math expression expects exactly one input, got %d", len(inputs))) + } + + return NewMathExpressionPipeline(expr, inputs[0], c.evaluator) +} + func (c *Context) executeColumnCompat(ctx context.Context, compat *physical.ColumnCompat, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() diff --git a/pkg/engine/internal/executor/expressions.go b/pkg/engine/internal/executor/expressions.go index 1bf91df56bb67..0ec31d61c799f 100644 --- a/pkg/engine/internal/executor/expressions.go +++ b/pkg/engine/internal/executor/expressions.go @@ -259,7 +259,7 @@ var _ ColumnVector = (*Array)(nil) // ToArray implements ColumnVector. func (a *Array) ToArray() arrow.Array { - a.array.Retain() + // this should already has 1 ref counter return a.array } diff --git a/pkg/engine/internal/executor/expressions_test.go b/pkg/engine/internal/executor/expressions_test.go index 5b054472db9e1..65079c9a3acfc 100644 --- a/pkg/engine/internal/executor/expressions_test.go +++ b/pkg/engine/internal/executor/expressions_test.go @@ -216,7 +216,6 @@ func TestEvaluateBinaryExpression(t *testing.T) { func collectBooleanColumnVector(vec ColumnVector) []bool { res := make([]bool, 0, vec.Len()) arr := vec.ToArray().(*array.Boolean) - defer arr.Release() for i := range int(vec.Len()) { res = append(res, arr.Value(i)) } @@ -327,10 +326,8 @@ null,null,null` colVec, err := e.eval(colExpr, record) require.NoError(t, err) require.IsType(t, &CoalesceVector{}, colVec) - defer colVec.Release() arr := colVec.ToArray() - defer arr.Release() require.IsType(t, &array.String{}, arr) stringArr := arr.(*array.String) diff --git a/pkg/engine/internal/executor/filter.go b/pkg/engine/internal/executor/filter.go index 01a6e309a14e3..e8daffa373d6c 100644 --- a/pkg/engine/internal/executor/filter.go +++ b/pkg/engine/internal/executor/filter.go @@ -28,8 +28,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres if err != nil { return nil, err } - defer vec.Release() - arr := vec.ToArray() // boolean filters are only used for filtering; they're not returned // and must be released diff --git a/pkg/engine/internal/executor/functions.go b/pkg/engine/internal/executor/functions.go index 6566d6a1398fa..2a9d9ffcad278 100644 --- a/pkg/engine/internal/executor/functions.go +++ b/pkg/engine/internal/executor/functions.go @@ -1,6 +1,7 @@ package executor import ( + "math" "regexp" "strings" @@ -18,52 +19,71 @@ var ( ) func init() { + // Functions for [types.BinaryOpDiv] + binaryFunctions.register(types.BinaryOpDiv, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a / b, nil }}) + binaryFunctions.register(types.BinaryOpDiv, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) / float64(b), nil }}) + // Functions for [types.BinaryOpAdd] + binaryFunctions.register(types.BinaryOpAdd, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a + b, nil }}) + binaryFunctions.register(types.BinaryOpAdd, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) + float64(b), nil }}) + // Functions for [types.BinaryOpSub] + binaryFunctions.register(types.BinaryOpSub, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a - b, nil }}) + binaryFunctions.register(types.BinaryOpSub, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) - float64(b), nil }}) + // Functions for [types.BinaryOpMul] + binaryFunctions.register(types.BinaryOpMul, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a * b, nil }}) + binaryFunctions.register(types.BinaryOpMul, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) * float64(b), nil }}) + // Functions for [types.BinaryOpMod] + binaryFunctions.register(types.BinaryOpMod, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return math.Mod(a, b), nil }}) + binaryFunctions.register(types.BinaryOpMod, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a % b), nil }}) + // Functions for [types.BinaryOpPow] + binaryFunctions.register(types.BinaryOpPow, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return math.Pow(a, b), nil }}) + binaryFunctions.register(types.BinaryOpPow, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return math.Pow(float64(a), float64(b)), nil }}) + // Functions for [types.BinaryOpEq] - binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Boolean, &genericFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a == b, nil }}) - binaryFunctions.register(types.BinaryOpEq, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a == b, nil }}) - binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Int64, &genericFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a == b, nil }}) - binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Timestamp_ns, &genericFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a == b, nil }}) - binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Float64, &genericFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a == b, nil }}) + binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a == b, nil }}) + binaryFunctions.register(types.BinaryOpEq, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a == b, nil }}) + binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a == b, nil }}) + binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a == b, nil }}) + binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a == b, nil }}) // Functions for [types.BinaryOpNeq] - binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Boolean, &genericFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a != b, nil }}) - binaryFunctions.register(types.BinaryOpNeq, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a != b, nil }}) - binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Int64, &genericFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a != b, nil }}) - binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Timestamp_ns, &genericFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a != b, nil }}) - binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Float64, &genericFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a != b, nil }}) + binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a != b, nil }}) + binaryFunctions.register(types.BinaryOpNeq, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a != b, nil }}) + binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a != b, nil }}) + binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a != b, nil }}) + binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a != b, nil }}) // Functions for [types.BinaryOpGt] - binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Boolean, &genericFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) > boolToInt(b), nil }}) - binaryFunctions.register(types.BinaryOpGt, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a > b, nil }}) - binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Int64, &genericFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a > b, nil }}) - binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Timestamp_ns, &genericFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a > b, nil }}) - binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Float64, &genericFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a > b, nil }}) + binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) > boolToInt(b), nil }}) + binaryFunctions.register(types.BinaryOpGt, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a > b, nil }}) + binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a > b, nil }}) + binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a > b, nil }}) + binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a > b, nil }}) // Functions for [types.BinaryOpGte] - binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Boolean, &genericFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) >= boolToInt(b), nil }}) - binaryFunctions.register(types.BinaryOpGte, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a >= b, nil }}) - binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Int64, &genericFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a >= b, nil }}) - binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Timestamp_ns, &genericFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a >= b, nil }}) - binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Float64, &genericFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a >= b, nil }}) + binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) >= boolToInt(b), nil }}) + binaryFunctions.register(types.BinaryOpGte, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a >= b, nil }}) + binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a >= b, nil }}) + binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a >= b, nil }}) + binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a >= b, nil }}) // Functions for [types.BinaryOpLt] - binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Boolean, &genericFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) < boolToInt(b), nil }}) - binaryFunctions.register(types.BinaryOpLt, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a < b, nil }}) - binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Int64, &genericFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a < b, nil }}) - binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Timestamp_ns, &genericFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a < b, nil }}) - binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Float64, &genericFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a < b, nil }}) + binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) < boolToInt(b), nil }}) + binaryFunctions.register(types.BinaryOpLt, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a < b, nil }}) + binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a < b, nil }}) + binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a < b, nil }}) + binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a < b, nil }}) // Functions for [types.BinaryOpLte] - binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Boolean, &genericFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) <= boolToInt(b), nil }}) - binaryFunctions.register(types.BinaryOpLte, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a <= b, nil }}) - binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Int64, &genericFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a <= b, nil }}) - binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Timestamp_ns, &genericFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a <= b, nil }}) - binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Float64, &genericFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a <= b, nil }}) + binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) <= boolToInt(b), nil }}) + binaryFunctions.register(types.BinaryOpLte, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a <= b, nil }}) + binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a <= b, nil }}) + binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a <= b, nil }}) + binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a <= b, nil }}) // Functions for [types.BinaryOpMatchSubstr] - binaryFunctions.register(types.BinaryOpMatchSubstr, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return strings.Contains(a, b), nil }}) + binaryFunctions.register(types.BinaryOpMatchSubstr, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return strings.Contains(a, b), nil }}) // Functions for [types.BinaryOpNotMatchSubstr] - binaryFunctions.register(types.BinaryOpNotMatchSubstr, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return !strings.Contains(a, b), nil }}) + binaryFunctions.register(types.BinaryOpNotMatchSubstr, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return !strings.Contains(a, b), nil }}) // Functions for [types.BinaryOpMatchRe] // TODO(chaudum): Performance of regex evaluation can be improved if RHS is a Scalar, // because the regexp would only need to compiled once for the given scalar value. // TODO(chaudum): Performance of regex evaluation can be improved by simplifying the regex, // see pkg/logql/log/filter.go:645 - binaryFunctions.register(types.BinaryOpMatchRe, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { + binaryFunctions.register(types.BinaryOpMatchRe, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { reg, err := regexp.Compile(b) if err != nil { return false, err @@ -71,7 +91,7 @@ func init() { return reg.Match([]byte(a)), nil }}) // Functions for [types.BinaryOpNotMatchRe] - binaryFunctions.register(types.BinaryOpNotMatchRe, arrow.BinaryTypes.String, &genericFunction[*array.String, string]{eval: func(a, b string) (bool, error) { + binaryFunctions.register(types.BinaryOpNotMatchRe, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { reg, err := regexp.Compile(b) if err != nil { return false, err @@ -161,17 +181,16 @@ type arrayType[T comparable] interface { IsNull(int) bool Value(int) T Len() int - Release() } -// genericFunction is a struct that implements the [BinaryFunction] interface methods -// and can be used for any array type with compareable elements. -type genericFunction[E arrayType[T], T comparable] struct { +// genericBoolFunction is a struct that implements the [BinaryFunction] interface methods +// and can be used for any array type with comparable elements. +type genericBoolFunction[E arrayType[T], T comparable] struct { eval func(a, b T) (bool, error) } // Evaluate implements BinaryFunction. -func (f *genericFunction[E, T]) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) { +func (f *genericBoolFunction[E, T]) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) { if lhs.Len() != rhs.Len() { return nil, arrow.ErrIndex } @@ -180,13 +199,11 @@ func (f *genericFunction[E, T]) Evaluate(lhs ColumnVector, rhs ColumnVector) (Co if !ok { return nil, arrow.ErrType } - defer lhsArr.Release() rhsArr, ok := rhs.ToArray().(E) if !ok { return nil, arrow.ErrType } - defer rhsArr.Release() mem := memory.NewGoAllocator() builder := array.NewBooleanBuilder(mem) @@ -204,7 +221,48 @@ func (f *genericFunction[E, T]) Evaluate(lhs ColumnVector, rhs ColumnVector) (Co builder.Append(res) } - return &Array{array: builder.NewArray()}, nil + return &Array{array: builder.NewArray(), dt: types.Loki.Bool, ct: types.ColumnTypeGenerated}, nil +} + +// genericFloat64Function is a struct that implements the [BinaryFunction] interface methods +// and can be used for any array type with numeric elements. +type genericFloat64Function[E arrayType[T], T comparable] struct { + eval func(a, b T) (float64, error) +} + +// Evaluate implements BinaryFunction. +func (f *genericFloat64Function[E, T]) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) { + if lhs.Len() != rhs.Len() { + return nil, arrow.ErrIndex + } + + lhsArr, ok := lhs.ToArray().(E) + if !ok { + return nil, arrow.ErrType + } + + rhsArr, ok := rhs.ToArray().(E) + if !ok { + return nil, arrow.ErrType + } + + mem := memory.NewGoAllocator() + builder := array.NewFloat64Builder(mem) + defer builder.Release() + + for i := range lhsArr.Len() { + if lhsArr.IsNull(i) || rhsArr.IsNull(i) { + builder.AppendNull() + continue + } + res, err := f.eval(lhsArr.Value(i), rhsArr.Value(i)) + if err != nil { + return nil, err + } + builder.Append(res) + } + + return &Array{array: builder.NewArray(), dt: types.Loki.Float, ct: types.ColumnTypeGenerated}, nil } // Compiler optimized version of converting boolean b into an integer of value 0 or 1 diff --git a/pkg/engine/internal/executor/functions_test.go b/pkg/engine/internal/executor/functions_test.go index 7003b5d013fc4..07d40cfb56f91 100644 --- a/pkg/engine/internal/executor/functions_test.go +++ b/pkg/engine/internal/executor/functions_test.go @@ -120,7 +120,6 @@ func createFloat64Array(mem memory.Allocator, values []float64, nulls []bool) *A // Helper function to extract boolean values from result func extractBoolValues(result ColumnVector) []bool { arr := result.ToArray().(*array.Boolean) - defer arr.Release() values := make([]bool, arr.Len()) for i := 0; i < arr.Len(); i++ { @@ -182,10 +181,34 @@ func TestBinaryFunctionRegistry_GetForSignature(t *testing.T) { dataType: arrow.BinaryTypes.String, expectError: false, }, + { + name: "valid div operation", + op: types.BinaryOpDiv, + dataType: arrow.PrimitiveTypes.Float64, + expectError: false, + }, + { + name: "valid add operation", + op: types.BinaryOpAdd, + dataType: arrow.PrimitiveTypes.Float64, + expectError: false, + }, + { + name: "valid Mul operation", + op: types.BinaryOpMul, + dataType: arrow.PrimitiveTypes.Float64, + expectError: false, + }, + { + name: "valid sub operation", + op: types.BinaryOpSub, + dataType: arrow.PrimitiveTypes.Float64, + expectError: false, + }, { name: "invalid operation", - op: types.BinaryOpAdd, // Not registered - dataType: arrow.PrimitiveTypes.Int64, + op: types.BinaryOpAnd, // Not registered + dataType: arrow.FixedWidthTypes.Boolean, expectError: true, }, { diff --git a/pkg/engine/internal/executor/math_expression.go b/pkg/engine/internal/executor/math_expression.go new file mode 100644 index 0000000000000..fdbf6b51bddf3 --- /dev/null +++ b/pkg/engine/internal/executor/math_expression.go @@ -0,0 +1,81 @@ +package executor + +import ( + "context" + "fmt" + "slices" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + + "github.com/grafana/loki/v3/pkg/engine/internal/types" + + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" +) + +func NewMathExpressionPipeline(expr *physical.MathExpression, input Pipeline, evaluator expressionEvaluator) *GenericPipeline { + return newGenericPipeline(func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) { + input := inputs[0] + batch, err := input.Read(ctx) + if err != nil { + return nil, err + } + defer batch.Release() + + res, err := evaluator.eval(expr.Expression, batch) + if err != nil { + return nil, err + } + data := res.ToArray() + if data.DataType().ID() != arrow.FLOAT64 { + return nil, fmt.Errorf("expression returned non-float64 type %s", data.DataType()) + } + valCol := data.(*array.Float64) + defer valCol.Release() + + // Build output by moving all columns from the input except columns that were used for expression evaluation + // and add the result `value` column. In case of simple math expressions this will replace `value` with `value`. + // In case of math expressions that go after Joins this will replace `value_left` and `value_right` with `value`. + allColumnRefs := getAllColumnRefs(expr.Expression) + schema := batch.Schema() + outputFields := make([]arrow.Field, 0, schema.NumFields()+1) + outputCols := make([]arrow.Array, 0, schema.NumFields()+1) + for i := 0; i < schema.NumFields(); i++ { + if !slices.ContainsFunc(allColumnRefs, func(ref *types.ColumnRef) bool { + ident, err := semconv.ParseFQN(schema.Field(i).Name) + if err != nil { + return false + } + return ref.Column == ident.ShortName() + }) { + outputFields = append(outputFields, schema.Field(i)) + outputCols = append(outputCols, batch.Column(i)) + } + } + + // Add `values` column + outputFields = append(outputFields, semconv.FieldFromIdent(semconv.ColumnIdentValue, false)) + outputCols = append(outputCols, valCol) + + outputSchema := arrow.NewSchema(outputFields, nil) + evaluatedRecord := array.NewRecord(outputSchema, outputCols, batch.NumRows()) + + return evaluatedRecord, nil + }, input) +} + +// getAllColumnRefs finds all column ref from the expression. Should 1 or 2, but might be deep in the expression tree. +func getAllColumnRefs(expr physical.Expression) []*types.ColumnRef { + switch expr := expr.(type) { + case *physical.BinaryExpr: + return append(getAllColumnRefs(expr.Left), getAllColumnRefs(expr.Right)...) + case *physical.UnaryExpr: + return getAllColumnRefs(expr.Left) + case *physical.ColumnExpr: + return []*types.ColumnRef{&expr.Ref} + } + + return nil +} diff --git a/pkg/engine/internal/executor/math_expression_test.go b/pkg/engine/internal/executor/math_expression_test.go new file mode 100644 index 0000000000000..ea955b6710c52 --- /dev/null +++ b/pkg/engine/internal/executor/math_expression_test.go @@ -0,0 +1,218 @@ +package executor + +import ( + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + + "github.com/grafana/loki/v3/pkg/util/arrowtest" + + "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" + "github.com/grafana/loki/v3/pkg/engine/internal/types" +) + +func TestNewMathExpressionPipeline(t *testing.T) { + t.Run("calculates a simple expression with 1 input", func(t *testing.T) { + colTs := "timestamp_ns.builtin.timestamp" + colVal := "float64.generated.value" + colEnv := "utf8.label.env" + colSvc := "utf8.label.service" + + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + + schema := arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN(colTs, false), + semconv.FieldFromFQN(colVal, false), + semconv.FieldFromFQN(colEnv, false), + semconv.FieldFromFQN(colSvc, false), + }, nil) + + rowsPipeline1 := []arrowtest.Rows{ + { + {colTs: time.Unix(20, 0).UTC(), colVal: float64(230), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(15, 0).UTC(), colVal: float64(120), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(10, 0).UTC(), colVal: float64(260), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(12, 0).UTC(), colVal: float64(250), colEnv: "dev", colSvc: "distributor"}, + }, + } + input1 := NewArrowtestPipeline(alloc, schema, rowsPipeline1...) + + // value / 10 + mathExpr := &physical.MathExpression{ + Expression: &physical.BinaryExpr{ + Left: &physical.ColumnExpr{ + Ref: types.ColumnRef{ + Column: types.ColumnNameGeneratedValue, + Type: types.ColumnTypeGenerated, + }, + }, + Right: physical.NewLiteral(float64(10)), + Op: types.BinaryOpDiv, + }, + } + + pipeline := NewMathExpressionPipeline(mathExpr, input1, expressionEvaluator{}) + defer pipeline.Close() + + // Read the pipeline output + record, err := pipeline.Read(t.Context()) + require.NoError(t, err) + defer record.Release() + + expect := arrowtest.Rows{ + {colTs: time.Unix(20, 0).UTC(), colVal: float64(23), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(15, 0).UTC(), colVal: float64(12), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(10, 0).UTC(), colVal: float64(26), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(12, 0).UTC(), colVal: float64(25), colEnv: "dev", colSvc: "distributor"}, + } + + rows, err := arrowtest.RecordRows(record) + require.NoError(t, err, "should be able to convert record back to rows") + require.Equal(t, len(expect), len(rows), "number of rows should match") + require.ElementsMatch(t, expect, rows) + }) + + t.Run("calculates a complex expression with 1 input", func(t *testing.T) { + colTs := "timestamp_ns.builtin.timestamp" + colVal := "float64.generated.value" + colEnv := "utf8.label.env" + colSvc := "utf8.label.service" + + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + + schema := arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN(colTs, false), + semconv.FieldFromFQN(colVal, false), + semconv.FieldFromFQN(colEnv, false), + semconv.FieldFromFQN(colSvc, false), + }, nil) + + rowsPipeline1 := []arrowtest.Rows{ + { + {colTs: time.Unix(20, 0).UTC(), colVal: float64(230), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(15, 0).UTC(), colVal: float64(120), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(10, 0).UTC(), colVal: float64(260), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(12, 0).UTC(), colVal: float64(250), colEnv: "dev", colSvc: "distributor"}, + }, + } + input1 := NewArrowtestPipeline(alloc, schema, rowsPipeline1...) + + // value * 10 + 100 / 10 + mathExpr := &physical.MathExpression{ + Expression: &physical.BinaryExpr{ + Left: &physical.BinaryExpr{ + Left: &physical.ColumnExpr{ + Ref: types.ColumnRef{ + Column: types.ColumnNameGeneratedValue, + Type: types.ColumnTypeGenerated, + }, + }, + Right: physical.NewLiteral(float64(10)), + Op: types.BinaryOpMul, + }, + Right: &physical.BinaryExpr{ + Left: physical.NewLiteral(float64(100)), + Right: physical.NewLiteral(float64(10)), + Op: types.BinaryOpDiv, + }, + Op: types.BinaryOpAdd, + }, + } + + pipeline := NewMathExpressionPipeline(mathExpr, input1, expressionEvaluator{}) + defer pipeline.Close() + + // Read the pipeline output + record, err := pipeline.Read(t.Context()) + require.NoError(t, err) + defer record.Release() + + expect := arrowtest.Rows{ + {colTs: time.Unix(20, 0).UTC(), colVal: float64(2310), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(15, 0).UTC(), colVal: float64(1210), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(10, 0).UTC(), colVal: float64(2610), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(12, 0).UTC(), colVal: float64(2510), colEnv: "dev", colSvc: "distributor"}, + } + + rows, err := arrowtest.RecordRows(record) + require.NoError(t, err, "should be able to convert record back to rows") + require.Equal(t, len(expect), len(rows), "number of rows should match") + require.ElementsMatch(t, expect, rows) + }) + + t.Run("calculates a complex ex", func(t *testing.T) { + colTs := "timestamp_ns.builtin.timestamp" + colVal := "float64.generated.value" + colValLeft := "float64.generated.value_left" + colValRight := "float64.generated.value_right" + colEnv := "utf8.label.env" + colSvc := "utf8.label.service" + + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + + schema := arrow.NewSchema([]arrow.Field{ + semconv.FieldFromFQN(colTs, false), + semconv.FieldFromFQN(colValLeft, false), + semconv.FieldFromFQN(colValRight, false), + semconv.FieldFromFQN(colEnv, false), + semconv.FieldFromFQN(colSvc, false), + }, nil) + + rowsPipeline1 := []arrowtest.Rows{ + { + {colTs: time.Unix(20, 0).UTC(), colValLeft: float64(230), colValRight: float64(2), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(15, 0).UTC(), colValLeft: float64(120), colValRight: float64(10), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(10, 0).UTC(), colValLeft: float64(260), colValRight: float64(4), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(12, 0).UTC(), colValLeft: float64(250), colValRight: float64(20), colEnv: "dev", colSvc: "distributor"}, + }, + } + input1 := NewArrowtestPipeline(alloc, schema, rowsPipeline1...) + + // value_left / value_right + mathExpr := &physical.MathExpression{ + Expression: &physical.BinaryExpr{ + Left: &physical.ColumnExpr{ + Ref: types.ColumnRef{ + Column: "value_left", + Type: types.ColumnTypeGenerated, + }, + }, + Right: &physical.ColumnExpr{ + Ref: types.ColumnRef{ + Column: "value_right", + Type: types.ColumnTypeGenerated, + }, + }, + Op: types.BinaryOpDiv, + }, + } + + pipeline := NewMathExpressionPipeline(mathExpr, input1, expressionEvaluator{}) + defer pipeline.Close() + + // Read the pipeline output + record, err := pipeline.Read(t.Context()) + require.NoError(t, err) + defer record.Release() + + expect := arrowtest.Rows{ + {colTs: time.Unix(20, 0).UTC(), colVal: float64(115), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(15, 0).UTC(), colVal: float64(12), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(10, 0).UTC(), colVal: float64(65), colEnv: "prod", colSvc: "distributor"}, + {colTs: time.Unix(12, 0).UTC(), colVal: float64(12.5), colEnv: "dev", colSvc: "distributor"}, + } + + rows, err := arrowtest.RecordRows(record) + require.NoError(t, err, "should be able to convert record back to rows") + require.Equal(t, len(expect), len(rows), "number of rows should match") + require.ElementsMatch(t, expect, rows) + }) +} diff --git a/pkg/engine/internal/executor/range_aggregation.go b/pkg/engine/internal/executor/range_aggregation.go index 1aa5d05de0f20..a01ff88fa6bd3 100644 --- a/pkg/engine/internal/executor/range_aggregation.go +++ b/pkg/engine/internal/executor/range_aggregation.go @@ -163,7 +163,6 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro if err != nil { return nil, err } - defer vec.Release() if vec.Type() != types.Loki.String { return nil, fmt.Errorf("unsupported datatype for partitioning %s", vec.Type()) @@ -180,7 +179,6 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro if err != nil { return nil, err } - defer tsVec.Release() tsCol := tsVec.ToArray().(*array.Timestamp) defer tsCol.Release() diff --git a/pkg/engine/internal/executor/vector_aggregate.go b/pkg/engine/internal/executor/vector_aggregate.go index 818c94f2746d9..e2f0a1987d3f8 100644 --- a/pkg/engine/internal/executor/vector_aggregate.go +++ b/pkg/engine/internal/executor/vector_aggregate.go @@ -102,7 +102,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err if err != nil { return nil, err } - defer tsVec.Release() tsCol := tsVec.ToArray().(*array.Timestamp) defer tsCol.Release() @@ -111,7 +110,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err if err != nil { return nil, err } - defer valueVec.Release() valueArr := valueVec.ToArray().(*array.Float64) defer valueArr.Release() @@ -123,7 +121,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err if err != nil { return nil, err } - defer vec.Release() if vec.Type() != types.Loki.String { return nil, fmt.Errorf("unsupported datatype for grouping %s", vec.Type()) diff --git a/pkg/engine/internal/planner/logical/builder.go b/pkg/engine/internal/planner/logical/builder.go index 2e09c5101fa45..9bca6bae3c498 100644 --- a/pkg/engine/internal/planner/logical/builder.go +++ b/pkg/engine/internal/planner/logical/builder.go @@ -61,6 +61,28 @@ func (b *Builder) Sort(column ColumnRef, ascending, nullsFirst bool) *Builder { } } +// BinOpRight adds a binary arithmetic operation with a given right value +func (b *Builder) BinOpRight(op types.BinaryOp, right Value) *Builder { + return &Builder{ + val: &BinOp{ + Left: b.val, + Right: right, + Op: op, + }, + } +} + +// BinOpLeft adds a binary arithmetic operation with a given left value +func (b *Builder) BinOpLeft(op types.BinaryOp, left Value) *Builder { + return &Builder{ + val: &BinOp{ + Left: left, + Right: b.val, + Op: op, + }, + } +} + // RangeAggregation applies a [RangeAggregation] operation to the Builder. func (b *Builder) RangeAggregation( partitionBy []ColumnRef, diff --git a/pkg/engine/internal/planner/logical/planner.go b/pkg/engine/internal/planner/logical/planner.go index 36b2fbc818d3d..97e195a764e8e 100644 --- a/pkg/engine/internal/planner/logical/planner.go +++ b/pkg/engine/internal/planner/logical/planner.go @@ -21,15 +21,15 @@ var unimplementedFeature = func(s string) error { return fmt.Errorf("%w: %s", er // It may return an error as second argument in case the traversal of the AST of the query fails. func BuildPlan(params logql.Params) (*Plan, error) { var ( - builder *Builder - err error + value Value + err error ) switch e := params.GetExpression().(type) { case syntax.LogSelectorExpr: - builder, err = buildPlanForLogQuery(e, params, false, 0) + value, err = buildPlanForLogQuery(e, params, false, 0) case syntax.SampleExpr: - builder, err = buildPlanForSampleQuery(e, params) + value, err = buildPlanForSampleQuery(e, params) default: err = fmt.Errorf("unexpected expression type (%T)", e) } @@ -38,6 +38,8 @@ func BuildPlan(params logql.Params) (*Plan, error) { return nil, fmt.Errorf("failed to convert AST into logical plan: %w", err) } + builder := NewBuilder(value) + // TODO(chaudum): Make compatibility mode configurable builder = builder.Compat(true) @@ -52,7 +54,7 @@ func buildPlanForLogQuery( params logql.Params, isMetricQuery bool, rangeInterval time.Duration, -) (*Builder, error) { +) (Value, error) { var ( err error selector Value @@ -214,101 +216,161 @@ func buildPlanForLogQuery( builder = builder.Limit(0, limit) } - return builder, nil + return builder.Value(), nil } -func buildPlanForSampleQuery(e syntax.SampleExpr, params logql.Params) (*Builder, error) { - var ( - err error +func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) (Value, error) { + // offsets are not yet supported. + if e.Left.Offset != 0 { + return nil, errUnimplemented + } - rangeAggType types.RangeAggregationType - rangeInterval time.Duration + logSelectorExpr, err := e.Selector() + if err != nil { + return nil, err + } - vecAggType types.VectorAggregationType - groupBy []ColumnRef - ) + rangeInterval := e.Left.Interval - e.Walk(func(e syntax.Expr) bool { - switch e := e.(type) { - case *syntax.RangeAggregationExpr: - // offsets are not yet supported. - if e.Left.Offset != 0 { - err = errUnimplemented - return false - } + logQuery, err := buildPlanForLogQuery(logSelectorExpr, params, true, rangeInterval) + if err != nil { + return nil, err + } - switch e.Operation { - case syntax.OpRangeTypeCount: - rangeAggType = types.RangeAggregationTypeCount - case syntax.OpRangeTypeSum: - rangeAggType = types.RangeAggregationTypeSum - //case syntax.OpRangeTypeMax: - // rangeAggType = types.RangeAggregationTypeMax - //case syntax.OpRangeTypeMin: - // rangeAggType = types.RangeAggregationTypeMin - default: - err = errUnimplemented - return false - } + rangeAggType := convertRangeAggregationType(e.Operation) + if rangeAggType == types.RangeAggregationTypeInvalid { + return nil, errUnimplemented + } - rangeInterval = e.Left.Interval - return false // do not traverse log range query + rangeAggregation := &RangeAggregation{ + Table: logQuery, - case *syntax.VectorAggregationExpr: - // `without()` grouping is not supported. - if e.Grouping != nil && e.Grouping.Without { - err = errUnimplemented - return false - } + Operation: rangeAggType, + PartitionBy: nil, + Start: params.Start(), + End: params.End(), + Step: params.Step(), + RangeInterval: rangeInterval, + } - switch e.Operation { - //case syntax.OpTypeCount: - // vecAggType = types.VectorAggregationTypeCount - case syntax.OpTypeSum: - vecAggType = types.VectorAggregationTypeSum - //case syntax.OpTypeMax: - // vecAggType = types.VectorAggregationTypeMax - //case syntax.OpTypeMin: - // vecAggType = types.VectorAggregationTypeMin - default: - err = errUnimplemented - return false - } + return rangeAggregation, nil +} - groupBy = make([]ColumnRef, 0, len(e.Grouping.Groups)) - for _, group := range e.Grouping.Groups { - groupBy = append(groupBy, *NewColumnRef(group, types.ColumnTypeAmbiguous)) - } +func walkVectorAggregation(e *syntax.VectorAggregationExpr, params logql.Params) (Value, error) { + // `without()` grouping is not supported. + if e.Grouping != nil && e.Grouping.Without { + return nil, errUnimplemented + } - return true - default: - err = errUnimplemented - return false // do not traverse children - } - }) + left, err := walk(e.Left, params) if err != nil { return nil, err } - if rangeAggType == types.RangeAggregationTypeInvalid || vecAggType == types.VectorAggregationTypeInvalid { + vecAggType := convertVectorAggregationType(e.Operation) + if vecAggType == types.VectorAggregationTypeInvalid { return nil, errUnimplemented } - logSelectorExpr, err := e.Selector() + groupBy := make([]ColumnRef, 0, len(e.Grouping.Groups)) + for _, group := range e.Grouping.Groups { + groupBy = append(groupBy, *NewColumnRef(group, types.ColumnTypeAmbiguous)) + } + + return &VectorAggregation{ + Table: left, + GroupBy: groupBy, + Operation: vecAggType, + }, nil +} + +func hasNonMathExpressionChild(n Value) bool { + if _, ok := n.(*VectorAggregation); ok { + return true + } + + if _, ok := n.(*RangeAggregation); ok { + return true + } + + if b, ok := n.(*BinOp); ok { + return hasNonMathExpressionChild(b.Left) || hasNonMathExpressionChild(b.Right) + } + + return false +} + +func walkBinOp(e *syntax.BinOpExpr, params logql.Params) (Value, error) { + left, err := walk(e.SampleExpr, params) if err != nil { return nil, err } - - builder, err := buildPlanForLogQuery(logSelectorExpr, params, true, rangeInterval) + right, err := walk(e.RHS, params) if err != nil { return nil, err } - builder = builder.RangeAggregation( - nil, rangeAggType, params.Start(), params.End(), params.Step(), rangeInterval, - ).VectorAggregation(groupBy, vecAggType) + op := convertBinaryArithmeticOp(e.Op) + if op == types.BinaryOpInvalid { + return nil, errUnimplemented + } + + // this is to check that there is only one non-literal input on either side, otherwise it is not implemented yet. + // TODO remove when inner joins on timestamp are implemented + if hasNonMathExpressionChild(left) && hasNonMathExpressionChild(right) { + return nil, errUnimplemented + } + + return &BinOp{ + Left: left, + Right: right, + Op: op, + }, nil +} + +func walkLiteral(e *syntax.LiteralExpr, _ logql.Params) (Value, error) { + return &Literal{ + NewLiteral(e.Val), + }, nil +} + +func walk(e syntax.Expr, params logql.Params) (Value, error) { + switch e := e.(type) { + case *syntax.RangeAggregationExpr: + return walkRangeAggregation(e, params) + case *syntax.VectorAggregationExpr: + return walkVectorAggregation(e, params) + case *syntax.BinOpExpr: + return walkBinOp(e, params) + case *syntax.LiteralExpr: + return walkLiteral(e, params) + } - return builder, nil + return nil, errUnimplemented +} + +func buildPlanForSampleQuery(e syntax.SampleExpr, params logql.Params) (Value, error) { + val, err := walk(e, params) + + // this is to check that there are both range and vector aggregations, otherwise it is not implemented yet. + // TODO remove when all permutations of vector aggregations and range aggregations are implemented. + hasRangeAgg := false + hasVecAgg := false + e.Walk(func(e syntax.Expr) bool { + switch e.(type) { + case *syntax.RangeAggregationExpr: + hasRangeAgg = true + return false + case *syntax.VectorAggregationExpr: + hasVecAgg = true + } + return true + }) + if !hasRangeAgg || !hasVecAgg { + return nil, errUnimplemented + } + + return val, err } func convertLabelMatchers(matchers []*labels.Matcher) Value { @@ -334,6 +396,36 @@ func convertLabelMatchers(matchers []*labels.Matcher) Value { return value } +func convertVectorAggregationType(op string) types.VectorAggregationType { + switch op { + case syntax.OpTypeSum: + return types.VectorAggregationTypeSum + //case syntax.OpTypeCount: + // return types.VectorAggregationTypeCount + //case syntax.OpTypeMax: + // return types.VectorAggregationTypeMax + //case syntax.OpTypeMin: + // return types.VectorAggregationTypeMin + default: + return types.VectorAggregationTypeInvalid + } +} + +func convertRangeAggregationType(op string) types.RangeAggregationType { + switch op { + case syntax.OpRangeTypeCount: + return types.RangeAggregationTypeCount + case syntax.OpRangeTypeSum: + return types.RangeAggregationTypeSum + //case syntax.OpRangeTypeMax: + // return types.RangeAggregationTypeMax + //case syntax.OpRangeTypeMin: + // return types.RangeAggregationTypeMin + default: + return types.RangeAggregationTypeInvalid + } +} + func convertMatcherType(t labels.MatchType) types.BinaryOp { switch t { case labels.MatchEqual: @@ -371,6 +463,25 @@ func convertLineFilter(filter syntax.LineFilter) Value { } } +func convertBinaryArithmeticOp(op string) types.BinaryOp { + switch op { + case syntax.OpTypeAdd: + return types.BinaryOpAdd + case syntax.OpTypeSub: + return types.BinaryOpSub + case syntax.OpTypeMul: + return types.BinaryOpMul + case syntax.OpTypeDiv: + return types.BinaryOpDiv + case syntax.OpTypeMod: + return types.BinaryOpMod + case syntax.OpTypePow: + return types.BinaryOpPow + default: + return types.BinaryOpInvalid + } +} + func convertLineMatchType(op log.LineMatchType) types.BinaryOp { switch op { case log.LineMatchEqual: diff --git a/pkg/engine/internal/planner/logical/planner_test.go b/pkg/engine/internal/planner/logical/planner_test.go index 5e3add833ae2a..289bc339de9bd 100644 --- a/pkg/engine/internal/planner/logical/planner_test.go +++ b/pkg/engine/internal/planner/logical/planner_test.go @@ -63,7 +63,7 @@ func (q *query) GetStoreChunks() *logproto.ChunkRefGroup { // Interval implements logql.Params. func (q *query) Interval() time.Duration { - panic("unimplemented") + return q.interval } // Shards implements logql.Params. @@ -123,18 +123,19 @@ RETURN %21 } func TestConvertAST_MetricQuery_Success(t *testing.T) { - q := &query{ - statement: `sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`, - start: 3600, - end: 7200, - interval: 5 * time.Minute, - } + t.Run("simple metric query", func(t *testing.T) { + q := &query{ + statement: `sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"} |= "metric.go"[5m]))`, + start: 3600, + end: 7200, + interval: 5 * time.Minute, + } - logicalPlan, err := BuildPlan(q) - require.NoError(t, err) - t.Logf("\n%s\n", logicalPlan.String()) + logicalPlan, err := BuildPlan(q) + require.NoError(t, err) + t.Logf("\n%s\n", logicalPlan.String()) - expected := `%1 = EQ label.cluster "prod" + expected := `%1 = EQ label.cluster "prod" %2 = MATCH_RE label.namespace "loki-.*" %3 = AND %1 %2 %4 = MATCH_STR builtin.message "metric.go" @@ -150,12 +151,48 @@ func TestConvertAST_MetricQuery_Success(t *testing.T) { RETURN %13 ` - require.Equal(t, expected, logicalPlan.String()) + require.Equal(t, expected, logicalPlan.String()) - var sb strings.Builder - PrintTree(&sb, logicalPlan.Value()) + var sb strings.Builder + PrintTree(&sb, logicalPlan.Value()) - t.Logf("\n%s\n", sb.String()) + t.Logf("\n%s\n", sb.String()) + }) + + t.Run(`metric query with one binary math operation`, func(t *testing.T) { + q := &query{ + statement: `sum by (level) (count_over_time({cluster="prod", namespace=~"loki-.*"}[5m]) / 300)`, + start: 3600, + end: 7200, + interval: 5 * time.Minute, + } + + logicalPlan, err := BuildPlan(q) + require.NoError(t, err) + t.Logf("\n%s\n", logicalPlan.String()) + + expected := `%1 = EQ label.cluster "prod" +%2 = MATCH_RE label.namespace "loki-.*" +%3 = AND %1 %2 +%4 = MAKETABLE [selector=%3, predicates=[], shard=0_of_1] +%5 = GTE builtin.timestamp 1970-01-01T00:55:00Z +%6 = SELECT %4 [predicate=%5] +%7 = LT builtin.timestamp 1970-01-01T02:00:00Z +%8 = SELECT %6 [predicate=%7] +%9 = RANGE_AGGREGATION %8 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%10 = DIV %9 300 +%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)] +%12 = LOGQL_COMPAT %11 +RETURN %12 +` + + require.Equal(t, expected, logicalPlan.String()) + + var sb strings.Builder + PrintTree(&sb, logicalPlan.Value()) + + t.Logf("\n%s\n", sb.String()) + }) } func TestCanExecuteQuery(t *testing.T) { @@ -223,6 +260,18 @@ func TestCanExecuteQuery(t *testing.T) { statement: `sum by (level) (count_over_time({env="prod"}[1m]))`, expected: true, }, + { + statement: `sum by (level) (count_over_time({env="prod"}[1m]) / 60)`, + expected: true, + }, + { + statement: `sum by (level) (count_over_time({env="prod"}[1m]) / 60 * 4)`, + expected: true, + }, + { + // two inputs are not supported + statement: `sum by (level) (count_over_time({env="prod"} |= "error" [1m]) / count_over_time({env="prod"}[1m]))`, + }, { statement: `sum without (level) (count_over_time({env="prod"}[1m]))`, }, @@ -235,7 +284,6 @@ func TestCanExecuteQuery(t *testing.T) { expected: true, }, { - // rate is not supported statement: `sum by (level) (rate({env="prod"}[1m]))`, }, { @@ -247,6 +295,7 @@ func TestCanExecuteQuery(t *testing.T) { statement: `sum by (level) (count_over_time({env="prod"}[1m] offset 5m))`, }, { + // unwrap is not supported statement: `sum by (level) (sum_over_time({env="prod"} | unwrap size [1m]))`, expected: true, }, diff --git a/pkg/engine/internal/planner/physical/join.go b/pkg/engine/internal/planner/physical/join.go new file mode 100644 index 0000000000000..489259ba51cc1 --- /dev/null +++ b/pkg/engine/internal/planner/physical/join.go @@ -0,0 +1,35 @@ +package physical + +import "fmt" + +// Join represents a join operation in the physical plan. +// For now it is only an inner join on `timestamp`. Will be expanded later. +type Join struct { + id string +} + +// ID implements the [Node] interface. +// Returns a string that uniquely identifies the node in the plan. +func (f *Join) ID() string { + if f.id == "" { + return fmt.Sprintf("%p", f) + } + return f.id +} + +// Clone returns a deep copy of the node (minus its ID). +func (f *Join) Clone() Node { + return &Join{} +} + +// Type implements the [Node] interface. +// Returns the type of the node. +func (*Join) Type() NodeType { + return NodeTypeJoin +} + +// Accept implements the [Node] interface. +// Dispatches itself to the provided [Visitor] v +func (f *Join) Accept(v Visitor) error { + return v.VisitJoin(f) +} diff --git a/pkg/engine/internal/planner/physical/math_expression.go b/pkg/engine/internal/planner/physical/math_expression.go new file mode 100644 index 0000000000000..46b147771891b --- /dev/null +++ b/pkg/engine/internal/planner/physical/math_expression.go @@ -0,0 +1,41 @@ +package physical + +import ( + "fmt" +) + +// MathExpression represents an arithmetic operation +type MathExpression struct { + id string + + // Expression is a math expression (a tree of BinOps or UnaryOps) with literals and a column reference as input. + Expression Expression +} + +// ID implements the [Node] interface. +// Returns a string that uniquely identifies the node in the plan. +func (m *MathExpression) ID() string { + if m.id == "" { + return fmt.Sprintf("%p", m) + } + return m.id +} + +// Type implements the [Node] interface. +// Returns the type of the node. +func (*MathExpression) Type() NodeType { + return NodeTypeMathExpression +} + +// Accept implements the [Node] interface. +// Dispatches itself to the provided [Visitor] v +func (m *MathExpression) Accept(v Visitor) error { + return v.VisitMathExpression(m) +} + +// Clone returns a deep copy of the node (minus its ID). +func (m *MathExpression) Clone() Node { + return &MathExpression{ + Expression: m.Expression.Clone(), + } +} diff --git a/pkg/engine/internal/planner/physical/optimizer.go b/pkg/engine/internal/planner/physical/optimizer.go index 6a9cedd9b06af..cb1aba840857d 100644 --- a/pkg/engine/internal/planner/physical/optimizer.go +++ b/pkg/engine/internal/planner/physical/optimizer.go @@ -136,6 +136,20 @@ type projectionPushdown struct { plan *Plan } +func (r *projectionPushdown) applyToRangeAggregations(node Node, groupBy []ColumnExpression, ops ...types.RangeAggregationType) bool { + anyChanged := false + for _, child := range r.plan.Children(node) { + if ra, ok := child.(*RangeAggregation); ok { + if slices.Contains(ops, ra.Operation) { + anyChanged = r.handleRangeAggregation(ra, groupBy) || anyChanged + } + } else { + anyChanged = r.applyToRangeAggregations(child, groupBy, ops...) || anyChanged + } + } + return anyChanged +} + // apply implements rule. func (r *projectionPushdown) apply(node Node) bool { switch node := node.(type) { @@ -150,33 +164,17 @@ func (r *projectionPushdown) apply(node Node) bool { // MAX -> MAX // MIN -> MIN - applyToRangeAggregations := func(ops ...types.RangeAggregationType) bool { - anyChanged := false - for _, child := range r.plan.Children(node) { - if ra, ok := child.(*RangeAggregation); ok { - if slices.Contains(ops, ra.Operation) { - anyChanged = r.handleRangeAggregation(ra, node.GroupBy) || anyChanged - } - } - } - return anyChanged - } - switch node.Operation { case types.VectorAggregationTypeSum: - return applyToRangeAggregations(types.RangeAggregationTypeSum, types.RangeAggregationTypeCount) + return r.applyToRangeAggregations(node, node.GroupBy, types.RangeAggregationTypeSum, types.RangeAggregationTypeCount) case types.VectorAggregationTypeMax: - return applyToRangeAggregations(types.RangeAggregationTypeMax) + return r.applyToRangeAggregations(node, node.GroupBy, types.RangeAggregationTypeMax) case types.VectorAggregationTypeMin: - return applyToRangeAggregations(types.RangeAggregationTypeMin) + return r.applyToRangeAggregations(node, node.GroupBy, types.RangeAggregationTypeMin) default: return false } case *RangeAggregation: - if !slices.Contains(types.SupportedRangeAggregationTypes, node.Operation) { - return false - } - projections := make([]ColumnExpression, len(node.PartitionBy)+1) copy(projections, node.PartitionBy) // Always project timestamp column even if partitionBy is empty. @@ -218,7 +216,7 @@ func (r *projectionPushdown) applyProjectionPushdown( return r.handleParseNode(node, projections, applyIfNotEmpty) case *RangeAggregation: return r.handleRangeAggregation(node, projections) - case *Parallelize, *Filter, *ColumnCompat: + case *Parallelize, *Filter, *ColumnCompat, *MathExpression: // Push to next direct child that cares about projections return r.pushToChildren(node, projections, applyIfNotEmpty) } @@ -488,6 +486,50 @@ func disambiguateColumns(columns []ColumnExpression) ([]ColumnExpression, []Colu return unambiguousColumns, ambiguousColumns } +// mathExpressionsMerge is a rule that merges adjacent math expressions nodes into one node with a complex expression +type mathExpressionsMerge struct { + plan *Plan +} + +// apply implements rule. +func (r *mathExpressionsMerge) apply(node Node) bool { + changed := false + children := r.plan.Children(node) + switch node := node.(type) { + case *MathExpression: + switch e := node.Expression.(type) { + case *BinaryExpr: + // If LHS is a column reference + if _, ok := e.Left.(*ColumnExpr); ok { + if c, ok := children[0].(*MathExpression); ok { + e.Left = c.Expression + r.plan.graph.Eliminate(c) + changed = true + } + } + // If RHS is a column reference + if _, ok := e.Right.(*ColumnExpr); ok { + if c, ok := children[len(children)-1].(*MathExpression); ok { + e.Right = c.Expression + r.plan.graph.Eliminate(c) + changed = true + } + } + + case *UnaryExpr: + // If LHS is a column reference + if _, ok := e.Left.(*ColumnExpr); ok { + if c, ok := children[0].(*MathExpression); ok { + e.Left = c.Expression + r.plan.graph.Eliminate(c) + changed = true + } + } + } + } + return changed +} + // optimization represents a single optimization pass and can hold multiple rules. type optimization struct { plan *Plan diff --git a/pkg/engine/internal/planner/physical/plan.go b/pkg/engine/internal/planner/physical/plan.go index 2c6e4993f69f4..26f97915e0c38 100644 --- a/pkg/engine/internal/planner/physical/plan.go +++ b/pkg/engine/internal/planner/physical/plan.go @@ -16,10 +16,12 @@ const ( NodeTypeVectorAggregation NodeTypeMerge NodeTypeParse + NodeTypeMathExpression NodeTypeCompat NodeTypeTopK NodeTypeParallelize NodeTypeScanSet + NodeTypeJoin ) func (t NodeType) String() string { @@ -42,6 +44,8 @@ func (t NodeType) String() string { return "VectorAggregation" case NodeTypeParse: return "Parse" + case NodeTypeMathExpression: + return "MathExpression" case NodeTypeCompat: return "Compat" case NodeTypeTopK: @@ -50,6 +54,8 @@ func (t NodeType) String() string { return "Parallelize" case NodeTypeScanSet: return "ScanSet" + case NodeTypeJoin: + return "Join" default: return "Undefined" } @@ -85,10 +91,12 @@ var _ Node = (*Filter)(nil) var _ Node = (*RangeAggregation)(nil) var _ Node = (*VectorAggregation)(nil) var _ Node = (*ParseNode)(nil) +var _ Node = (*MathExpression)(nil) var _ Node = (*ColumnCompat)(nil) var _ Node = (*TopK)(nil) var _ Node = (*Parallelize)(nil) var _ Node = (*ScanSet)(nil) +var _ Node = (*Join)(nil) func (*DataObjScan) isNode() {} func (*Projection) isNode() {} @@ -96,11 +104,13 @@ func (*Limit) isNode() {} func (*Filter) isNode() {} func (*RangeAggregation) isNode() {} func (*VectorAggregation) isNode() {} +func (*MathExpression) isNode() {} func (*ParseNode) isNode() {} func (*ColumnCompat) isNode() {} func (*TopK) isNode() {} func (*Parallelize) isNode() {} func (*ScanSet) isNode() {} +func (*Join) isNode() {} // WalkOrder defines the order for how a node and its children are visited. type WalkOrder uint8 diff --git a/pkg/engine/internal/planner/physical/planner.go b/pkg/engine/internal/planner/physical/planner.go index a0ed275eadae2..58df5e7c60530 100644 --- a/pkg/engine/internal/planner/physical/planner.go +++ b/pkg/engine/internal/planner/physical/planner.go @@ -155,6 +155,10 @@ func (p *Planner) process(inst logical.Value, ctx *Context) ([]Node, error) { return p.processVectorAggregation(inst, ctx) case *logical.Parse: return p.processParse(inst, ctx) + case *logical.BinOp: + return p.processBinOp(inst, ctx) + case *logical.UnaryOp: + return p.processUnaryOp(inst, ctx) case *logical.LogQLCompat: p.context.v1Compatible = true return p.process(inst.Value, ctx) @@ -382,6 +386,128 @@ func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation, ctx *C return []Node{node}, nil } +func (p *Planner) hasNonMathExpressionChild(n Node) bool { + if n == nil { + return false + } + + if _, ok := n.(*MathExpression); ok { + for _, c := range p.plan.Children(n) { + if p.hasNonMathExpressionChild(c) { + return true + } + } + return false + } else { + return true + } +} + +func (p *Planner) processBinOp(lp *logical.BinOp, ctx *Context) ([]Node, error) { + var left, right Expression + var leftChild, rightChild Node + + if l, ok := lp.Left.(*logical.Literal); ok { + left = &LiteralExpr{Literal: l.Literal} + } else { + left = newColumnExpr(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated) + + leftChildren, err := p.process(lp.Left, ctx) + if err != nil { + return nil, err + } + leftChild = leftChildren[0] + } + + if l, ok := lp.Right.(*logical.Literal); ok { + right = &LiteralExpr{l.Literal} + } else { + right = newColumnExpr(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated) + + rightChildren, err := p.process(lp.Right, ctx) + if err != nil { + return nil, err + } + rightChild = rightChildren[0] + } + + mathExprNode := &MathExpression{ + Expression: &BinaryExpr{ + Left: left, + Right: right, + Op: lp.Op, + }, + } + p.plan.graph.Add(mathExprNode) + + // If both left and right children have some data scans + if p.hasNonMathExpressionChild(leftChild) && p.hasNonMathExpressionChild(rightChild) { + // Rename column references to match 2 joined `value` columns + mathExprNode.Expression = &BinaryExpr{ + Left: newColumnExpr("value_left", types.ColumnTypeGenerated), + Right: newColumnExpr("value_right", types.ColumnTypeGenerated), + Op: lp.Op, + } + + // Insert InnerJoin node between this math expression and its two inputs. + joinNode := &Join{} + p.plan.graph.Add(joinNode) + if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: joinNode, Child: leftChild}); err != nil { + return nil, err + } + if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: joinNode, Child: rightChild}); err != nil { + return nil, err + } + if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: mathExprNode, Child: joinNode}); err != nil { + return nil, err + } + } else { + if leftChild != nil { + if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: mathExprNode, Child: leftChild}); err != nil { + return nil, err + } + } + if rightChild != nil { + if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: mathExprNode, Child: rightChild}); err != nil { + return nil, err + } + } + } + + return []Node{mathExprNode}, nil +} + +func (p *Planner) processUnaryOp(lp *logical.UnaryOp, ctx *Context) ([]Node, error) { + var left Expression + + if l, ok := lp.Value.(*logical.Literal); ok { + left = &LiteralExpr{Literal: l.Literal} + } else { + left = newColumnExpr(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated) + } + + node := &MathExpression{ + Expression: &UnaryExpr{ + Left: left, + Op: lp.Op, + }, + } + p.plan.graph.Add(node) + + // if lhs is not a literal, then process children nodes + if _, ok := lp.Value.(*logical.Literal); !ok { + leftChildren, err := p.process(lp.Value, ctx) + if err != nil { + return nil, err + } + if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: node, Child: leftChildren[0]}); err != nil { + return nil, err + } + } + + return []Node{node}, nil +} + // Convert [logical.Parse] into one [ParseNode] node. // A ParseNode initially has an empty list of RequestedKeys which will be populated during optimization. func (p *Planner) processParse(lp *logical.Parse, ctx *Context) ([]Node, error) { @@ -429,6 +555,9 @@ func (p *Planner) wrapNodeWith(node Node, wrapper Node) (Node, error) { func (p *Planner) Optimize(plan *Plan) (*Plan, error) { for i, root := range plan.Roots() { optimizations := []*optimization{ + newOptimization("MathExpressionsMerge", plan).withRules( + &mathExpressionsMerge{plan: plan}, + ), newOptimization("PredicatePushdown", plan).withRules( &predicatePushdown{plan: plan}, ), diff --git a/pkg/engine/internal/planner/physical/planner_test.go b/pkg/engine/internal/planner/physical/planner_test.go index 70c89a70a139e..39f642fa57dce 100644 --- a/pkg/engine/internal/planner/physical/planner_test.go +++ b/pkg/engine/internal/planner/physical/planner_test.go @@ -428,7 +428,7 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) { Op: types.BinaryOpLt, }, ).RangeAggregation( - []logical.ColumnRef{*logical.NewColumnRef("label1", types.ColumnTypeAmbiguous), *logical.NewColumnRef("label2", types.ColumnTypeMetadata)}, + []logical.ColumnRef{}, types.RangeAggregationTypeCount, time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time @@ -458,6 +458,207 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) { t.Logf("Optimized plan\n%s\n", PrintAsTree(physicalPlan)) } +func TestPlanner_Convert_Rate(t *testing.T) { + // logical plan for rate({ app="users" } | age > 21[5m]) + b := logical.NewBuilder( + &logical.MakeTable{ + Selector: &logical.BinOp{ + Left: logical.NewColumnRef("app", types.ColumnTypeLabel), + Right: logical.NewLiteral("users"), + Op: types.BinaryOpEq, + }, + Shard: logical.NewShard(0, 1), // no sharding + }, + ).Select( + &logical.BinOp{ + Left: logical.NewColumnRef("age", types.ColumnTypeMetadata), + Right: logical.NewLiteral(int64(21)), + Op: types.BinaryOpGt, + }, + ).Select( + &logical.BinOp{ + Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), + Right: logical.NewLiteral(types.Timestamp(1742826126000000000)), + Op: types.BinaryOpLt, + }, + ).RangeAggregation( + []logical.ColumnRef{}, + types.RangeAggregationTypeCount, + time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time + time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time + 0, // Step + time.Minute*5, // Range + ).BinOpRight( + types.BinaryOpDiv, logical.NewLiteral(int64(300)), + ) + + logicalPlan, err := b.ToPlan() + require.NoError(t, err) + + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) + catalog := &catalog{ + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 3}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, + }, + } + planner := NewPlanner(NewContext(timeStart, timeEnd), catalog) + + physicalPlan, err := planner.Build(logicalPlan) + require.NoError(t, err) + t.Logf("Physical plan\n%s\n", PrintAsTree(physicalPlan)) + + physicalPlan, err = planner.Optimize(physicalPlan) + require.NoError(t, err) + t.Logf("Optimized plan\n%s\n", PrintAsTree(physicalPlan)) +} + +func TestPlanner_BuildMathExpressions(t *testing.T) { + // logical plan for (rate({ app="users" }[5m]) * 40) ^ 2 + b := logical.NewBuilder( + &logical.MakeTable{ + Selector: &logical.BinOp{ + Left: logical.NewColumnRef("app", types.ColumnTypeLabel), + Right: logical.NewLiteral("users"), + Op: types.BinaryOpEq, + }, + Shard: logical.NewShard(0, 1), // no sharding + }, + ).Select( + &logical.BinOp{ + Left: logical.NewColumnRef("age", types.ColumnTypeMetadata), + Right: logical.NewLiteral(int64(21)), + Op: types.BinaryOpGt, + }, + ).Select( + &logical.BinOp{ + Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), + Right: logical.NewLiteral(types.Timestamp(1742826126000000000)), + Op: types.BinaryOpLt, + }, + ).RangeAggregation( + []logical.ColumnRef{}, + types.RangeAggregationTypeCount, + time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time + time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time + 0, // Step + time.Minute*5, // Range + ).BinOpRight( + types.BinaryOpDiv, logical.NewLiteral(int64(300)), + ).BinOpRight( + types.BinaryOpMul, logical.NewLiteral(int64(40)), + ).BinOpRight( + types.BinaryOpPow, logical.NewLiteral(int64(2)), + ) + + logicalPlan, err := b.ToPlan() + require.NoError(t, err) + + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) + catalog := &catalog{ + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 3}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, + }, + } + planner := NewPlanner(NewContext(timeStart, timeEnd), catalog) + + physicalPlan, err := planner.Build(logicalPlan) + require.NoError(t, err) + t.Logf("Physical plan\n%s\n", PrintAsTree(physicalPlan)) + + physicalPlan, err = planner.Optimize(physicalPlan) + require.NoError(t, err) + t.Logf("Optimized plan\n%s\n", PrintAsTree(physicalPlan)) +} + +func TestPlanner_BuildMathExpressionsWithTwoInputs(t *testing.T) { + // logical plan for rate({ env="prod", app="users" }[5m])) / rate({ env="prod" }[5m])) + b1 := logical.NewBuilder( + &logical.MakeTable{ + Selector: &logical.BinOp{ + Left: logical.NewColumnRef("env", types.ColumnTypeLabel), + Right: logical.NewLiteral("prod"), + Op: types.BinaryOpEq, + }, + Shard: logical.NewShard(0, 1), // no sharding + }, + ).Select( + &logical.BinOp{ + Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), + Right: logical.NewLiteral(types.Timestamp(1742826126000000000)), + Op: types.BinaryOpLt, + }, + ).RangeAggregation( + []logical.ColumnRef{}, + types.RangeAggregationTypeCount, + time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time + time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time + 0, // Step + time.Minute*5, // Range + ).BinOpRight( + types.BinaryOpDiv, logical.NewLiteral(float64(300)), + ) + b2 := logical.NewBuilder( + &logical.MakeTable{ + Selector: &logical.BinOp{ + Op: types.BinaryOpAnd, + Left: &logical.BinOp{ + Left: logical.NewColumnRef("env", types.ColumnTypeLabel), + Right: logical.NewLiteral("prod"), + Op: types.BinaryOpEq, + }, + Right: &logical.BinOp{ + Left: logical.NewColumnRef("app", types.ColumnTypeLabel), + Right: logical.NewLiteral("users"), + Op: types.BinaryOpEq, + }, + }, + Shard: logical.NewShard(0, 1), // no sharding + }, + ).Select( + &logical.BinOp{ + Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), + Right: logical.NewLiteral(types.Timestamp(1742826126000000000)), + Op: types.BinaryOpLt, + }, + ).RangeAggregation( + []logical.ColumnRef{}, + types.RangeAggregationTypeCount, + time.Date(2023, 10, 1, 0, 0, 0, 0, time.UTC), // Start Time + time.Date(2023, 10, 1, 1, 0, 0, 0, time.UTC), // End Time + 0, // Step + time.Minute*5, // Range + ).BinOpRight( + types.BinaryOpDiv, logical.NewLiteral(float64(300)), + ).BinOpRight( + types.BinaryOpDiv, b1.Value(), + ) + + logicalPlan, err := b2.ToPlan() + require.NoError(t, err) + + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) + catalog := &catalog{ + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 3}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, + }, + } + planner := NewPlanner(NewContext(timeStart, timeEnd), catalog) + + physicalPlan, err := planner.Build(logicalPlan) + require.NoError(t, err) + t.Logf("Physical plan\n%s\n", PrintAsTree(physicalPlan)) + + physicalPlan, err = planner.Optimize(physicalPlan) + require.NoError(t, err) + t.Logf("Optimized plan\n%s\n", PrintAsTree(physicalPlan)) +} + func TestPlanner_MakeTable_Ordering(t *testing.T) { // Two separate groups with different timestamps in each group now := time.Now() diff --git a/pkg/engine/internal/planner/physical/printer.go b/pkg/engine/internal/planner/physical/printer.go index 198f712f66577..c2d194353c392 100644 --- a/pkg/engine/internal/planner/physical/printer.go +++ b/pkg/engine/internal/planner/physical/printer.go @@ -77,6 +77,10 @@ func toTreeNode(n Node) *tree.Node { if len(node.RequestedKeys) > 0 { treeNode.Properties = append(treeNode.Properties, tree.NewProperty("requested_keys", true, toAnySlice(node.RequestedKeys)...)) } + case *MathExpression: + treeNode.Properties = []tree.Property{ + tree.NewProperty("expression", false, node.Expression.String()), + } case *ColumnCompat: treeNode.Properties = []tree.Property{ tree.NewProperty("src", false, node.Source), diff --git a/pkg/engine/internal/planner/physical/visitor.go b/pkg/engine/internal/planner/physical/visitor.go index 1f576c824f339..e99a4aae77d53 100644 --- a/pkg/engine/internal/planner/physical/visitor.go +++ b/pkg/engine/internal/planner/physical/visitor.go @@ -12,8 +12,10 @@ type Visitor interface { VisitLimit(*Limit) error VisitVectorAggregation(*VectorAggregation) error VisitParse(*ParseNode) error + VisitMathExpression(expression *MathExpression) error VisitCompat(*ColumnCompat) error VisitTopK(*TopK) error VisitParallelize(*Parallelize) error VisitScanSet(*ScanSet) error + VisitJoin(*Join) error } diff --git a/pkg/engine/internal/planner/physical/visitor_test.go b/pkg/engine/internal/planner/physical/visitor_test.go index f0bf76656835d..ec9f229917bb2 100644 --- a/pkg/engine/internal/planner/physical/visitor_test.go +++ b/pkg/engine/internal/planner/physical/visitor_test.go @@ -18,8 +18,10 @@ type nodeCollectVisitor struct { onVisitRangeAggregation func(*RangeAggregation) error onVisitVectorAggregation func(*VectorAggregation) error onVisitParse func(*ParseNode) error + onVisitMathExpression func(expression *MathExpression) error onVisitParallelize func(*Parallelize) error onVisitScanSet func(*ScanSet) error + onVisitJoin func(*Join) error } func (v *nodeCollectVisitor) VisitDataObjScan(n *DataObjScan) error { @@ -79,6 +81,14 @@ func (v *nodeCollectVisitor) VisitParse(n *ParseNode) error { return nil } +func (v *nodeCollectVisitor) VisitMathExpression(n *MathExpression) error { + if v.onVisitMathExpression != nil { + return v.onVisitMathExpression(n) + } + v.visited = append(v.visited, fmt.Sprintf("%s.%s", n.Type().String(), n.ID())) + return nil +} + func (v *nodeCollectVisitor) VisitCompat(*ColumnCompat) error { return nil } @@ -104,3 +114,11 @@ func (v *nodeCollectVisitor) VisitScanSet(n *ScanSet) error { v.visited = append(v.visited, fmt.Sprintf("%s.%s", n.Type().String(), n.ID())) return nil } + +func (v *nodeCollectVisitor) VisitJoin(n *Join) error { + if v.onVisitJoin != nil { + return v.onVisitJoin(n) + } + v.visited = append(v.visited, fmt.Sprintf("%s.%s", n.Type().String(), n.ID())) + return nil +} diff --git a/pkg/engine/internal/types/aggregations.go b/pkg/engine/internal/types/aggregations.go index ceaf2853e63fc..89a407a357572 100644 --- a/pkg/engine/internal/types/aggregations.go +++ b/pkg/engine/internal/types/aggregations.go @@ -12,10 +12,6 @@ const ( RangeAggregationTypeMin // Represents min_over_time range aggregation ) -var SupportedRangeAggregationTypes = []RangeAggregationType{ - RangeAggregationTypeCount, RangeAggregationTypeSum, RangeAggregationTypeMax, RangeAggregationTypeMin, -} - func (op RangeAggregationType) String() string { switch op { case RangeAggregationTypeCount: @@ -50,10 +46,6 @@ const ( VectorAggregationTypeSortDesc // Represents sort_desc vector aggregation ) -var SupportedVectorAggregationTypes = []VectorAggregationType{ - VectorAggregationTypeSum, VectorAggregationTypeMax, VectorAggregationTypeMin, VectorAggregationTypeCount, -} - func (op VectorAggregationType) String() string { switch op { case VectorAggregationTypeSum: diff --git a/pkg/engine/internal/types/operators.go b/pkg/engine/internal/types/operators.go index 7f437b8f69e13..5fb22db7dc752 100644 --- a/pkg/engine/internal/types/operators.go +++ b/pkg/engine/internal/types/operators.go @@ -51,6 +51,7 @@ const ( BinaryOpMul // Multiplication operation (*). BinaryOpDiv // Division operation (/). BinaryOpMod // Modulo operation (%). + BinaryOpPow // power/exponentiation operation (^). BinaryOpMatchSubstr // Substring matching operation (|=). Used for string match filter. BinaryOpNotMatchSubstr // Substring non-matching operation (!=). Used for string match filter. @@ -93,6 +94,8 @@ func (t BinaryOp) String() string { return "DIV" case BinaryOpMod: return "MOD" + case BinaryOpPow: + return "POW" case BinaryOpMatchSubstr: return "MATCH_STR" case BinaryOpNotMatchSubstr: