-
Notifications
You must be signed in to change notification settings - Fork 3.8k
feat(engine): basic math expressions #19407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 34 commits
c3fe4d1
605933c
3b5d6a4
7978ff5
e2e6e87
5a238b8
d88a7e0
a1bf376
eedbfaf
36f3540
1ef3a95
49c5215
88892c7
a2147e1
a10228a
14265a2
ffce0da
357450a
1ce7c78
75d4ab4
0ae79aa
7c556fb
206ac82
ef1e804
a7181c2
1f03fba
dd74932
fbad3d0
0c1453f
2fede5f
de1851c
42219bb
efcb3e8
51e100e
f78d9f1
7cae847
a8b5c3c
87c1ee0
3cce17d
7216e04
f57498f
32bcbc6
beebaba
bba1e87
23f20c8
dec254e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package executor | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/apache/arrow-go/v18/arrow" | ||
"github.com/apache/arrow-go/v18/arrow/array" | ||
|
||
"github.com/grafana/loki/v3/pkg/engine/internal/semconv" | ||
|
||
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" | ||
"github.com/grafana/loki/v3/pkg/engine/internal/types" | ||
) | ||
|
||
func NewMathExpressionPipeline(expr *physical.MathExpression, inputs []Pipeline, evaluator expressionEvaluator) *GenericPipeline { | ||
return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) { | ||
// Works only with a single input for now. | ||
input := inputs[0] | ||
batch, err := input.Read(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer batch.Release() | ||
|
||
// TODO make sure all inputs matches on timestamps | ||
|
||
fields := make([]arrow.Field, 0, len(inputs)) | ||
for i := range inputs { | ||
fields = append(fields, arrow.Field{ | ||
Name: fmt.Sprintf("float64.generated.input_%d", i), | ||
Type: types.Arrow.Float, | ||
}) | ||
|
||
} | ||
|
||
valColumnExpr := &physical.ColumnExpr{ | ||
Ref: types.ColumnRef{ | ||
Column: types.ColumnNameGeneratedValue, | ||
Type: types.ColumnTypeGenerated, | ||
}, | ||
} | ||
cols := make([]arrow.Array, 0, len(inputs)) | ||
inputVec, err := evaluator.eval(valColumnExpr, batch) // TODO read input[i] instead of batch | ||
if err != nil { | ||
return nil, err | ||
} | ||
inputData := inputVec.ToArray() | ||
if inputData.DataType().ID() != arrow.FLOAT64 { | ||
return nil, fmt.Errorf("expression returned non-float64 type %s", inputData.DataType()) | ||
} | ||
inputCol := inputData.(*array.Float64) | ||
defer inputCol.Release() | ||
|
||
cols = append(cols, inputCol) | ||
|
||
inputSchema := arrow.NewSchema(fields, nil) | ||
evalInput := array.NewRecord(inputSchema, cols, batch.NumRows()) | ||
defer evalInput.Release() | ||
|
||
res, err := evaluator.eval(expr.Expression, evalInput) | ||
if err != nil { | ||
return nil, err | ||
} | ||
data := res.ToArray() | ||
if data.DataType().ID() != arrow.FLOAT64 { | ||
return nil, fmt.Errorf("expression returned non-float64 type %s", data.DataType()) | ||
} | ||
valCol := data.(*array.Float64) | ||
defer valCol.Release() | ||
|
||
tsColumnExpr := &physical.ColumnExpr{ | ||
Ref: types.ColumnRef{ | ||
Column: types.ColumnNameBuiltinTimestamp, | ||
Type: types.ColumnTypeBuiltin, | ||
}, | ||
} | ||
tsVec, err := evaluator.eval(tsColumnExpr, batch) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tsCol := tsVec.ToArray().(*array.Timestamp) | ||
defer tsCol.Release() | ||
|
||
outputSchema := arrow.NewSchema([]arrow.Field{ | ||
semconv.FieldFromIdent(semconv.NewIdentifier(types.ColumnNameBuiltinTimestamp, types.ColumnTypeBuiltin, types.Loki.Timestamp), true), | ||
semconv.FieldFromIdent(semconv.NewIdentifier(types.ColumnNameGeneratedValue, types.ColumnTypeGenerated, types.Loki.Float), true), | ||
}, nil) | ||
evaluatedRecord := array.NewRecord(outputSchema, []arrow.Array{tsCol, valCol}, batch.NumRows()) | ||
|
||
return evaluatedRecord, nil | ||
}, inputs...) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package executor | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/apache/arrow-go/v18/arrow" | ||
"github.com/apache/arrow-go/v18/arrow/memory" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/grafana/loki/v3/pkg/engine/internal/semconv" | ||
|
||
"github.com/grafana/loki/v3/pkg/util/arrowtest" | ||
|
||
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" | ||
"github.com/grafana/loki/v3/pkg/engine/internal/types" | ||
) | ||
|
||
func TestNewMathExpressionPipeline(t *testing.T) { | ||
colTs := "timestamp_ns.builtin.timestamp" | ||
colVal := "float64.generated.value" | ||
|
||
t.Run("calculates input_1 / 10", func(t *testing.T) { | ||
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) | ||
defer alloc.AssertSize(t, 0) | ||
|
||
schema := arrow.NewSchema([]arrow.Field{ | ||
semconv.FieldFromFQN(colTs, false), | ||
semconv.FieldFromFQN(colVal, false), | ||
}, nil) | ||
Comment on lines
29
to
34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you extend the test so it also has additional (grouping) columns? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added two more labels into the test |
||
|
||
rowsPipeline1 := []arrowtest.Rows{ | ||
{ | ||
{colTs: time.Unix(20, 0).UTC(), colVal: float64(230)}, | ||
{colTs: time.Unix(15, 0).UTC(), colVal: float64(120)}, | ||
{colTs: time.Unix(10, 0).UTC(), colVal: float64(260)}, | ||
{colTs: time.Unix(12, 0).UTC(), colVal: float64(250)}, | ||
}, | ||
} | ||
input1 := NewArrowtestPipeline(alloc, schema, rowsPipeline1...) | ||
|
||
mathExpr := &physical.MathExpression{ | ||
Expression: &physical.BinaryExpr{ | ||
Left: &physical.ColumnExpr{ | ||
Ref: types.ColumnRef{ | ||
Column: "input_0", | ||
Type: types.ColumnTypeGenerated, | ||
}, | ||
}, | ||
Right: physical.NewLiteral(float64(10)), | ||
Op: types.BinaryOpDiv, | ||
}, | ||
} | ||
|
||
pipeline := NewMathExpressionPipeline(mathExpr, []Pipeline{input1}, expressionEvaluator{}) | ||
defer pipeline.Close() | ||
|
||
// Read the pipeline output | ||
record, err := pipeline.Read(t.Context()) | ||
require.NoError(t, err) | ||
defer record.Release() | ||
|
||
expect := arrowtest.Rows{ | ||
{colTs: time.Unix(20, 0).UTC(), colVal: float64(23)}, | ||
{colTs: time.Unix(15, 0).UTC(), colVal: float64(12)}, | ||
{colTs: time.Unix(10, 0).UTC(), colVal: float64(26)}, | ||
{colTs: time.Unix(12, 0).UTC(), colVal: float64(25)}, | ||
} | ||
|
||
rows, err := arrowtest.RecordRows(record) | ||
require.NoError(t, err, "should be able to convert record back to rows") | ||
require.Equal(t, len(expect), len(rows), "number of rows should match") | ||
require.ElementsMatch(t, expect, rows) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think your implementation of the pipeline is correct.
What this pipeline is doing is returning a new
arrow.Record
with fieldsfloat64.generated.input_0
andtimestamp_ns.builtin.timestamp
, instead of preserving the existing column names.While this works for a specific subset of queries that have a vector aggregation without grouping, it does not work when there is a vector aggregation with grouping, e.g.
sum by (cluster, namespace) (count_over_time({env="prod"} [1m])) / 100)
.So instead of generating a new schema, you "only" need to apply the math function on the
float64.generated.value
(semconv.ColumnIdentValue
) column.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pipeline takes its inputs (just 1 for now, but multiple in general) and makes
input_0
,input_1
etc columns to evaluate the given math expression. The math expression is in the form ofinput_0 / 90 * 199 + input_1
regardless of what exactly those input pipelines are. After evaluation is done this pipeline returns two columnsvalue
andts
. This is a bug that I miss other columns (for groupings) in the output, and I will fix this. But it does not returninput_0
in any case.