Changes from 39 commits
Commits (46):
c3fe4d1 - merge: (spiridonov, Oct 6, 2025)
605933c - math expressions (spiridonov, Oct 7, 2025)
3b5d6a4 - merge (spiridonov, Oct 7, 2025)
7978ff5 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 7, 2025)
e2e6e87 - fix merge (spiridonov, Oct 7, 2025)
5a238b8 - format (spiridonov, Oct 7, 2025)
d88a7e0 - format (spiridonov, Oct 7, 2025)
a1bf376 - memory leaks (spiridonov, Oct 7, 2025)
eedbfaf - lint (spiridonov, Oct 7, 2025)
36f3540 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 8, 2025)
1ef3a95 - merge (spiridonov, Oct 8, 2025)
49c5215 - wip (spiridonov, Oct 8, 2025)
88892c7 - revert go.mod (spiridonov, Oct 8, 2025)
a2147e1 - revert go.mod (spiridonov, Oct 8, 2025)
a10228a - revert go.mod (spiridonov, Oct 8, 2025)
14265a2 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 9, 2025)
ffce0da - merge (spiridonov, Oct 9, 2025)
357450a - release (spiridonov, Oct 9, 2025)
1ce7c78 - comment (spiridonov, Oct 9, 2025)
75d4ab4 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 9, 2025)
0ae79aa - Merge branch 'main' into spiridonov-rate-aggregate (spiridonov, Oct 9, 2025)
7c556fb - do.mod (spiridonov, Oct 9, 2025)
206ac82 - Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i… (spiridonov, Oct 9, 2025)
ef1e804 - do.mod (spiridonov, Oct 9, 2025)
a7181c2 - semconv (spiridonov, Oct 9, 2025)
1f03fba - Merge branch 'main' into spiridonov-rate-aggregate (spiridonov, Oct 9, 2025)
dd74932 - Merge branch 'main' into spiridonov-rate-aggregate (spiridonov, Oct 9, 2025)
fbad3d0 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 10, 2025)
0c1453f - Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i… (spiridonov, Oct 10, 2025)
2fede5f - Merge branch 'main' into spiridonov-rate-aggregate (spiridonov, Oct 10, 2025)
de1851c - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 14, 2025)
42219bb - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 14, 2025)
efcb3e8 - removed rate (spiridonov, Oct 14, 2025)
51e100e - Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i… (spiridonov, Oct 14, 2025)
f78d9f1 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 15, 2025)
7cae847 - pr comments (spiridonov, Oct 15, 2025)
a8b5c3c - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 15, 2025)
87c1ee0 - format (spiridonov, Oct 15, 2025)
3cce17d - Merge branch 'main' into spiridonov-rate-aggregate (spiridonov, Oct 15, 2025)
7216e04 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 16, 2025)
f57498f - input columns (spiridonov, Oct 16, 2025)
32bcbc6 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 16, 2025)
beebaba - Merge branch 'spiridonov-rate-aggregate' of github.com:grafana/loki i… (spiridonov, Oct 16, 2025)
bba1e87 - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 17, 2025)
23f20c8 - separate join and math expressions (spiridonov, Oct 17, 2025)
dec254e - Merge branch 'main' of github.com:grafana/loki into spiridonov-rate-a… (spiridonov, Oct 17, 2025)
10 changes: 10 additions & 0 deletions pkg/engine/internal/executor/executor.go
@@ -95,6 +95,8 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
return tracePipeline("physical.VectorAggregation", c.executeVectorAggregation(ctx, n, inputs))
case *physical.ParseNode:
return tracePipeline("physical.ParseNode", c.executeParse(ctx, n, inputs))
case *physical.MathExpression:
return tracePipeline("physical.MathExpression", c.executeMathExpression(ctx, n, inputs))
case *physical.ColumnCompat:
return tracePipeline("physical.ColumnCompat", c.executeColumnCompat(ctx, n, inputs))
default:
@@ -429,6 +431,14 @@ func (c *Context) executeParse(ctx context.Context, parse *physical.ParseNode, i
return NewParsePipeline(parse, inputs[0], allocator)
}

func (c *Context) executeMathExpression(ctx context.Context, expr *physical.MathExpression, inputs []Pipeline) Pipeline {
if len(inputs) > 1 {
return errorPipeline(ctx, fmt.Errorf("only one input is currently supported in mathExpression, got %d", len(inputs)))
}

return NewMathExpressionPipeline(expr, inputs, c.evaluator)
}

func (c *Context) executeColumnCompat(ctx context.Context, compat *physical.ColumnCompat, inputs []Pipeline) Pipeline {
if len(inputs) == 0 {
return emptyPipeline()
2 changes: 1 addition & 1 deletion pkg/engine/internal/executor/expressions.go
@@ -259,7 +259,7 @@ var _ ColumnVector = (*Array)(nil)

// ToArray implements ColumnVector.
func (a *Array) ToArray() arrow.Array {
a.array.Retain()
// this should already have 1 ref counter
return a.array
}

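For context on the Retain/Release changes in this and the following files, here is a minimal arrow-go reference-counting sketch. It shows the general Arrow Go contract only (Retain and Release must be balanced, and the final Release frees the buffers); the exact ownership rules this PR adopts for ColumnVector.ToArray should be read from the diff above, not from this sketch.

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	b := array.NewFloat64Builder(memory.DefaultAllocator)
	defer b.Release()
	b.AppendValues([]float64{1, 2, 3}, nil)

	arr := b.NewFloat64Array() // the caller now holds one reference
	arr.Retain()               // a second holder takes its own reference
	fmt.Println(arr.Len())

	arr.Release() // the second holder is done
	arr.Release() // the first holder is done; the underlying buffers can be freed
}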
3 changes: 0 additions & 3 deletions pkg/engine/internal/executor/expressions_test.go
@@ -216,7 +216,6 @@ func TestEvaluateBinaryExpression(t *testing.T) {
func collectBooleanColumnVector(vec ColumnVector) []bool {
res := make([]bool, 0, vec.Len())
arr := vec.ToArray().(*array.Boolean)
defer arr.Release()
for i := range int(vec.Len()) {
res = append(res, arr.Value(i))
}
@@ -327,10 +326,8 @@ null,null,null`
colVec, err := e.eval(colExpr, record)
require.NoError(t, err)
require.IsType(t, &CoalesceVector{}, colVec)
defer colVec.Release()

arr := colVec.ToArray()
defer arr.Release()
require.IsType(t, &array.String{}, arr)
stringArr := arr.(*array.String)

2 changes: 0 additions & 2 deletions pkg/engine/internal/executor/filter.go
@@ -28,8 +28,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
if err != nil {
return nil, err
}
defer vec.Release()

arr := vec.ToArray()
// boolean filters are only used for filtering; they're not returned
// and must be released
140 changes: 99 additions & 41 deletions pkg/engine/internal/executor/functions.go

Large diffs are not rendered by default.

29 changes: 26 additions & 3 deletions pkg/engine/internal/executor/functions_test.go
@@ -120,7 +120,6 @@ func createFloat64Array(mem memory.Allocator, values []float64, nulls []bool) *A
// Helper function to extract boolean values from result
func extractBoolValues(result ColumnVector) []bool {
arr := result.ToArray().(*array.Boolean)
defer arr.Release()

values := make([]bool, arr.Len())
for i := 0; i < arr.Len(); i++ {
@@ -182,10 +181,34 @@ func TestBinaryFunctionRegistry_GetForSignature(t *testing.T) {
dataType: arrow.BinaryTypes.String,
expectError: false,
},
{
name: "valid div operation",
op: types.BinaryOpDiv,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "valid add operation",
op: types.BinaryOpAdd,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "valid Mul operation",
op: types.BinaryOpMul,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "valid sub operation",
op: types.BinaryOpSub,
dataType: arrow.PrimitiveTypes.Float64,
expectError: false,
},
{
name: "invalid operation",
op: types.BinaryOpAdd, // Not registered
dataType: arrow.PrimitiveTypes.Int64,
op: types.BinaryOpAnd, // Not registered
dataType: arrow.FixedWidthTypes.Boolean,
expectError: true,
},
{
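As a reading aid for the test table above, this is one way a binary-function registry keyed by operator and Arrow type can be structured. It is a hedged sketch built on the assumption of a simple map lookup; it is not Loki's actual registry implementation, and the GetForSignature signature and function type here are illustrative only.

package executorsketch

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"

	"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

// binaryFn is a stand-in for whatever function type the real registry stores.
type binaryFn func(left, right arrow.Array) (arrow.Array, error)

// sigKey identifies a registered function by operator and Arrow type.
type sigKey struct {
	op  types.BinaryOp
	typ arrow.Type
}

type binaryFunctionRegistry struct {
	fns map[sigKey]binaryFn
}

func (r *binaryFunctionRegistry) register(op types.BinaryOp, dt arrow.DataType, fn binaryFn) {
	if r.fns == nil {
		r.fns = make(map[sigKey]binaryFn)
	}
	r.fns[sigKey{op: op, typ: dt.ID()}] = fn
}

// GetForSignature returns the function registered for the (operator, type)
// pair, or an error when nothing is registered, which is what the
// expectError cases in the test above exercise.
func (r *binaryFunctionRegistry) GetForSignature(op types.BinaryOp, dt arrow.DataType) (binaryFn, error) {
	fn, ok := r.fns[sigKey{op: op, typ: dt.ID()}]
	if !ok {
		return nil, fmt.Errorf("no binary function registered for op %v and type %s", op, dt)
	}
	return fn, nil
}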
88 changes: 88 additions & 0 deletions pkg/engine/internal/executor/math_expression.go
@@ -0,0 +1,88 @@
package executor

import (
"context"
"fmt"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"

"github.com/grafana/loki/v3/pkg/engine/internal/types"

"github.com/grafana/loki/v3/pkg/engine/internal/semconv"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
)

func NewMathExpressionPipeline(expr *physical.MathExpression, inputs []Pipeline, evaluator expressionEvaluator) *GenericPipeline {
return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) {
Contributor: I don't think your implementation of the pipeline is correct.

What this pipeline is doing is returning a new arrow.Record with fields float64.generated.input_0 and timestamp_ns.builtin.timestamp, instead of preserving the existing column names.

While this works for a specific subset of queries that have a vector aggregation without grouping, it does not work when there is a vector aggregation with grouping, e.g. sum by (cluster, namespace) (count_over_time({env="prod"} [1m])) / 100.

(screenshots omitted)

So instead of generating a new schema, you "only" need to apply the math function on the float64.generated.value (semconv.ColumnIdentValue) column.

Contributor Author: This pipeline takes its inputs (just one for now, but multiple in general) and creates input_0, input_1, etc. columns to evaluate the given math expression. The math expression has the form input_0 / 90 * 199 + input_1, regardless of what exactly those input pipelines are. After evaluation, the pipeline returns two columns, value and ts. It is a bug that I drop the other columns (for groupings) from the output, and I will fix this. But it does not return input_0 in any case.

// Works only with a single input for now.
input := inputs[0]
batch, err := input.Read(ctx)
if err != nil {
return nil, err
}
defer batch.Release()

// TODO: make sure all inputs match on timestamps

fields := make([]arrow.Field, 0, len(inputs))
for i := range inputs {
fields = append(fields, semconv.FieldFromFQN(fmt.Sprintf("float64.generated.input_%d", i), false))
}
Contributor: It is a bit confusing that for the fields you iterate over the inputs, but for the cols you do an explicit single append(). I think it would be clearer if you also only append a single field.

Contributor Author: Fixed.


valColumnExpr := &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: types.ColumnNameGeneratedValue,
Type: types.ColumnTypeGenerated,
},
}
cols := make([]arrow.Array, 0, len(inputs))
inputVec, err := evaluator.eval(valColumnExpr, batch) // TODO read input[i] instead of batch
if err != nil {
return nil, err
}
inputData := inputVec.ToArray()
if inputData.DataType().ID() != arrow.FLOAT64 {
return nil, fmt.Errorf("expression returned non-float64 type %s", inputData.DataType())
}
inputCol := inputData.(*array.Float64)
defer inputCol.Release()

cols = append(cols, inputCol)

inputSchema := arrow.NewSchema(fields, nil)
evalInput := array.NewRecord(inputSchema, cols, batch.NumRows())
defer evalInput.Release()

res, err := evaluator.eval(expr.Expression, evalInput)
if err != nil {
return nil, err
}
data := res.ToArray()
if data.DataType().ID() != arrow.FLOAT64 {
return nil, fmt.Errorf("expression returned non-float64 type %s", data.DataType())
}
valCol := data.(*array.Float64)
defer valCol.Release()

schema := batch.Schema()
valueCol := semconv.NewIdentifier(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated, types.Loki.Float)
Contributor: Instead of valueCol you can use semconv.ColumnIdentValue.

Contributor Author: Fixed.

outputFields := make([]arrow.Field, 0, schema.NumFields())
outputCols := make([]arrow.Array, 0, schema.NumFields())
for i := 0; i < schema.NumFields(); i++ {
field := schema.Field(i)
if field.Name != valueCol.FQN() {
outputFields = append(outputFields, field)
outputCols = append(outputCols, batch.Column(i))
}
}
outputFields = append(outputFields, semconv.FieldFromIdent(valueCol, false))
outputCols = append(outputCols, valCol)
outputSchema := arrow.NewSchema(outputFields, nil)

evaluatedRecord := array.NewRecord(outputSchema, outputCols, batch.NumRows())

return evaluatedRecord, nil
}, inputs...)
}
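To tie the review discussion above to the data structures involved: the pipeline evaluates a physical expression tree over generated input_N columns, as the author describes. A compound expression such as input_0 / 90 * 199 would arrive as nested physical.BinaryExpr nodes, built the same way the test file below builds its simpler input_0 / 10 case. This is a sketch, assuming BinaryExpr operands can themselves be expressions (which the tree shape of the planner suggests); it is not code from this PR.

mathExpr := &physical.MathExpression{
	Expression: &physical.BinaryExpr{
		// (input_0 / 90) * 199
		Left: &physical.BinaryExpr{
			Left: &physical.ColumnExpr{
				Ref: types.ColumnRef{
					Column: "input_0",
					Type:   types.ColumnTypeGenerated,
				},
			},
			Right: physical.NewLiteral(float64(90)),
			Op:    types.BinaryOpDiv,
		},
		Right: physical.NewLiteral(float64(199)),
		Op:    types.BinaryOpMul,
	},
}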
75 changes: 75 additions & 0 deletions pkg/engine/internal/executor/math_expression_test.go
@@ -0,0 +1,75 @@
package executor

import (
"testing"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/engine/internal/semconv"

"github.com/grafana/loki/v3/pkg/util/arrowtest"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

func TestNewMathExpressionPipeline(t *testing.T) {
colTs := "timestamp_ns.builtin.timestamp"
colVal := "float64.generated.value"

t.Run("calculates input_1 / 10", func(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema([]arrow.Field{
semconv.FieldFromFQN(colTs, false),
semconv.FieldFromFQN(colVal, false),
}, nil)
Comment on lines 29 to 34
Contributor: Can you extend the test so it also has additional (grouping) columns?

Contributor Author: Added two more labels into the test.


rowsPipeline1 := []arrowtest.Rows{
{
{colTs: time.Unix(20, 0).UTC(), colVal: float64(230)},
{colTs: time.Unix(15, 0).UTC(), colVal: float64(120)},
{colTs: time.Unix(10, 0).UTC(), colVal: float64(260)},
{colTs: time.Unix(12, 0).UTC(), colVal: float64(250)},
},
}
input1 := NewArrowtestPipeline(alloc, schema, rowsPipeline1...)

mathExpr := &physical.MathExpression{
Expression: &physical.BinaryExpr{
Left: &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "input_0",
Type: types.ColumnTypeGenerated,
},
},
Right: physical.NewLiteral(float64(10)),
Op: types.BinaryOpDiv,
},
}

pipeline := NewMathExpressionPipeline(mathExpr, []Pipeline{input1}, expressionEvaluator{})
defer pipeline.Close()

// Read the pipeline output
record, err := pipeline.Read(t.Context())
require.NoError(t, err)
defer record.Release()

expect := arrowtest.Rows{
{colTs: time.Unix(20, 0).UTC(), colVal: float64(23)},
{colTs: time.Unix(15, 0).UTC(), colVal: float64(12)},
{colTs: time.Unix(10, 0).UTC(), colVal: float64(26)},
{colTs: time.Unix(12, 0).UTC(), colVal: float64(25)},
}

rows, err := arrowtest.RecordRows(record)
require.NoError(t, err, "should be able to convert record back to rows")
require.Equal(t, len(expect), len(rows), "number of rows should match")
require.ElementsMatch(t, expect, rows)
})
}
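A minimal sketch of the extension requested in the review thread above: adding a grouping label column to the input rows and asserting that it is passed through untouched. The column name utf8.label.cluster is an assumption that follows the <datatype>.<column type>.<name> FQN pattern of the two columns already used in the test; the actual test may use different names and values.

colCluster := "utf8.label.cluster"

schemaWithLabel := arrow.NewSchema([]arrow.Field{
	semconv.FieldFromFQN(colTs, false),
	semconv.FieldFromFQN(colCluster, false),
	semconv.FieldFromFQN(colVal, false),
}, nil)

rowsWithLabel := arrowtest.Rows{
	{colTs: time.Unix(20, 0).UTC(), colCluster: "dev", colVal: float64(230)},
	{colTs: time.Unix(15, 0).UTC(), colCluster: "prod", colVal: float64(120)},
}

// The expectation would then keep the label values unchanged while the value
// column holds the computed result, e.g. 23 and 12 for `input_0 / 10`.
_ = schemaWithLabel
_ = rowsWithLabel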
1 change: 0 additions & 1 deletion pkg/engine/internal/executor/project.go
@@ -40,7 +40,6 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva
if err != nil {
return nil, err
}
defer vec.Release()
ident := semconv.NewIdentifier(columnNames[i], vec.ColumnType(), vec.Type())
fields = append(fields, semconv.FieldFromIdent(ident, true))
arr := vec.ToArray()
2 changes: 0 additions & 2 deletions pkg/engine/internal/executor/range_aggregation.go
@@ -163,7 +163,6 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
if err != nil {
return nil, err
}
defer vec.Release()

if vec.Type() != types.Loki.String {
return nil, fmt.Errorf("unsupported datatype for partitioning %s", vec.Type())
@@ -180,7 +179,6 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
if err != nil {
return nil, err
}
defer tsVec.Release()
tsCol := tsVec.ToArray().(*array.Timestamp)
defer tsCol.Release()

4 changes: 0 additions & 4 deletions pkg/engine/internal/executor/sortmerge.go
@@ -154,14 +154,12 @@ loop:
if err != nil {
return nil, err
}
defer col.Release()

tsCol, ok := col.ToArray().(*array.Timestamp)
if !ok {
return nil, errors.New("column is not a timestamp column")
}
ts := tsCol.Value(int(p.offsets[i]))
tsCol.Release()

// Populate slices for sorting
inputIndexes = append(inputIndexes, i)
@@ -202,13 +200,11 @@ loop:
if err != nil {
return nil, err
}
defer col.Release()
// We assume the column is a Uint64 array
tsCol, ok := col.ToArray().(*array.Timestamp)
if !ok {
return nil, errors.New("column is not a timestamp column")
}
defer tsCol.Release()

// Calculate start/end of the sub-slice of the record
start := p.offsets[j]
2 changes: 0 additions & 2 deletions pkg/engine/internal/executor/sortmerge_test.go
@@ -128,10 +128,8 @@ func TestSortMerge(t *testing.T) {
}

tsCol, err := c.evaluator.eval(merge.Column, batch)
defer tsCol.Release()
require.NoError(t, err)
arr := tsCol.ToArray().(*array.Timestamp)
defer arr.Release()

timestamps = append(timestamps, arr.Values()...)
batches++
3 changes: 0 additions & 3 deletions pkg/engine/internal/executor/vector_aggregate.go
@@ -102,7 +102,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err
if err != nil {
return nil, err
}
defer tsVec.Release()
tsCol := tsVec.ToArray().(*array.Timestamp)
defer tsCol.Release()

@@ -111,7 +110,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err
if err != nil {
return nil, err
}
defer valueVec.Release()
valueArr := valueVec.ToArray().(*array.Float64)
defer valueArr.Release()

@@ -123,7 +121,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err
if err != nil {
return nil, err
}
defer vec.Release()

if vec.Type() != types.Loki.String {
return nil, fmt.Errorf("unsupported datatype for grouping %s", vec.Type())
22 changes: 22 additions & 0 deletions pkg/engine/internal/planner/logical/builder.go
@@ -62,6 +62,28 @@ func (b *Builder) Sort(column ColumnRef, ascending, nullsFirst bool) *Builder {
}
}

// BinOpRight adds a binary arithmetic operation with a given right value
func (b *Builder) BinOpRight(op types.BinaryOp, right Value) *Builder {
return &Builder{
val: &BinOp{
Left: b.val,
Right: right,
Op: op,
},
}
}

// BinOpLeft adds a binary arithmetic operation with a given left value
func (b *Builder) BinOpLeft(op types.BinaryOp, left Value) *Builder {
return &Builder{
val: &BinOp{
Left: left,
Right: b.val,
Op: op,
},
}
}

// RangeAggregation applies a [RangeAggregation] operation to the Builder.
func (b *Builder) RangeAggregation(
partitionBy []ColumnRef,
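A hedged usage sketch for the two new builder methods. This is an assumption about intended use, not code from the diff; aggBuilder and hundred are stand-ins for a *Builder and a literal Value the planner already has at hand.

// `<aggregation> / 100`: the receiver becomes the left operand.
divided := aggBuilder.BinOpRight(types.BinaryOpDiv, hundred)

// `100 / <aggregation>`: the receiver becomes the right operand.
inverted := aggBuilder.BinOpLeft(types.BinaryOpDiv, hundred)

// Both return a new *Builder wrapping a BinOp value, so further operations
// can be chained in the usual builder style.
_, _ = divided, inverted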