Merged (changes from 2 commits)
5 changes: 4 additions & 1 deletion pkg/engine/internal/executor/executor.go

@@ -312,7 +312,10 @@ func (c *Context) executeFilter(ctx context.Context, filter *physical.Filter, in
 		return errorPipeline(ctx, fmt.Errorf("filter expects exactly one input, got %d", len(inputs)))
 	}
 
-	return NewFilterPipeline(filter, inputs[0], c.evaluator)
+	// Use memory allocator from context or default
+	allocator := memory.DefaultAllocator
+
+	return NewFilterPipeline(filter, inputs[0], c.evaluator, allocator)
 }
 
 func (c *Context) executeMerge(ctx context.Context, _ *physical.Merge, inputs []Pipeline) Pipeline {
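Editor's note: the new comment says the allocator should come "from context or default", but this change still hardcodes memory.DefaultAllocator. A hypothetical sketch of where that could go; the `allocator` field and `memoryAllocator` helper below are assumptions for illustration, not part of this PR:

```go
// Hypothetical: an allocator carried on the executor Context so that tests
// can inject memory.NewCheckedAllocator(...); not code from this PR.
func (c *Context) memoryAllocator() memory.Allocator {
	if c.allocator != nil { // assumed field on Context
		return c.allocator
	}
	return memory.DefaultAllocator
}
```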
26 changes: 24 additions & 2 deletions pkg/engine/internal/executor/expressions.go

@@ -41,8 +41,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
 				continue
 			}
 
+			col := input.Column(idx)
+			col.Retain()
 			return &Array{
-				array: input.Column(idx),
+				array: col,
 				dt:    types.MustFromString(dt),
 				ct:    types.ColumnTypeFromString(ct),
 				rows:  input.NumRows(),
@@ -69,8 +71,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
 					return nil, fmt.Errorf("column %s has datatype %s, but expression expects string", expr.Ref.Column, dt)
 				}
 
+				col := input.Column(idx)
+				col.Retain()
 				vecs = append(vecs, &Array{
-					array: input.Column(idx),
+					array: col,
 					dt:    types.MustFromString(dt),
 					ct:    types.ColumnTypeFromString(ct),
 					rows:  input.NumRows(),
@@ -107,6 +111,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
 		if err != nil {
 			return nil, err
 		}
+		defer lhr.Release()
 
 		fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type().ArrowType())
 		if err != nil {
@@ -119,10 +124,12 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
 		if err != nil {
 			return nil, err
 		}
+		defer lhs.Release()
 		rhs, err := e.eval(expr.Right, input)
 		if err != nil {
 			return nil, err
 		}
+		defer rhs.Release()
 
 		// At the moment we only support functions that accept the same input types.
 		// TODO(chaudum): Compare Loki type, not Arrow type
@@ -162,6 +169,8 @@ type ColumnVector interface {
 	ColumnType() types.ColumnType
 	// Len returns the length of the vector
 	Len() int64
+	// Release decreases the reference count by 1 on underlying Arrow array
+	Release()
 }
 
 // Scalar represents a single value repeated any number of times.
@@ -223,6 +232,10 @@ func (v *Scalar) ColumnType() types.ColumnType {
 	return v.ct
 }
 
+// Release implements ColumnVector.
+func (v *Scalar) Release() {
+}
+
 // Len implements ColumnVector.
 func (v *Scalar) Len() int64 {
 	return v.rows
@@ -280,6 +293,11 @@ func (a *Array) Len() int64 {
 	return int64(a.array.Len())
 }
 
+// Release implements ColumnVector.
+func (a *Array) Release() {
+	a.array.Release()
+}
+
 // CoalesceVector represents multiple columns with the same name but different [types.ColumnType]
 // Vectors are ordered by precedence (highest precedence first).
 type CoalesceVector struct {
@@ -340,3 +358,7 @@ func (m *CoalesceVector) ColumnType() types.ColumnType {
 func (m *CoalesceVector) Len() int64 {
 	return m.rows
 }
+
+// Release implements ColumnVector.
+func (m *CoalesceVector) Release() {
+}
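Editor's note: this file establishes the ownership contract the rest of the PR relies on. eval() now retains the underlying Arrow array before wrapping it, so every caller owns one reference to the returned ColumnVector and must Release it, which is what the new `defer lhr.Release()`, `defer lhs.Release()`, and `defer rhs.Release()` lines do. A self-contained sketch of the underlying Arrow refcounting (the v18 import path is an assumption; match it to the Arrow version in go.mod):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()

	b := array.NewInt64Builder(mem)
	b.AppendValues([]int64{1, 2, 3}, nil)
	arr := b.NewArray() // refcount 1: the builder hands ownership to the caller
	b.Release()         // the builder itself is refcounted too

	arr.Retain() // refcount 2: what eval() now does before wrapping the column
	fmt.Println("len:", arr.Len())

	arr.Release() // the wrapper's reference, dropped via ColumnVector.Release
	arr.Release() // the original reference; the buffers are now reclaimable
}
```

Scalar and CoalesceVector wrap no Arrow array of their own, which is why their Release implementations are deliberately empty.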
27 changes: 16 additions & 11 deletions pkg/engine/internal/executor/filter.go

@@ -11,14 +11,15 @@ import (
 	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
 )
 
-func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator) *GenericPipeline {
+func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator, allocator memory.Allocator) *GenericPipeline {
 	return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) state {
 		// Pull the next item from the input pipeline
 		input := inputs[0]
 		batch, err := input.Read(ctx)
 		if err != nil {
 			return failureState(err)
 		}
+		defer batch.Release()
 
 		cols := make([]*array.Boolean, 0, len(filter.Predicates))
 		defer func() {
@@ -43,7 +44,7 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
 			cols = append(cols, casted)
 		}
 
-		filtered := filterBatch(batch, func(i int) bool {
+		filtered := filterBatch(batch, allocator, func(i int) bool {
 			for _, p := range cols {
 				if !p.IsValid(i) || !p.Value(i) {
 					return false
@@ -53,7 +54,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
 		})
 
 		return successState(filtered)
-
 	}, input)
 }
 
@@ -66,8 +66,7 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
 // pushdown optimizations.
 //
 // We should re-think this approach.
-func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
-	mem := memory.NewGoAllocator()
+func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(int) bool) arrow.Record {
 	fields := batch.Schema().Fields()
 
 	builders := make([]array.Builder, len(fields))
@@ -85,47 +84,47 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
 
 		switch field.Type.ID() {
 		case arrow.BOOL:
-			builder := array.NewBooleanBuilder(mem)
+			builder := array.NewBooleanBuilder(allocator)
 			builders[i] = builder
 			additions[i] = func(offset int) {
 				src := batch.Column(i).(*array.Boolean)
 				builder.Append(src.Value(offset))
 			}
 
 		case arrow.STRING:
-			builder := array.NewStringBuilder(mem)
+			builder := array.NewStringBuilder(allocator)
 			builders[i] = builder
 			additions[i] = func(offset int) {
 				src := batch.Column(i).(*array.String)
 				builder.Append(src.Value(offset))
 			}
 
 		case arrow.UINT64:
-			builder := array.NewUint64Builder(mem)
+			builder := array.NewUint64Builder(allocator)
 			builders[i] = builder
 			additions[i] = func(offset int) {
 				src := batch.Column(i).(*array.Uint64)
 				builder.Append(src.Value(offset))
 			}
 
 		case arrow.INT64:
-			builder := array.NewInt64Builder(mem)
+			builder := array.NewInt64Builder(allocator)
 			builders[i] = builder
 			additions[i] = func(offset int) {
 				src := batch.Column(i).(*array.Int64)
 				builder.Append(src.Value(offset))
 			}
 
 		case arrow.FLOAT64:
-			builder := array.NewFloat64Builder(mem)
+			builder := array.NewFloat64Builder(allocator)
 			builders[i] = builder
 			additions[i] = func(offset int) {
 				src := batch.Column(i).(*array.Float64)
 				builder.Append(src.Value(offset))
 			}
 
 		case arrow.TIMESTAMP:
-			builder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
+			builder := array.NewTimestampBuilder(allocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
 			builders[i] = builder
 			additions[i] = func(offset int) {
 				src := batch.Column(i).(*array.Timestamp)
@@ -160,5 +159,11 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
 		arrays[i] = builder.NewArray()
 	}
 
+	defer func() {
+		for _, a := range arrays {
+			a.Release()
+		}
+	}()
+
 	return array.NewRecord(schema, arrays, ct)
 }
Review comment from a Collaborator on the `return array.NewRecord(schema, arrays, ct)` line:

    I prefer the defer Release approach when it's in an open/close situation, for example immediately after creating an array. So, how would you feel about either a) moving this up into the builders loop above, i.e.

        for i, builder := range builders {
            arrays[i] = builder.NewArray()
            defer arrays[i].Release()
        }

    or b) storing the new record in a variable and then releasing synchronously (i.e. not in a defer)?

Reply from @spiridonov (Contributor, author), Oct 8, 2025:

    That was an existing pattern that I followed. I am not a fan of doing it without defer, because there is a risk of missing important cleanup on early returns or failures. Using defer is as simple and reliable as finally in Java.
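Editor's note: for comparison, a minimal sketch of the reviewer's option (b), releasing synchronously once the record has been built. Like the merged defer-based version, it relies on array.NewRecord retaining each column, so the builder-owned references can be dropped immediately. The snippet reuses filterBatch's own variable names and is not the code that was merged:

```go
// Option (b) from the review thread, sketched against filterBatch's names.
rec := array.NewRecord(schema, arrays, ct)
for _, a := range arrays {
	a.Release() // rec holds its own reference to every column
}
return rec
```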
42 changes: 28 additions & 14 deletions pkg/engine/internal/executor/filter_test.go

@@ -19,7 +19,9 @@ func TestNewFilterPipeline(t *testing.T) {
 	}
 
 	t.Run("filter with true literal predicate", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create input data using arrowtest.Rows

Review comment from a Collaborator on the `alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())` line:

    Is there a reason we can't use memory.DefaultAllocator instead of instantiating a new GoAllocator()?

Reply from the author:

    It was inconsistent across the test files, and GoAllocator happened to be the one I copy-pasted from. I have fixed all the test files to use DefaultAllocator.
@@ -41,7 +43,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		// Read the pipeline output
@@ -56,7 +58,9 @@ func TestNewFilterPipeline(t *testing.T) {
 	})
 
 	t.Run("filter with false literal predicate", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create input data using arrowtest.Rows
@@ -80,7 +84,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		// Read the pipeline output
@@ -92,7 +96,9 @@ func TestNewFilterPipeline(t *testing.T) {
 	})
 
 	t.Run("filter on boolean column with column expression", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create input data using arrowtest.Rows
@@ -117,7 +123,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		// Create expected output (only rows where valid=true)
@@ -138,7 +144,9 @@ func TestNewFilterPipeline(t *testing.T) {
 	})
 
 	t.Run("filter on multiple columns with binary expressions", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create input data using arrowtest.Rows
@@ -170,7 +178,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		// Create expected output (only rows where name=="Bob" AND valid!=false)
@@ -191,7 +199,9 @@ func TestNewFilterPipeline(t *testing.T) {
 
 	// TODO: instead of returning empty batch, filter should read the next non-empty batch.
 	t.Run("filter on empty batch", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create empty input data using arrowtest.Rows
@@ -210,7 +220,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		record, err := pipeline.Read(t.Context())
@@ -223,7 +233,9 @@ func TestNewFilterPipeline(t *testing.T) {
 	})
 
 	t.Run("filter with multiple input batches", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create input data split across multiple batches using arrowtest.Rows
@@ -251,7 +263,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		// Create expected output (only rows where valid=true)
@@ -284,7 +296,9 @@ func TestNewFilterPipeline(t *testing.T) {
 	})
 
 	t.Run("filter with null values", func(t *testing.T) {
-		alloc := memory.DefaultAllocator
+		alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+		defer alloc.AssertSize(t, 0)
+
 		schema := arrow.NewSchema(fields, nil)
 
 		// Create input data with null values
@@ -309,7 +323,7 @@ func TestNewFilterPipeline(t *testing.T) {
 		}
 
 		// Create filter pipeline
-		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
+		pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
 		defer pipeline.Close()
 
 		// Create expected output (only rows where valid=true, including null name)
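Editor's note: the test change is what makes the rest of the PR verifiable. A CheckedAllocator wraps a real allocator and tracks every allocation, and AssertSize(t, 0) fails the test if any Arrow buffers are still live when the test ends, so a single forgotten Release anywhere in the pipeline shows up as a test failure. A minimal self-contained example of the pattern (the v18 import path is an assumption):

```go
package executor_test

import (
	"testing"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func TestNoLeaks(t *testing.T) {
	alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
	defer alloc.AssertSize(t, 0) // reports any allocation that was never released

	b := array.NewStringBuilder(alloc)
	defer b.Release()

	b.Append("hello")
	arr := b.NewArray()
	arr.Release() // comment this out and AssertSize pinpoints the leak
}
```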
7 changes: 6 additions & 1 deletion pkg/engine/internal/executor/project.go

@@ -30,8 +30,14 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva
 		if err != nil {
 			return failureState(err)
 		}
+		defer batch.Release()
 
 		projected := make([]arrow.Array, 0, len(columns))
+		defer func() {
+			for _, proj := range projected {
+				proj.Release()
+			}
+		}()
 		fields := make([]arrow.Field, 0, len(columns))
 
 		for i := range columns {
@@ -47,7 +53,6 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva
 		// Create a new record with only the projected columns
 		// retain the projected columns in a new batch then release the original record.
 		projectedRecord := array.NewRecord(schema, projected, batch.NumRows())
-		batch.Release()
 		return successState(projectedRecord)
 	}, input), nil
 }
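Editor's note: this file applies the defer-over-synchronous argument the author made in the filter.go thread. Before the change, batch.Release() ran only on the success path, so any early failure return between Read and that final Release would leak the input batch. A condensed sketch of the shape, using this file's own helpers; `evalColumns` is a hypothetical stand-in for the real per-column projection loop:

```go
// Sketch only: the projection loop is condensed into one fallible step.
batch, err := input.Read(ctx)
if err != nil {
	return failureState(err)
}
defer batch.Release() // now runs on every return path below

projected, err := evalColumns(batch) // hypothetical stand-in for the loop
if err != nil {
	return failureState(err) // pre-change, a path like this skipped batch.Release()
}
// schema is built from the projected fields, as in NewProjectPipeline.
return successState(array.NewRecord(schema, projected, batch.NumRows()))
```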