Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,11 @@ public Block eval(Page page) {
return mask;
}

@Override
public long baseRamBytesUsed() {
return 0;
}

@Override
public void close() {
mask.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static void selfTest() {
public String operation;

private static Operator operator(String operation) {
return new EvalOperator(driverContext.blockFactory(), evaluator(operation));
return new EvalOperator(driverContext, evaluator(operation));
}

private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/133392.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133392
summary: Track memory in evaluators
area: ES|QL
type: bug
issues: []

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
*/
package org.elasticsearch.xpack.esql.core.expression;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -35,7 +37,9 @@
/**
* Literal or constant.
*/
public class Literal extends LeafExpression {
public class Literal extends LeafExpression implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Literal.class);

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"Literal",
Expand Down Expand Up @@ -169,6 +173,17 @@ public String nodeString() {
return toString() + "[" + dataType + "]";
}

@Override
public long ramBytesUsed() {
long ramBytesUsed = BASE_RAM_BYTES_USED;
if (value instanceof BytesRef b) {
ramBytesUsed += b.length;
} else {
ramBytesUsed += RamUsageEstimator.sizeOfObject(value);
}
return ramBytesUsed;
}

/**
* Utility method for creating a literal out of a foldable expression.
* Throws an exception if the expression is not foldable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.lang.model.type.TypeMirror;
import javax.lang.model.util.Elements;

import static org.elasticsearch.compute.gen.EvaluatorImplementer.baseRamBytesUsed;
import static org.elasticsearch.compute.gen.Methods.buildFromFactory;
import static org.elasticsearch.compute.gen.Methods.getMethod;
import static org.elasticsearch.compute.gen.Types.ABSTRACT_CONVERT_FUNCTION_EVALUATOR;
Expand Down Expand Up @@ -98,6 +99,7 @@ private TypeSpec type() {
builder.addJavadoc("This class is generated. Edit {@code " + getClass().getSimpleName() + "} instead.");
builder.addModifiers(Modifier.PUBLIC, Modifier.FINAL);
builder.superclass(ABSTRACT_CONVERT_FUNCTION_EVALUATOR);
builder.addField(baseRamBytesUsed(implementation));

for (EvaluatorImplementer.ProcessFunctionArg a : processFunction.args) {
a.declareField(builder);
Expand All @@ -113,6 +115,7 @@ private TypeSpec type() {
}
builder.addMethod(processFunction.toStringMethod(implementation));
builder.addMethod(processFunction.close());
builder.addMethod(processFunction.baseRamBytesUsed());
builder.addType(factory());
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.squareup.javapoet.ArrayTypeName;
import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.JavaFile;
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.ParameterizedTypeName;
Expand Down Expand Up @@ -47,6 +48,7 @@
import static org.elasticsearch.compute.gen.Types.INT_BLOCK;
import static org.elasticsearch.compute.gen.Types.LONG_BLOCK;
import static org.elasticsearch.compute.gen.Types.PAGE;
import static org.elasticsearch.compute.gen.Types.RAM_USAGE_ESIMATOR;
import static org.elasticsearch.compute.gen.Types.RELEASABLE;
import static org.elasticsearch.compute.gen.Types.RELEASABLES;
import static org.elasticsearch.compute.gen.Types.SOURCE;
Expand Down Expand Up @@ -96,6 +98,7 @@ private TypeSpec type() {
builder.addJavadoc("This class is generated. Edit {@code " + getClass().getSimpleName() + "} instead.");
builder.addModifiers(Modifier.PUBLIC, Modifier.FINAL);
builder.addSuperinterface(EXPRESSION_EVALUATOR);
builder.addField(baseRamBytesUsed(implementation));
builder.addType(factory());

builder.addField(SOURCE, "source", Modifier.PRIVATE, Modifier.FINAL);
Expand All @@ -106,6 +109,7 @@ private TypeSpec type() {

builder.addMethod(ctor());
builder.addMethod(eval());
builder.addMethod(processFunction.baseRamBytesUsed());

if (processOutputsMultivalued) {
if (processFunction.args.stream().anyMatch(x -> x instanceof FixedProcessFunctionArg == false)) {
Expand All @@ -123,6 +127,19 @@ private TypeSpec type() {
return builder.build();
}

static FieldSpec baseRamBytesUsed(ClassName implementation) {
FieldSpec.Builder builder = FieldSpec.builder(
TypeName.LONG,
"BASE_RAM_BYTES_USED",
Modifier.PRIVATE,
Modifier.STATIC,
Modifier.FINAL
);
builder.initializer("$T.shallowSizeOfInstance($T.class)", RAM_USAGE_ESIMATOR, implementation);

return builder.build();
}

private MethodSpec ctor() {
MethodSpec.Builder builder = MethodSpec.constructorBuilder().addModifiers(Modifier.PUBLIC);
builder.addParameter(SOURCE, "source");
Expand Down Expand Up @@ -411,6 +428,11 @@ interface ProcessFunctionArg {
* The string to close this argument or {@code null}.
*/
String closeInvocation();

/**
* Invokes {@code baseRamBytesUsed} on sub-expressions an
*/
void sumBaseRamBytesUsed(MethodSpec.Builder builder);
}

record StandardProcessFunctionArg(TypeName type, String name) implements ProcessFunctionArg {
Expand Down Expand Up @@ -535,6 +557,11 @@ public void buildToStringInvocation(StringBuilder pattern, List<Object> args, St
public String closeInvocation() {
return name;
}

@Override
public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {
builder.addStatement("baseRamBytesUsed += $L.baseRamBytesUsed()", name);
}
}

private record ArrayProcessFunctionArg(TypeName componentType, String name) implements ProcessFunctionArg {
Expand Down Expand Up @@ -667,6 +694,13 @@ public void buildToStringInvocation(StringBuilder pattern, List<Object> args, St
public String closeInvocation() {
return "() -> Releasables.close(" + name + ")";
}

@Override
public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {
builder.beginControlFlow("for ($T e : $L)", EXPRESSION_EVALUATOR, name);
builder.addStatement("baseRamBytesUsed += e.baseRamBytesUsed()");
builder.endControlFlow();
}
}

record FixedProcessFunctionArg(TypeName type, String name, boolean includeInToString, Scope scope, boolean releasable)
Expand Down Expand Up @@ -769,6 +803,9 @@ public void buildToStringInvocation(StringBuilder pattern, List<Object> args, St
public String closeInvocation() {
return releasable ? name : null;
}

@Override
public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {}
}

private record BuilderProcessFunctionArg(ClassName type, String name) implements ProcessFunctionArg {
Expand Down Expand Up @@ -853,6 +890,9 @@ public void buildToStringInvocation(StringBuilder pattern, List<Object> args, St
public String closeInvocation() {
return null;
}

@Override
public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {}
}

private record BlockProcessFunctionArg(TypeName type, String name) implements ProcessFunctionArg {
Expand Down Expand Up @@ -940,6 +980,11 @@ public void buildToStringInvocation(StringBuilder pattern, List<Object> args, St
public String closeInvocation() {
return name;
}

@Override
public void sumBaseRamBytesUsed(MethodSpec.Builder builder) {
builder.addStatement("baseRamBytesUsed += $L.baseRamBytesUsed()", name);
}
}

static class ProcessFunction {
Expand Down Expand Up @@ -1085,6 +1130,18 @@ MethodSpec close() {
}
return builder.build();
}

MethodSpec baseRamBytesUsed() {
MethodSpec.Builder builder = MethodSpec.methodBuilder("baseRamBytesUsed").addAnnotation(Override.class);
builder.addModifiers(Modifier.PUBLIC).returns(TypeName.LONG);

builder.addStatement("long baseRamBytesUsed = BASE_RAM_BYTES_USED");
for (ProcessFunctionArg arg : args) {
arg.sumBaseRamBytesUsed(builder);
}
builder.addStatement("return baseRamBytesUsed");
return builder.build();
}
}

static boolean isBlockType(TypeName type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ private TypeSpec type() {
builder.addField(SOURCE, "source", Modifier.PRIVATE, Modifier.FINAL);
builder.addField(WARNINGS, "warnings", Modifier.PRIVATE);
}
builder.addField(EvaluatorImplementer.baseRamBytesUsed(implementation));

builder.addMethod(ctor());
builder.addMethod(name());
Expand All @@ -159,6 +160,7 @@ private TypeSpec type() {
if (warnExceptions.isEmpty() == false) {
builder.addMethod(EvaluatorImplementer.warnings());
}
builder.addMethod(baseRamBytesUsed());
return builder.build();
}

Expand Down Expand Up @@ -581,4 +583,12 @@ private void call(MethodSpec.Builder builder) {
}
}
}

MethodSpec baseRamBytesUsed() {
MethodSpec.Builder builder = MethodSpec.methodBuilder("baseRamBytesUsed").addAnnotation(Override.class);
builder.addModifiers(Modifier.PUBLIC).returns(TypeName.LONG);

builder.addStatement("return BASE_RAM_BYTES_USED + field.baseRamBytesUsed()");
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Types {

static final ClassName CIRCUIT_BREAKER = ClassName.get("org.elasticsearch.common.breaker", "CircuitBreaker");
static final ClassName BIG_ARRAYS = ClassName.get("org.elasticsearch.common.util", "BigArrays");
static final ClassName RAM_USAGE_ESIMATOR = ClassName.get("org.apache.lucene.util", "RamUsageEstimator");

static final ClassName BOOLEAN_BLOCK = ClassName.get(DATA_PACKAGE, "BooleanBlock");
static final ClassName BYTES_REF_BLOCK = ClassName.get(DATA_PACKAGE, "BytesRefBlock");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanVector;
Expand All @@ -29,6 +30,7 @@
public class LuceneQueryExpressionEvaluator extends LuceneQueryEvaluator<BooleanVector.Builder>
implements
EvalOperator.ExpressionEvaluator {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(LuceneQueryExpressionEvaluator.class);

LuceneQueryExpressionEvaluator(BlockFactory blockFactory, ShardConfig[] shards) {
super(blockFactory, shards);
Expand Down Expand Up @@ -64,6 +66,11 @@ protected void appendMatch(BooleanVector.Builder builder, Scorable scorer) throw
builder.appendBoolean(true);
}

@Override
public long baseRamBytesUsed() {
return BASE_RAM_BYTES_USED;
}

public record Factory(ShardConfig[] shardConfigs) implements EvalOperator.ExpressionEvaluator.Factory {
@Override
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
Expand Down
Loading
Loading