Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/engine/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,10 @@ func (c *Context) executeFilter(ctx context.Context, filter *physical.Filter, in
return errorPipeline(ctx, fmt.Errorf("filter expects exactly one input, got %d", len(inputs)))
}

return NewFilterPipeline(filter, inputs[0], c.evaluator)
// Use memory allocator from context or default
allocator := memory.DefaultAllocator

return NewFilterPipeline(filter, inputs[0], c.evaluator, allocator)
}

func (c *Context) executeMerge(ctx context.Context, _ *physical.Merge, inputs []Pipeline) Pipeline {
Expand Down
26 changes: 24 additions & 2 deletions pkg/engine/internal/executor/expressions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
continue
}

col := input.Column(idx)
col.Retain()
return &Array{
array: input.Column(idx),
array: col,
dt: types.MustFromString(dt),
ct: types.ColumnTypeFromString(ct),
rows: input.NumRows(),
Expand All @@ -69,8 +71,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
return nil, fmt.Errorf("column %s has datatype %s, but expression expects string", expr.Ref.Column, dt)
}

col := input.Column(idx)
col.Retain()
vecs = append(vecs, &Array{
array: input.Column(idx),
array: col,
dt: types.MustFromString(dt),
ct: types.ColumnTypeFromString(ct),
rows: input.NumRows(),
Expand Down Expand Up @@ -107,6 +111,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
if err != nil {
return nil, err
}
defer lhr.Release()

fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type().ArrowType())
if err != nil {
Expand All @@ -119,10 +124,12 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
if err != nil {
return nil, err
}
defer lhs.Release()
rhs, err := e.eval(expr.Right, input)
if err != nil {
return nil, err
}
defer rhs.Release()

// At the moment we only support functions that accept the same input types.
// TODO(chaudum): Compare Loki type, not Arrow type
Expand Down Expand Up @@ -162,6 +169,8 @@ type ColumnVector interface {
ColumnType() types.ColumnType
// Len returns the length of the vector
Len() int64
// Release decreases the reference count by 1 on underlying Arrow array
Release()
}

// Scalar represents a single value repeated any number of times.
Expand Down Expand Up @@ -223,6 +232,10 @@ func (v *Scalar) ColumnType() types.ColumnType {
return v.ct
}

// Release implements ColumnVector. It is a no-op because a Scalar does not
// wrap a reference-counted Arrow array; there is no underlying buffer to free.
func (v *Scalar) Release() {
}

// Len implements ColumnVector.
func (v *Scalar) Len() int64 {
return v.rows
Expand Down Expand Up @@ -280,6 +293,11 @@ func (a *Array) Len() int64 {
return int64(a.array.Len())
}

// Release implements ColumnVector. It decrements the reference count on the
// wrapped Arrow array so its memory can be reclaimed once no holders remain.
func (a *Array) Release() {
	arr := a.array
	arr.Release()
}

// CoalesceVector represents multiple columns with the same name but different [types.ColumnType]
// Vectors are ordered by precedence (highest precedence first).
type CoalesceVector struct {
Expand Down Expand Up @@ -340,3 +358,7 @@ func (m *CoalesceVector) ColumnType() types.ColumnType {
// Len implements ColumnVector, reporting the row count of the coalesced vector.
func (m *CoalesceVector) Len() int64 {
	n := m.rows
	return n
}

// Release implements ColumnVector. Currently a no-op.
// NOTE(review): CoalesceVector wraps multiple underlying vectors; if those
// vectors hold retained Arrow arrays, they may need to be released here —
// confirm ownership semantics against the constructor.
func (m *CoalesceVector) Release() {
}
52 changes: 18 additions & 34 deletions pkg/engine/internal/executor/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,37 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
)

func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator) *GenericPipeline {
func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator, allocator memory.Allocator) *GenericPipeline {
return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) state {
// Pull the next item from the input pipeline
input := inputs[0]
batch, err := input.Read(ctx)
if err != nil {
return failureState(err)
}
defer batch.Release()

cols := make([]*array.Boolean, 0, len(filter.Predicates))
defer func() {
for _, col := range cols {
// boolean filters are only used for filtering; they're not returned
// and must be released
// TODO: verify this once the evaluator implementation is fleshed out
col.Release()
}
}()

for i, pred := range filter.Predicates {
res, err := evaluator.eval(pred, batch)
if err != nil {
return failureState(err)
}
data := res.ToArray()

// boolean filters are only used for filtering; they're not returned
// and must be released
defer data.Release()

if data.DataType().ID() != arrow.BOOL {
return failureState(fmt.Errorf("predicate %d returned non-boolean type %s", i, data.DataType()))
}
casted := data.(*array.Boolean)
cols = append(cols, casted)
}

filtered := filterBatch(batch, func(i int) bool {
filtered := filterBatch(batch, allocator, func(i int) bool {
for _, p := range cols {
if !p.IsValid(i) || !p.Value(i) {
return false
Expand All @@ -53,7 +51,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
})

return successState(filtered)

}, input)
}

Expand All @@ -66,75 +63,61 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
// pushdown optimizations.
//
// We should re-think this approach.
func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
mem := memory.NewGoAllocator()
func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(int) bool) arrow.Record {
fields := batch.Schema().Fields()

builders := make([]array.Builder, len(fields))
defer func() {
for _, b := range builders {
if b != nil {
b.Release()
}
}
}()

additions := make([]func(int), len(fields))

for i, field := range fields {

switch field.Type.ID() {
case arrow.BOOL:
builder := array.NewBooleanBuilder(mem)
builder := array.NewBooleanBuilder(allocator)
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.Boolean)
builder.Append(src.Value(offset))
}

case arrow.STRING:
builder := array.NewStringBuilder(mem)
builder := array.NewStringBuilder(allocator)
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.String)
builder.Append(src.Value(offset))
}

case arrow.UINT64:
builder := array.NewUint64Builder(mem)
builder := array.NewUint64Builder(allocator)
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.Uint64)
builder.Append(src.Value(offset))
}

case arrow.INT64:
builder := array.NewInt64Builder(mem)
builder := array.NewInt64Builder(allocator)
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.Int64)
builder.Append(src.Value(offset))
}

case arrow.FLOAT64:
builder := array.NewFloat64Builder(mem)
builder := array.NewFloat64Builder(allocator)
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.Float64)
builder.Append(src.Value(offset))
}

case arrow.TIMESTAMP:
builder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
builder := array.NewTimestampBuilder(allocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.Timestamp)
builder.Append(src.Value(offset))
}

default:
panic(fmt.Sprintf("unimplemented type in filterBatch: %s", field.Type.Name()))
}

defer builders[i].Release()
}

var ct int64
Expand All @@ -158,6 +141,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
arrays := make([]arrow.Array, len(fields))
for i, builder := range builders {
arrays[i] = builder.NewArray()
defer arrays[i].Release()
}

return array.NewRecord(schema, arrays, ct)
Expand Down
42 changes: 28 additions & 14 deletions pkg/engine/internal/executor/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ func TestNewFilterPipeline(t *testing.T) {
}

t.Run("filter with true literal predicate", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create input data using arrowtest.Rows
Expand All @@ -41,7 +43,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

// Read the pipeline output
Expand All @@ -56,7 +58,9 @@ func TestNewFilterPipeline(t *testing.T) {
})

t.Run("filter with false literal predicate", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create input data using arrowtest.Rows
Expand All @@ -80,7 +84,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

// Read the pipeline output
Expand All @@ -92,7 +96,9 @@ func TestNewFilterPipeline(t *testing.T) {
})

t.Run("filter on boolean column with column expression", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create input data using arrowtest.Rows
Expand All @@ -117,7 +123,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

// Create expected output (only rows where valid=true)
Expand All @@ -138,7 +144,9 @@ func TestNewFilterPipeline(t *testing.T) {
})

t.Run("filter on multiple columns with binary expressions", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create input data using arrowtest.Rows
Expand Down Expand Up @@ -170,7 +178,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

// Create expected output (only rows where name=="Bob" AND valid!=false)
Expand All @@ -191,7 +199,9 @@ func TestNewFilterPipeline(t *testing.T) {

// TODO: instead of returning empty batch, filter should read the next non-empty batch.
t.Run("filter on empty batch", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create empty input data using arrowtest.Rows
Expand All @@ -210,7 +220,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

record, err := pipeline.Read(t.Context())
Expand All @@ -223,7 +233,9 @@ func TestNewFilterPipeline(t *testing.T) {
})

t.Run("filter with multiple input batches", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create input data split across multiple batches using arrowtest.Rows
Expand Down Expand Up @@ -251,7 +263,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

// Create expected output (only rows where valid=true)
Expand Down Expand Up @@ -284,7 +296,9 @@ func TestNewFilterPipeline(t *testing.T) {
})

t.Run("filter with null values", func(t *testing.T) {
alloc := memory.DefaultAllocator
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := arrow.NewSchema(fields, nil)

// Create input data with null values
Expand All @@ -309,7 +323,7 @@ func TestNewFilterPipeline(t *testing.T) {
}

// Create filter pipeline
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
defer pipeline.Close()

// Create expected output (only rows where valid=true, including null name)
Expand Down
Loading