Skip to content

Commit 16dab82

Browse files
authored
chore: consistent expressionEvaluator.eval result memory ownership (#19438)
1 parent 632eb36 commit 16dab82

File tree

11 files changed

+116
-71
lines changed

11 files changed

+116
-71
lines changed

pkg/engine/internal/executor/executor.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,10 @@ func (c *Context) executeFilter(ctx context.Context, filter *physical.Filter, in
312312
return errorPipeline(ctx, fmt.Errorf("filter expects exactly one input, got %d", len(inputs)))
313313
}
314314

315-
return NewFilterPipeline(filter, inputs[0], c.evaluator)
315+
// Use memory allocator from context or default
316+
allocator := memory.DefaultAllocator
317+
318+
return NewFilterPipeline(filter, inputs[0], c.evaluator, allocator)
316319
}
317320

318321
func (c *Context) executeMerge(ctx context.Context, _ *physical.Merge, inputs []Pipeline) Pipeline {

pkg/engine/internal/executor/expressions.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
4141
continue
4242
}
4343

44+
col := input.Column(idx)
45+
col.Retain()
4446
return &Array{
45-
array: input.Column(idx),
47+
array: col,
4648
dt: types.MustFromString(dt),
4749
ct: types.ColumnTypeFromString(ct),
4850
rows: input.NumRows(),
@@ -69,8 +71,10 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
6971
return nil, fmt.Errorf("column %s has datatype %s, but expression expects string", expr.Ref.Column, dt)
7072
}
7173

74+
col := input.Column(idx)
75+
col.Retain()
7276
vecs = append(vecs, &Array{
73-
array: input.Column(idx),
77+
array: col,
7478
dt: types.MustFromString(dt),
7579
ct: types.ColumnTypeFromString(ct),
7680
rows: input.NumRows(),
@@ -107,6 +111,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
107111
if err != nil {
108112
return nil, err
109113
}
114+
defer lhr.Release()
110115

111116
fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type().ArrowType())
112117
if err != nil {
@@ -119,10 +124,12 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
119124
if err != nil {
120125
return nil, err
121126
}
127+
defer lhs.Release()
122128
rhs, err := e.eval(expr.Right, input)
123129
if err != nil {
124130
return nil, err
125131
}
132+
defer rhs.Release()
126133

127134
// At the moment we only support functions that accept the same input types.
128135
// TODO(chaudum): Compare Loki type, not Arrow type
@@ -162,6 +169,8 @@ type ColumnVector interface {
162169
ColumnType() types.ColumnType
163170
// Len returns the length of the vector
164171
Len() int64
172+
// Release decreases the reference count by 1 on underlying Arrow array
173+
Release()
165174
}
166175

167176
// Scalar represents a single value repeated any number of times.
@@ -223,6 +232,10 @@ func (v *Scalar) ColumnType() types.ColumnType {
223232
return v.ct
224233
}
225234

235+
// Release implements ColumnVector.
236+
func (v *Scalar) Release() {
237+
}
238+
226239
// Len implements ColumnVector.
227240
func (v *Scalar) Len() int64 {
228241
return v.rows
@@ -280,6 +293,11 @@ func (a *Array) Len() int64 {
280293
return int64(a.array.Len())
281294
}
282295

296+
// Release implements ColumnVector.
297+
func (a *Array) Release() {
298+
a.array.Release()
299+
}
300+
283301
// CoalesceVector represents multiple columns with the same name but different [types.ColumnType]
284302
// Vectors are ordered by precedence (highest precedence first).
285303
type CoalesceVector struct {
@@ -340,3 +358,7 @@ func (m *CoalesceVector) ColumnType() types.ColumnType {
340358
func (m *CoalesceVector) Len() int64 {
341359
return m.rows
342360
}
361+
362+
// Release implements ColumnVector.
363+
func (m *CoalesceVector) Release() {
364+
}

pkg/engine/internal/executor/filter.go

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,37 @@ import (
1111
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
1212
)
1313

14-
func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator) *GenericPipeline {
14+
func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expressionEvaluator, allocator memory.Allocator) *GenericPipeline {
1515
return newGenericPipeline(Local, func(ctx context.Context, inputs []Pipeline) state {
1616
// Pull the next item from the input pipeline
1717
input := inputs[0]
1818
batch, err := input.Read(ctx)
1919
if err != nil {
2020
return failureState(err)
2121
}
22+
defer batch.Release()
2223

2324
cols := make([]*array.Boolean, 0, len(filter.Predicates))
24-
defer func() {
25-
for _, col := range cols {
26-
// boolean filters are only used for filtering; they're not returned
27-
// and must be released
28-
// TODO: verify this once the evaluator implementation is fleshed out
29-
col.Release()
30-
}
31-
}()
3225

3326
for i, pred := range filter.Predicates {
3427
res, err := evaluator.eval(pred, batch)
3528
if err != nil {
3629
return failureState(err)
3730
}
3831
data := res.ToArray()
32+
33+
// boolean filters are only used for filtering; they're not returned
34+
// and must be released
35+
defer data.Release()
36+
3937
if data.DataType().ID() != arrow.BOOL {
4038
return failureState(fmt.Errorf("predicate %d returned non-boolean type %s", i, data.DataType()))
4139
}
4240
casted := data.(*array.Boolean)
4341
cols = append(cols, casted)
4442
}
4543

46-
filtered := filterBatch(batch, func(i int) bool {
44+
filtered := filterBatch(batch, allocator, func(i int) bool {
4745
for _, p := range cols {
4846
if !p.IsValid(i) || !p.Value(i) {
4947
return false
@@ -53,7 +51,6 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
5351
})
5452

5553
return successState(filtered)
56-
5754
}, input)
5855
}
5956

@@ -66,75 +63,61 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
6663
// pushdown optimizations.
6764
//
6865
// We should re-think this approach.
69-
func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
70-
mem := memory.NewGoAllocator()
66+
func filterBatch(batch arrow.Record, allocator memory.Allocator, include func(int) bool) arrow.Record {
7167
fields := batch.Schema().Fields()
7268

7369
builders := make([]array.Builder, len(fields))
74-
defer func() {
75-
for _, b := range builders {
76-
if b != nil {
77-
b.Release()
78-
}
79-
}
80-
}()
81-
8270
additions := make([]func(int), len(fields))
8371

8472
for i, field := range fields {
85-
8673
switch field.Type.ID() {
8774
case arrow.BOOL:
88-
builder := array.NewBooleanBuilder(mem)
75+
builder := array.NewBooleanBuilder(allocator)
8976
builders[i] = builder
9077
additions[i] = func(offset int) {
9178
src := batch.Column(i).(*array.Boolean)
9279
builder.Append(src.Value(offset))
9380
}
94-
9581
case arrow.STRING:
96-
builder := array.NewStringBuilder(mem)
82+
builder := array.NewStringBuilder(allocator)
9783
builders[i] = builder
9884
additions[i] = func(offset int) {
9985
src := batch.Column(i).(*array.String)
10086
builder.Append(src.Value(offset))
10187
}
102-
10388
case arrow.UINT64:
104-
builder := array.NewUint64Builder(mem)
89+
builder := array.NewUint64Builder(allocator)
10590
builders[i] = builder
10691
additions[i] = func(offset int) {
10792
src := batch.Column(i).(*array.Uint64)
10893
builder.Append(src.Value(offset))
10994
}
110-
11195
case arrow.INT64:
112-
builder := array.NewInt64Builder(mem)
96+
builder := array.NewInt64Builder(allocator)
11397
builders[i] = builder
11498
additions[i] = func(offset int) {
11599
src := batch.Column(i).(*array.Int64)
116100
builder.Append(src.Value(offset))
117101
}
118-
119102
case arrow.FLOAT64:
120-
builder := array.NewFloat64Builder(mem)
103+
builder := array.NewFloat64Builder(allocator)
121104
builders[i] = builder
122105
additions[i] = func(offset int) {
123106
src := batch.Column(i).(*array.Float64)
124107
builder.Append(src.Value(offset))
125108
}
126-
127109
case arrow.TIMESTAMP:
128-
builder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
110+
builder := array.NewTimestampBuilder(allocator, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
129111
builders[i] = builder
130112
additions[i] = func(offset int) {
131113
src := batch.Column(i).(*array.Timestamp)
132114
builder.Append(src.Value(offset))
133115
}
134-
135116
default:
136117
panic(fmt.Sprintf("unimplemented type in filterBatch: %s", field.Type.Name()))
137118
}
119+
120+
defer builders[i].Release()
138121
}
139122

140123
var ct int64
@@ -158,6 +141,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
158141
arrays := make([]arrow.Array, len(fields))
159142
for i, builder := range builders {
160143
arrays[i] = builder.NewArray()
144+
defer arrays[i].Release()
161145
}
162146

163147
return array.NewRecord(schema, arrays, ct)

pkg/engine/internal/executor/filter_test.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ func TestNewFilterPipeline(t *testing.T) {
1919
}
2020

2121
t.Run("filter with true literal predicate", func(t *testing.T) {
22-
alloc := memory.DefaultAllocator
22+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
23+
defer alloc.AssertSize(t, 0)
24+
2325
schema := arrow.NewSchema(fields, nil)
2426

2527
// Create input data using arrowtest.Rows
@@ -41,7 +43,7 @@ func TestNewFilterPipeline(t *testing.T) {
4143
}
4244

4345
// Create filter pipeline
44-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
46+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
4547
defer pipeline.Close()
4648

4749
// Read the pipeline output
@@ -56,7 +58,9 @@ func TestNewFilterPipeline(t *testing.T) {
5658
})
5759

5860
t.Run("filter with false literal predicate", func(t *testing.T) {
59-
alloc := memory.DefaultAllocator
61+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
62+
defer alloc.AssertSize(t, 0)
63+
6064
schema := arrow.NewSchema(fields, nil)
6165

6266
// Create input data using arrowtest.Rows
@@ -80,7 +84,7 @@ func TestNewFilterPipeline(t *testing.T) {
8084
}
8185

8286
// Create filter pipeline
83-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
87+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
8488
defer pipeline.Close()
8589

8690
// Read the pipeline output
@@ -92,7 +96,9 @@ func TestNewFilterPipeline(t *testing.T) {
9296
})
9397

9498
t.Run("filter on boolean column with column expression", func(t *testing.T) {
95-
alloc := memory.DefaultAllocator
99+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
100+
defer alloc.AssertSize(t, 0)
101+
96102
schema := arrow.NewSchema(fields, nil)
97103

98104
// Create input data using arrowtest.Rows
@@ -117,7 +123,7 @@ func TestNewFilterPipeline(t *testing.T) {
117123
}
118124

119125
// Create filter pipeline
120-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
126+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
121127
defer pipeline.Close()
122128

123129
// Create expected output (only rows where valid=true)
@@ -138,7 +144,9 @@ func TestNewFilterPipeline(t *testing.T) {
138144
})
139145

140146
t.Run("filter on multiple columns with binary expressions", func(t *testing.T) {
141-
alloc := memory.DefaultAllocator
147+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
148+
defer alloc.AssertSize(t, 0)
149+
142150
schema := arrow.NewSchema(fields, nil)
143151

144152
// Create input data using arrowtest.Rows
@@ -170,7 +178,7 @@ func TestNewFilterPipeline(t *testing.T) {
170178
}
171179

172180
// Create filter pipeline
173-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
181+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
174182
defer pipeline.Close()
175183

176184
// Create expected output (only rows where name=="Bob" AND valid!=false)
@@ -191,7 +199,9 @@ func TestNewFilterPipeline(t *testing.T) {
191199

192200
// TODO: instead of returning empty batch, filter should read the next non-empty batch.
193201
t.Run("filter on empty batch", func(t *testing.T) {
194-
alloc := memory.DefaultAllocator
202+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
203+
defer alloc.AssertSize(t, 0)
204+
195205
schema := arrow.NewSchema(fields, nil)
196206

197207
// Create empty input data using arrowtest.Rows
@@ -210,7 +220,7 @@ func TestNewFilterPipeline(t *testing.T) {
210220
}
211221

212222
// Create filter pipeline
213-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
223+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
214224
defer pipeline.Close()
215225

216226
record, err := pipeline.Read(t.Context())
@@ -223,7 +233,9 @@ func TestNewFilterPipeline(t *testing.T) {
223233
})
224234

225235
t.Run("filter with multiple input batches", func(t *testing.T) {
226-
alloc := memory.DefaultAllocator
236+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
237+
defer alloc.AssertSize(t, 0)
238+
227239
schema := arrow.NewSchema(fields, nil)
228240

229241
// Create input data split across multiple batches using arrowtest.Rows
@@ -251,7 +263,7 @@ func TestNewFilterPipeline(t *testing.T) {
251263
}
252264

253265
// Create filter pipeline
254-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
266+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
255267
defer pipeline.Close()
256268

257269
// Create expected output (only rows where valid=true)
@@ -284,7 +296,9 @@ func TestNewFilterPipeline(t *testing.T) {
284296
})
285297

286298
t.Run("filter with null values", func(t *testing.T) {
287-
alloc := memory.DefaultAllocator
299+
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
300+
defer alloc.AssertSize(t, 0)
301+
288302
schema := arrow.NewSchema(fields, nil)
289303

290304
// Create input data with null values
@@ -309,7 +323,7 @@ func TestNewFilterPipeline(t *testing.T) {
309323
}
310324

311325
// Create filter pipeline
312-
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{})
326+
pipeline := NewFilterPipeline(filter, input, expressionEvaluator{}, alloc)
313327
defer pipeline.Close()
314328

315329
// Create expected output (only rows where valid=true, including null name)

0 commit comments

Comments
 (0)