From 6e34f7cc6bbc301162bcba16adad938ae3438b3d Mon Sep 17 00:00:00 2001 From: Stas Spiridonov Date: Wed, 8 Oct 2025 10:03:06 -0400 Subject: [PATCH 1/5] eval memory ownership --- pkg/engine/internal/executor/executor.go | 5 ++- pkg/engine/internal/executor/expressions.go | 26 +++++++++++- pkg/engine/internal/executor/filter.go | 27 +++++++----- pkg/engine/internal/executor/filter_test.go | 42 ++++++++++++------- .../internal/executor/range_aggregation.go | 9 +++- 5 files changed, 80 insertions(+), 29 deletions(-) diff --git a/pkg/engine/internal/executor/executor.go b/pkg/engine/internal/executor/executor.go index 4d2d03426818c..570162c318ea2 100644 --- a/pkg/engine/internal/executor/executor.go +++ b/pkg/engine/internal/executor/executor.go @@ -312,7 +312,10 @@ func (c *Context) executeFilter(ctx context.Context, filter *physical.Filter, in return errorPipeline(ctx, fmt.Errorf("filter expects exactly one input, got %d", len(inputs))) } - return NewFilterPipeline(filter, inputs[0], c.evaluator) + // Use memory allocator from context or default + allocator := memory.DefaultAllocator + + return NewFilterPipeline(filter, inputs[0], c.evaluator, allocator) } func (c *Context) executeMerge(ctx context.Context, _ *physical.Merge, inputs []Pipeline) Pipeline { diff --git a/pkg/engine/internal/executor/expressions.go b/pkg/engine/internal/executor/expressions.go index c26885e7b7e08..b5b3040f385ff 100644 --- a/pkg/engine/internal/executor/expressions.go +++ b/pkg/engine/internal/executor/expressions.go @@ -41,8 +41,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) continue } + col := input.Column(idx) + col.Retain() return &Array{ - array: input.Column(idx), + array: col, dt: types.MustFromString(dt), ct: types.ColumnTypeFromString(ct), rows: input.NumRows(), @@ -69,8 +71,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) return nil, fmt.Errorf("column %s has datatype %s, but expression expects string", expr.Ref.Column, dt) } + col := input.Column(idx) + col.Retain() vecs = append(vecs, &Array{ - array: input.Column(idx), + array: col, dt: types.MustFromString(dt), ct: types.ColumnTypeFromString(ct), rows: input.NumRows(), @@ -107,6 +111,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) if err != nil { return nil, err } + defer lhr.Release() fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type().ArrowType()) if err != nil { @@ -119,10 +124,12 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) if err != nil { return nil, err } + defer lhs.Release() rhs, err := e.eval(expr.Right, input) if err != nil { return nil, err } + defer rhs.Release() // At the moment we only support functions that accept the same input types. // TODO(chaudum): Compare Loki type, not Arrow type @@ -162,6 +169,8 @@ type ColumnVector interface { ColumnType() types.ColumnType // Len returns the length of the vector Len() int64 + // Release decreases the reference count by 1 on underlying Arrow array + Release() } // Scalar represents a single value repeated any number of times. @@ -223,6 +232,10 @@ func (v *Scalar) ColumnType() types.ColumnType { return v.ct } +// Release implements ColumnVector. +func (v *Scalar) Release() { +} + // Len implements ColumnVector. func (v *Scalar) Len() int64 { return v.rows @@ -280,6 +293,11 @@ func (a *Array) Len() int64 { return int64(a.array.Len()) } +// Release implements ColumnVector. +func (a *Array) Release() { + a.array.Release() +} + // CoalesceVector represents multiple columns with the same name but different [types.ColumnType] // Vectors are ordered by precedence (highest precedence first). type CoalesceVector struct { @@ -340,3 +358,7 @@ func (m *CoalesceVector) ColumnType() types.ColumnType { func (m *CoalesceVector) Len() int64 { return m.rows } + +// Release implements ColumnVector. +func (m *CoalesceVector) Release() { +} diff --git a/pkg/engine/internal/executor/filter.go b/pkg/engine/internal/executor/filter.go index cde790a3baf8e..5b9aeade283e9 100644 --- a/pkg/engine/internal/executor/filter.go +++ b/pkg/engine/internal/executor/filter.go @@ -11,7 +11,7 @@ import ( "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" ) -func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator) *GenericPipeline { +func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator, allocator memory.Allocator) *GenericPipeline { return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) state { // Pull the next item from the input pipeline input := inputs[0] @@ -19,6 +19,7 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres if err != nil { return failureState(err) } + defer batch.Release() cols := make([]*array.Boolean, 0, len(filter.Predicates)) defer func() { @@ -43,7 +44,7 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres cols = append(cols, casted) } - filtered := filterBatch(batch, func(i int) bool { + filtered := filterBatch(batch, allocator, func(i int) bool { for _, p := range cols { if !p.IsValid(i) || !p.Value(i) { return false @@ -53,7 +54,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres }) return successState(filtered) - }, input) } @@ -66,8 +66,7 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres // pushdown optimizations. // // We should re-think this approach. -func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { - mem := memory.NewGoAllocator() +func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(int) bool) arrow.Record { fields := batch.Schema().Fields() builders := make([]array.Builder, len(fields)) @@ -85,7 +84,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { switch field.Type.ID() { case arrow.BOOL: - builder := array.NewBooleanBuilder(mem) + builder := array.NewBooleanBuilder(allocator) builders[i] = builder additions[i] = func(offset int) { src := batch.Column(i).(*array.Boolean) @@ -93,7 +92,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { } case arrow.STRING: - builder := array.NewStringBuilder(mem) + builder := array.NewStringBuilder(allocator) builders[i] = builder additions[i] = func(offset int) { src := batch.Column(i).(*array.String) @@ -101,7 +100,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { } case arrow.UINT64: - builder := array.NewUint64Builder(mem) + builder := array.NewUint64Builder(allocator) builders[i] = builder additions[i] = func(offset int) { src := batch.Column(i).(*array.Uint64) @@ -109,7 +108,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { } case arrow.INT64: - builder := array.NewInt64Builder(mem) + builder := array.NewInt64Builder(allocator) builders[i] = builder additions[i] = func(offset int) { src := batch.Column(i).(*array.Int64) @@ -117,7 +116,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { } case arrow.FLOAT64: - builder := array.NewFloat64Builder(mem) + builder := array.NewFloat64Builder(allocator) builders[i] = builder additions[i] = func(offset int) { src := batch.Column(i).(*array.Float64) @@ -125,7 +124,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { } case arrow.TIMESTAMP: - builder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"}) + builder := array.NewTimestampBuilder(allocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"}) builders[i] = builder additions[i] = func(offset int) { src := batch.Column(i).(*array.Timestamp) @@ -160,5 +159,11 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record { arrays[i] = builder.NewArray() } + defer func() { + for _, a := range arrays { + a.Release() + } + }() + return array.NewRecord(schema, arrays, ct) } diff --git a/pkg/engine/internal/executor/filter_test.go b/pkg/engine/internal/executor/filter_test.go index 04e943c86f096..09686e0a4e4e2 100644 --- a/pkg/engine/internal/executor/filter_test.go +++ b/pkg/engine/internal/executor/filter_test.go @@ -19,7 +19,9 @@ func TestNewFilterPipeline(t *testing.T) { } t.Run("filter with true literal predicate", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create input data using arrowtest.Rows @@ -41,7 +43,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() // Read the pipeline output @@ -56,7 +58,9 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter with false literal predicate", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create input data using arrowtest.Rows @@ -80,7 +84,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() // Read the pipeline output @@ -92,7 +96,9 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter on boolean column with column expression", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create input data using arrowtest.Rows @@ -117,7 +123,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() // Create expected output (only rows where valid=true) @@ -138,7 +144,9 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter on multiple columns with binary expressions", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create input data using arrowtest.Rows @@ -170,7 +178,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() // Create expected output (only rows where name=="Bob" AND valid!=false) @@ -191,7 +199,9 @@ func TestNewFilterPipeline(t *testing.T) { // TODO: instead of returning empty batch, filter should read the next non-empty batch. t.Run("filter on empty batch", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create empty input data using arrowtest.Rows @@ -210,7 +220,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() record, err := pipeline.Read(t.Context()) @@ -223,7 +233,9 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter with multiple input batches", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create input data split across multiple batches using arrowtest.Rows @@ -251,7 +263,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() // Create expected output (only rows where valid=true) @@ -284,7 +296,9 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter with null values", func(t *testing.T) { - alloc := memory.DefaultAllocator + alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer alloc.AssertSize(t, 0) + schema := arrow.NewSchema(fields, nil) // Create input data with null values @@ -309,7 +323,7 @@ func TestNewFilterPipeline(t *testing.T) { } // Create filter pipeline - pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}) + pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc) defer pipeline.Close() // Create expected output (only rows where valid=true, including null name) diff --git a/pkg/engine/internal/executor/range_aggregation.go b/pkg/engine/internal/executor/range_aggregation.go index 350f2244fcbf1..3b452c2e4829b 100644 --- a/pkg/engine/internal/executor/range_aggregation.go +++ b/pkg/engine/internal/executor/range_aggregation.go @@ -152,9 +152,9 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro } return nil, err } + defer record.Release() inputsExhausted = false - defer record.Release() // extract all the columns that are used for partitioning arrays := make([]*array.String, 0, len(r.opts.partitionBy)) @@ -170,6 +170,11 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro arrays = append(arrays, vec.ToArray().(*array.String)) } + defer func() { + for _, a := range arrays { + a.Release() + } + }() // extract timestamp column to check if the entry is in range tsVec, err := r.evaluator.eval(tsColumnExpr, record) @@ -177,6 +182,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro return nil, err } tsCol := tsVec.ToArray().(*array.Timestamp) + defer tsCol.Release() // no need to extract value column for COUNT aggregation var valVec ColumnVector @@ -185,6 +191,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro if err != nil { return nil, err } + defer valVec.Release() } for row := range int(record.NumRows()) { From bd96719b415d0a6c4607983d86c4481dc6c3d7d8 Mon Sep 17 00:00:00 2001 From: Stas Spiridonov Date: Wed, 8 Oct 2025 10:47:36 -0400 Subject: [PATCH 2/5] more pipelines --- pkg/engine/internal/executor/project.go | 7 ++++++- pkg/engine/internal/executor/project_test.go | 8 ++++++-- pkg/engine/internal/executor/vector_aggregate.go | 9 +++++++++ pkg/engine/internal/executor/vector_aggregate_test.go | 8 ++++++-- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/engine/internal/executor/project.go b/pkg/engine/internal/executor/project.go index 322ce56c65aae..1747a79fe6c51 100644 --- a/pkg/engine/internal/executor/project.go +++ b/pkg/engine/internal/executor/project.go @@ -30,8 +30,14 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva if err != nil { return failureState(err) } + defer batch.Release() projected := make([]arrow.Array, 0, len(columns)) + defer func() { + for _, proj := range projected { + proj.Release() + } + }() fields := make([]arrow.Field, 0, len(columns)) for i := range columns { @@ -47,7 +53,6 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva // Create a new record with only the projected columns // retain the projected columns in a new batch then release the original record. projectedRecord := array.NewRecord(schema, projected, batch.NumRows()) - batch.Release() return successState(projectedRecord) }, input), nil } diff --git a/pkg/engine/internal/executor/project_test.go b/pkg/engine/internal/executor/project_test.go index fc91b018a0ad5..c1cf7d681949d 100644 --- a/pkg/engine/internal/executor/project_test.go +++ b/pkg/engine/internal/executor/project_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" @@ -18,9 +19,12 @@ func TestNewProjectPipeline(t *testing.T) { } t.Run("project single column", func(t *testing.T) { + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + // Create input data inputCSV := "Alice,30,New York\nBob,25,Boston\nCharlie,35,Seattle" - inputRecord, err := CSVToArrow(fields, inputCSV) + inputRecord, err := CSVToArrowWithAllocator(alloc, fields, inputCSV) require.NoError(t, err) defer inputRecord.Release() @@ -43,7 +47,7 @@ func TestNewProjectPipeline(t *testing.T) { expectedFields := []arrow.Field{ {Name: "name", Type: types.Arrow.String, Metadata: types.ColumnMetadata(types.ColumnTypeBuiltin, types.Loki.String)}, } - expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) + expectedRecord, err := CSVToArrowWithAllocator(alloc, expectedFields, expectedCSV) require.NoError(t, err) defer expectedRecord.Release() diff --git a/pkg/engine/internal/executor/vector_aggregate.go b/pkg/engine/internal/executor/vector_aggregate.go index 7a77952298296..904b7ca6a30e6 100644 --- a/pkg/engine/internal/executor/vector_aggregate.go +++ b/pkg/engine/internal/executor/vector_aggregate.go @@ -93,6 +93,7 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err } return nil, err } + defer record.Release() inputsExhausted = false @@ -102,6 +103,7 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err return nil, err } tsCol := tsVec.ToArray().(*array.Timestamp) + defer tsCol.Release() // extract value column valueVec, err := v.valueEval(record) @@ -109,9 +111,16 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err return nil, err } valueArr := valueVec.ToArray().(*array.Float64) + defer valueArr.Release() // extract all the columns that are used for grouping arrays := make([]*array.String, 0, len(v.groupBy)) + defer func() { + for _, array := range arrays { + array.Release() + } + }() + for _, columnExpr := range v.groupBy { vec, err := v.evaluator.eval(columnExpr, record) if err != nil { diff --git a/pkg/engine/internal/executor/vector_aggregate_test.go b/pkg/engine/internal/executor/vector_aggregate_test.go index 5460640fac3f8..c96359a62cbad 100644 --- a/pkg/engine/internal/executor/vector_aggregate_test.go +++ b/pkg/engine/internal/executor/vector_aggregate_test.go @@ -8,6 +8,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" @@ -15,6 +16,9 @@ import ( ) func TestVectorAggregationPipeline(t *testing.T) { + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + // input schema with timestamp, value and group by columns fields := []arrow.Field{ {Name: types.ColumnNameBuiltinTimestamp, Type: types.Arrow.Timestamp, Metadata: types.ColumnMetadataBuiltinTimestamp}, @@ -55,11 +59,11 @@ func TestVectorAggregationPipeline(t *testing.T) { fmt.Sprintf("%s,40,dev,app2", t3.Format(arrowTimestampFormat)), }, "\n") - input1Record, err := CSVToArrow(fields, input1CSV) + input1Record, err := CSVToArrowWithAllocator(alloc, fields, input1CSV) require.NoError(t, err) defer input1Record.Release() - input2Record, err := CSVToArrow(fields, input2CSV) + input2Record, err := CSVToArrowWithAllocator(alloc, fields, input2CSV) require.NoError(t, err) defer input2Record.Release() From 3823411b7d706f53bb135ba2b63658526e6a945b Mon Sep 17 00:00:00 2001 From: Stas Spiridonov Date: Wed, 8 Oct 2025 16:20:53 -0400 Subject: [PATCH 3/5] default allocator --- pkg/engine/internal/executor/filter_test.go | 14 ++++++------- .../internal/executor/functions_test.go | 20 +++++++++---------- pkg/engine/internal/executor/pipeline_test.go | 2 +- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/engine/internal/executor/filter_test.go b/pkg/engine/internal/executor/filter_test.go index 09686e0a4e4e2..620cb3da0dacf 100644 --- a/pkg/engine/internal/executor/filter_test.go +++ b/pkg/engine/internal/executor/filter_test.go @@ -19,7 +19,7 @@ func TestNewFilterPipeline(t *testing.T) { } t.Run("filter with true literal predicate", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) @@ -58,7 +58,7 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter with false literal predicate", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) @@ -96,7 +96,7 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter on boolean column with column expression", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) @@ -144,7 +144,7 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter on multiple columns with binary expressions", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) @@ -199,7 +199,7 @@ func TestNewFilterPipeline(t *testing.T) { // TODO: instead of returning empty batch, filter should read the next non-empty batch. t.Run("filter on empty batch", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) @@ -233,7 +233,7 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter with multiple input batches", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) @@ -296,7 +296,7 @@ func TestNewFilterPipeline(t *testing.T) { }) t.Run("filter with null values", func(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) schema := arrow.NewSchema(fields, nil) diff --git a/pkg/engine/internal/executor/functions_test.go b/pkg/engine/internal/executor/functions_test.go index 035b336887ff0..45afeabc68a7e 100644 --- a/pkg/engine/internal/executor/functions_test.go +++ b/pkg/engine/internal/executor/functions_test.go @@ -210,7 +210,7 @@ func TestBinaryFunctionRegistry_GetForSignature(t *testing.T) { } func TestBooleanComparisonFunctions(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -284,7 +284,7 @@ func TestBooleanComparisonFunctions(t *testing.T) { } func TestStringComparisonFunctions(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -358,7 +358,7 @@ func TestStringComparisonFunctions(t *testing.T) { } func TestIntegerComparisonFunctions(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -432,7 +432,7 @@ func TestIntegerComparisonFunctions(t *testing.T) { } func TestTimestampComparisonFunctions(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -506,7 +506,7 @@ func TestTimestampComparisonFunctions(t *testing.T) { } func TestFloat64ComparisonFunctions(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -580,7 +580,7 @@ func TestFloat64ComparisonFunctions(t *testing.T) { } func TestStringMatchingFunctions(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -654,7 +654,7 @@ func TestStringMatchingFunctions(t *testing.T) { } func TestNullValueHandling(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -718,7 +718,7 @@ func TestNullValueHandling(t *testing.T) { } func TestArrayLengthMismatch(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) tests := []struct { @@ -776,7 +776,7 @@ func TestArrayLengthMismatch(t *testing.T) { } func TestRegexCompileError(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) // Test with invalid regex patterns @@ -819,7 +819,7 @@ func TestBoolToIntConversion(t *testing.T) { } func TestEmptyArrays(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) // Test with empty arrays diff --git a/pkg/engine/internal/executor/pipeline_test.go b/pkg/engine/internal/executor/pipeline_test.go index 9fcff3ca306c4..cbcc12fd79872 100644 --- a/pkg/engine/internal/executor/pipeline_test.go +++ b/pkg/engine/internal/executor/pipeline_test.go @@ -144,7 +144,7 @@ func (i *instrumentedPipeline) Transport() Transport { var _ Pipeline = (*instrumentedPipeline)(nil) func Test_prefetchWrapper_Read(t *testing.T) { - alloc := memory.NewCheckedAllocator(memory.NewGoAllocator()) + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) defer alloc.AssertSize(t, 0) batch1 := arrowtest.Rows{ From acfd9c830651dea5f61939b57daaa06e73aeb8c4 Mon Sep 17 00:00:00 2001 From: Stas Spiridonov Date: Thu, 9 Oct 2025 10:22:57 -0400 Subject: [PATCH 4/5] defer closer to creation --- pkg/engine/internal/executor/filter.go | 37 ++++--------------- pkg/engine/internal/executor/project.go | 9 ++--- .../internal/executor/range_aggregation.go | 10 ++--- .../internal/executor/vector_aggregate.go | 10 ++--- 4 files changed, 19 insertions(+), 47 deletions(-) diff --git a/pkg/engine/internal/executor/filter.go b/pkg/engine/internal/executor/filter.go index 5b9aeade283e9..c6aff7dc519c2 100644 --- a/pkg/engine/internal/executor/filter.go +++ b/pkg/engine/internal/executor/filter.go @@ -22,14 +22,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres defer batch.Release() cols := make([]*array.Boolean, 0, len(filter.Predicates)) - defer func() { - for _, col := range cols { - // boolean filters are only used for filtering; they're not returned - // and must be released - // TODO: verify this once the evaluator implementation is fleshed out - col.Release() - } - }() for i, pred := range filter.Predicates { res, err := evaluator.eval(pred, batch) @@ -37,6 +29,11 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres return failureState(err) } data := res.ToArray() + + // boolean filters are only used for filtering; they're not returned + // and must be released + defer data.Release() + if data.DataType().ID() != arrow.BOOL { return failureState(fmt.Errorf("predicate %d returned non-boolean type %s", i, data.DataType())) } @@ -70,18 +67,9 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in fields := batch.Schema().Fields() builders := make([]array.Builder, len(fields)) - defer func() { - for _, b := range builders { - if b != nil { - b.Release() - } - } - }() - additions := make([]func(int), len(fields)) for i, field := range fields { - switch field.Type.ID() { case arrow.BOOL: builder := array.NewBooleanBuilder(allocator) @@ -90,7 +78,6 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in src := batch.Column(i).(*array.Boolean) builder.Append(src.Value(offset)) } - case arrow.STRING: builder := array.NewStringBuilder(allocator) builders[i] = builder @@ -98,7 +85,6 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in src := batch.Column(i).(*array.String) builder.Append(src.Value(offset)) } - case arrow.UINT64: builder := array.NewUint64Builder(allocator) builders[i] = builder @@ -106,7 +92,6 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in src := batch.Column(i).(*array.Uint64) builder.Append(src.Value(offset)) } - case arrow.INT64: builder := array.NewInt64Builder(allocator) builders[i] = builder @@ -114,7 +99,6 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in src := batch.Column(i).(*array.Int64) builder.Append(src.Value(offset)) } - case arrow.FLOAT64: builder := array.NewFloat64Builder(allocator) builders[i] = builder @@ -122,7 +106,6 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in src := batch.Column(i).(*array.Float64) builder.Append(src.Value(offset)) } - case arrow.TIMESTAMP: builder := array.NewTimestampBuilder(allocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"}) builders[i] = builder @@ -130,10 +113,11 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in src := batch.Column(i).(*array.Timestamp) builder.Append(src.Value(offset)) } - default: panic(fmt.Sprintf("unimplemented type in filterBatch: %s", field.Type.Name())) } + + defer builders[i].Release() } var ct int64 @@ -157,13 +141,8 @@ func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(in arrays := make([]arrow.Array, len(fields)) for i, builder := range builders { arrays[i] = builder.NewArray() + defer arrays[i].Release() } - defer func() { - for _, a := range arrays { - a.Release() - } - }() - return array.NewRecord(schema, arrays, ct) } diff --git a/pkg/engine/internal/executor/project.go b/pkg/engine/internal/executor/project.go index 1747a79fe6c51..832e3c22e5643 100644 --- a/pkg/engine/internal/executor/project.go +++ b/pkg/engine/internal/executor/project.go @@ -33,11 +33,6 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva defer batch.Release() projected := make([]arrow.Array, 0, len(columns)) - defer func() { - for _, proj := range projected { - proj.Release() - } - }() fields := make([]arrow.Field, 0, len(columns)) for i := range columns { @@ -46,7 +41,9 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva return failureState(err) } fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type().ArrowType(), Metadata: types.ColumnMetadata(vec.ColumnType(), vec.Type())}) - projected = append(projected, vec.ToArray()) + arr := vec.ToArray() + defer arr.Release() + projected = append(projected, arr) } schema := arrow.NewSchema(fields, nil) diff --git a/pkg/engine/internal/executor/range_aggregation.go b/pkg/engine/internal/executor/range_aggregation.go index 3b452c2e4829b..2025a6595f98c 100644 --- a/pkg/engine/internal/executor/range_aggregation.go +++ b/pkg/engine/internal/executor/range_aggregation.go @@ -168,13 +168,11 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro return nil, fmt.Errorf("unsupported datatype for partitioning %s", vec.Type()) } - arrays = append(arrays, vec.ToArray().(*array.String)) + arr := vec.ToArray().(*array.String) + defer arr.Release() + + arrays = append(arrays, arr) } - defer func() { - for _, a := range arrays { - a.Release() - } - }() // extract timestamp column to check if the entry is in range tsVec, err := r.evaluator.eval(tsColumnExpr, record) diff --git a/pkg/engine/internal/executor/vector_aggregate.go b/pkg/engine/internal/executor/vector_aggregate.go index 904b7ca6a30e6..4c4b88d4b465b 100644 --- a/pkg/engine/internal/executor/vector_aggregate.go +++ b/pkg/engine/internal/executor/vector_aggregate.go @@ -115,11 +115,6 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err // extract all the columns that are used for grouping arrays := make([]*array.String, 0, len(v.groupBy)) - defer func() { - for _, array := range arrays { - array.Release() - } - }() for _, columnExpr := range v.groupBy { vec, err := v.evaluator.eval(columnExpr, record) @@ -131,7 +126,10 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err return nil, fmt.Errorf("unsupported datatype for grouping %s", vec.Type()) } - arrays = append(arrays, vec.ToArray().(*array.String)) + arr := vec.ToArray().(*array.String) + defer arr.Release() + + arrays = append(arrays, arr) } for row := range int(record.NumRows()) { From 9b74208aaac50e66cd684d217108a1b117c9a5e8 Mon Sep 17 00:00:00 2001 From: Stas Spiridonov Date: Thu, 9 Oct 2025 10:31:42 -0400 Subject: [PATCH 5/5] lint --- pkg/engine/internal/executor/range_aggregation.go | 2 +- pkg/engine/internal/executor/vector_aggregate.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/engine/internal/executor/range_aggregation.go b/pkg/engine/internal/executor/range_aggregation.go index 2025a6595f98c..8b099e3174243 100644 --- a/pkg/engine/internal/executor/range_aggregation.go +++ b/pkg/engine/internal/executor/range_aggregation.go @@ -170,7 +170,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro arr := vec.ToArray().(*array.String) defer arr.Release() - + arrays = append(arrays, arr) } diff --git a/pkg/engine/internal/executor/vector_aggregate.go b/pkg/engine/internal/executor/vector_aggregate.go index 4c4b88d4b465b..3654556977494 100644 --- a/pkg/engine/internal/executor/vector_aggregate.go +++ b/pkg/engine/internal/executor/vector_aggregate.go @@ -128,7 +128,7 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err arr := vec.ToArray().(*array.String) defer arr.Release() - + arrays = append(arrays, arr) }