diff --git a/.gitattributes b/.gitattributes index 04881c92ede00..a0f434f16b32b 100644 --- a/.gitattributes +++ b/.gitattributes @@ -11,6 +11,7 @@ x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/*.interp li x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlBaseLexer*.java linguist-generated=true x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlBaseParser*.java linguist-generated=true x-pack/plugin/esql/src/main/generated/** linguist-generated=true +x-pack/plugin/esql/src/main/generated-src/** linguist-generated=true # ESQL functions docs are autogenerated. More information at `docs/reference/esql/functions/README.md` docs/reference/esql/functions/*/** linguist-generated=true diff --git a/benchmarks/README.md b/benchmarks/README.md index d7b324acfef81..0cf95a2e81b9a 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -126,9 +126,12 @@ exit Grab the async profiler from https://github.com/jvm-profiling-tools/async-profiler and run `prof async` like so: ``` -gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/tmp/async-profiler-1.8.3-linux-x64/build/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"' +gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/async-profiler-3.0-29ee888-linux-x64/lib/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"' ``` +Note: As of January 2025 the latest release of async profiler doesn't work + with our JDK but the nightly is fine. + If you are on Mac, this'll warn you that you downloaded the shared library from the internet. You'll need to go to settings and allow it to run. diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/EvalBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/EvalBenchmark.java index d3259b9604717..19d72a1f84f25 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/EvalBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/EvalBenchmark.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc; import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs; import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin; +import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce; import org.elasticsearch.xpack.esql.expression.function.scalar.string.RLike; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; @@ -96,6 +97,9 @@ public class EvalBenchmark { "add_double", "case_1_eager", "case_1_lazy", + "coalesce_2_noop", + "coalesce_2_eager", + "coalesce_2_lazy", "date_trunc", "equal_to_const", "long_equal_to_long", @@ -142,8 +146,34 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) { lhs = new Add(Source.EMPTY, lhs, new Literal(Source.EMPTY, 1L, DataType.LONG)); rhs = new Add(Source.EMPTY, rhs, new Literal(Source.EMPTY, 1L, DataType.LONG)); } - yield EvalMapper.toEvaluator(FOLD_CONTEXT, new Case(Source.EMPTY, condition, List.of(lhs, rhs)), layout(f1, f2)) - .get(driverContext); + EvalOperator.ExpressionEvaluator evaluator = EvalMapper.toEvaluator( + FOLD_CONTEXT, + new Case(Source.EMPTY, condition, List.of(lhs, rhs)), + layout(f1, f2) + ).get(driverContext); + String desc = operation.endsWith("lazy") ? "CaseLazyEvaluator" : "CaseEagerEvaluator"; + if (evaluator.toString().contains(desc) == false) { + throw new IllegalArgumentException("Evaluator was [" + evaluator + "] but expected one containing [" + desc + "]"); + } + yield evaluator; + } + case "coalesce_2_noop", "coalesce_2_eager", "coalesce_2_lazy" -> { + FieldAttribute f1 = longField(); + FieldAttribute f2 = longField(); + Expression lhs = f1; + if (operation.endsWith("lazy")) { + lhs = new Add(Source.EMPTY, lhs, new Literal(Source.EMPTY, 1L, DataType.LONG)); + } + EvalOperator.ExpressionEvaluator evaluator = EvalMapper.toEvaluator( + FOLD_CONTEXT, + new Coalesce(Source.EMPTY, lhs, List.of(f2)), + layout(f1, f2) + ).get(driverContext); + String desc = operation.endsWith("lazy") ? "CoalesceLazyEvaluator" : "CoalesceEagerEvaluator"; + if (evaluator.toString().contains(desc) == false) { + throw new IllegalArgumentException("Evaluator was [" + evaluator + "] but expected one containing [" + desc + "]"); + } + yield evaluator; } case "date_trunc" -> { FieldAttribute timestamp = new FieldAttribute( @@ -260,6 +290,38 @@ private static void checkExpected(String operation, Page actual) { } } } + case "coalesce_2_noop" -> { + LongVector f1 = actual.getBlock(0).asVector(); + LongVector result = actual.getBlock(2).asVector(); + for (int i = 0; i < BLOCK_LENGTH; i++) { + long expected = f1.getLong(i); + if (result.getLong(i) != expected) { + throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + result.getLong(i) + "]"); + } + } + } + case "coalesce_2_eager" -> { + LongBlock f1 = actual.getBlock(0); + LongVector f2 = actual.getBlock(1).asVector(); + LongVector result = actual.getBlock(2).asVector(); + for (int i = 0; i < BLOCK_LENGTH; i++) { + long expected = i % 5 == 0 ? f2.getLong(i) : f1.getLong(f1.getFirstValueIndex(i)); + if (result.getLong(i) != expected) { + throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + result.getLong(i) + "]"); + } + } + } + case "coalesce_2_lazy" -> { + LongBlock f1 = actual.getBlock(0); + LongVector f2 = actual.getBlock(1).asVector(); + LongVector result = actual.getBlock(2).asVector(); + for (int i = 0; i < BLOCK_LENGTH; i++) { + long expected = i % 5 == 0 ? f2.getLong(i) : f1.getLong(f1.getFirstValueIndex(i)) + 1; + if (result.getLong(i) != expected) { + throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + result.getLong(i) + "]"); + } + } + } case "date_trunc" -> { LongVector v = actual.getBlock(1).asVector(); long oneDay = TimeValue.timeValueHours(24).millis(); @@ -304,7 +366,7 @@ private static void checkExpected(String operation, Page actual) { } } } - default -> throw new UnsupportedOperationException(); + default -> throw new UnsupportedOperationException(operation); } } @@ -324,7 +386,7 @@ private static Page page(String operation) { } yield new Page(builder.build()); } - case "case_1_eager", "case_1_lazy" -> { + case "case_1_eager", "case_1_lazy", "coalesce_2_noop" -> { var f1 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH); var f2 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH); for (int i = 0; i < BLOCK_LENGTH; i++) { @@ -333,6 +395,19 @@ private static Page page(String operation) { } yield new Page(f1.build(), f2.build()); } + case "coalesce_2_eager", "coalesce_2_lazy" -> { + var f1 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH); + var f2 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH); + for (int i = 0; i < BLOCK_LENGTH; i++) { + if (i % 5 == 0) { + f1.appendNull(); + } else { + f1.appendLong(i); + } + f2.appendLong(-i); + } + yield new Page(f1.build(), f2.build()); + } case "long_equal_to_long" -> { var lhs = blockFactory.newLongBlockBuilder(BLOCK_LENGTH); var rhs = blockFactory.newLongBlockBuilder(BLOCK_LENGTH); diff --git a/x-pack/plugin/esql/build.gradle b/x-pack/plugin/esql/build.gradle index 8d2050fb43044..2498b621b73e4 100644 --- a/x-pack/plugin/esql/build.gradle +++ b/x-pack/plugin/esql/build.gradle @@ -348,4 +348,31 @@ tasks.named('stringTemplates').configure { it.inputFile = inInputFile it.outputFile = "org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/InBytesRefEvaluator.java" } + + File coalesceInputFile = file("src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/X-CoalesceEvaluator.java.st") + template { + it.properties = booleanProperties + it.inputFile = coalesceInputFile + it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBooleanEvaluator.java" + } + template { + it.properties = intProperties + it.inputFile = coalesceInputFile + it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceIntEvaluator.java" + } + template { + it.properties = longProperties + it.inputFile = coalesceInputFile + it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceLongEvaluator.java" + } + template { + it.properties = doubleProperties + it.inputFile = coalesceInputFile + it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceDoubleEvaluator.java" + } + template { + it.properties = bytesRefProperties + it.inputFile = coalesceInputFile + it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBytesRefEvaluator.java" + } } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java index 5d2d6c97a11f1..b08b80acc6976 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java @@ -223,6 +223,14 @@ sealed interface Builder extends Block.Builder, BlockLoader.BooleanBuilder permi */ Builder copyFrom(BooleanBlock block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + */ + Builder copyFrom(BooleanBlock block, int position); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java index 32627a0e0d36b..7f4705ddecb27 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlockBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.BitArray; @@ -85,7 +86,11 @@ public BooleanBlockBuilder copyFrom(Block block, int beginInclusive, int endExcl /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom(BooleanBlock, int)}. + *

*/ + @Override public BooleanBlockBuilder copyFrom(BooleanBlock block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -101,21 +106,7 @@ public BooleanBlockBuilder copyFrom(BooleanBlock block, int beginInclusive, int private void copyFromBlock(BooleanBlock block, int beginInclusive, int endExclusive) { for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { - appendBoolean(block.getBoolean(i++)); - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p); } } @@ -125,6 +116,37 @@ private void copyFromVector(BooleanVector vector, int beginInclusive, int endExc } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public BooleanBlockBuilder copyFrom(BooleanBlock block, int position) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + appendBoolean(block.getBoolean(i++)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + appendBoolean(block.getBoolean(i++)); + } + endPositionEntry(); + return this; + } + @Override public BooleanBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java index 6fe45f33a7df6..6661895722725 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java @@ -228,6 +228,16 @@ sealed interface Builder extends Block.Builder, BlockLoader.BytesRefBuilder perm */ Builder copyFrom(BytesRefBlock block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + * @param scratch Scratch string used to prevent allocation. Share this + between many calls to this function. + */ + Builder copyFrom(BytesRefBlock block, int position, BytesRef scratch); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java index 6232cbdd2717c..0a2b350780405 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlockBuilder.java @@ -88,7 +88,11 @@ public BytesRefBlockBuilder copyFrom(Block block, int beginInclusive, int endExc /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom(BytesRefBlock, int, BytesRef scratch)}. + *

*/ + @Override public BytesRefBlockBuilder copyFrom(BytesRefBlock block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -105,21 +109,7 @@ public BytesRefBlockBuilder copyFrom(BytesRefBlock block, int beginInclusive, in private void copyFromBlock(BytesRefBlock block, int beginInclusive, int endExclusive) { BytesRef scratch = new BytesRef(); for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { - appendBytesRef(block.getBytesRef(i++, scratch)); - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p, scratch); } } @@ -130,6 +120,39 @@ private void copyFromVector(BytesRefVector vector, int beginInclusive, int endEx } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + * @param scratch Scratch string used to prevent allocation. Share this + between many calls to this function. + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public BytesRefBlockBuilder copyFrom(BytesRefBlock block, int position, BytesRef scratch) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + appendBytesRef(block.getBytesRef(i++, scratch)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + appendBytesRef(block.getBytesRef(i++, scratch)); + } + endPositionEntry(); + return this; + } + @Override public BytesRefBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java index 395ccd412fabb..04df6253662a9 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java @@ -217,6 +217,14 @@ sealed interface Builder extends Block.Builder, BlockLoader.DoubleBuilder permit */ Builder copyFrom(DoubleBlock block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + */ + Builder copyFrom(DoubleBlock block, int position); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java index 5921c2daa9f92..8ecc9b91e0ffe 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlockBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.DoubleArray; @@ -85,7 +86,11 @@ public DoubleBlockBuilder copyFrom(Block block, int beginInclusive, int endExclu /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom(DoubleBlock, int)}. + *

*/ + @Override public DoubleBlockBuilder copyFrom(DoubleBlock block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -101,21 +106,7 @@ public DoubleBlockBuilder copyFrom(DoubleBlock block, int beginInclusive, int en private void copyFromBlock(DoubleBlock block, int beginInclusive, int endExclusive) { for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { - appendDouble(block.getDouble(i++)); - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p); } } @@ -125,6 +116,37 @@ private void copyFromVector(DoubleVector vector, int beginInclusive, int endExcl } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public DoubleBlockBuilder copyFrom(DoubleBlock block, int position) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + appendDouble(block.getDouble(i++)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + appendDouble(block.getDouble(i++)); + } + endPositionEntry(); + return this; + } + @Override public DoubleBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlock.java index 633c9f309901a..0679e38b63219 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlock.java @@ -216,6 +216,14 @@ sealed interface Builder extends Block.Builder, BlockLoader.FloatBuilder permits */ Builder copyFrom(FloatBlock block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + */ + Builder copyFrom(FloatBlock block, int position); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlockBuilder.java index 9c1e7aba49a21..8504912adc057 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlockBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.FloatArray; @@ -85,7 +86,11 @@ public FloatBlockBuilder copyFrom(Block block, int beginInclusive, int endExclus /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom(FloatBlock, int)}. + *

*/ + @Override public FloatBlockBuilder copyFrom(FloatBlock block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -101,21 +106,7 @@ public FloatBlockBuilder copyFrom(FloatBlock block, int beginInclusive, int endE private void copyFromBlock(FloatBlock block, int beginInclusive, int endExclusive) { for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { - appendFloat(block.getFloat(i++)); - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p); } } @@ -125,6 +116,37 @@ private void copyFromVector(FloatVector vector, int beginInclusive, int endExclu } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public FloatBlockBuilder copyFrom(FloatBlock block, int position) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + appendFloat(block.getFloat(i++)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + appendFloat(block.getFloat(i++)); + } + endPositionEntry(); + return this; + } + @Override public FloatBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java index 7c77d9965391e..6af61695929df 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java @@ -216,6 +216,14 @@ sealed interface Builder extends Block.Builder, BlockLoader.IntBuilder permits I */ Builder copyFrom(IntBlock block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + */ + Builder copyFrom(IntBlock block, int position); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java index 85f943004de29..31449b6f1cd72 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlockBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.IntArray; @@ -85,7 +86,11 @@ public IntBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusiv /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom(IntBlock, int)}. + *

*/ + @Override public IntBlockBuilder copyFrom(IntBlock block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -101,21 +106,7 @@ public IntBlockBuilder copyFrom(IntBlock block, int beginInclusive, int endExclu private void copyFromBlock(IntBlock block, int beginInclusive, int endExclusive) { for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { - appendInt(block.getInt(i++)); - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p); } } @@ -125,6 +116,37 @@ private void copyFromVector(IntVector vector, int beginInclusive, int endExclusi } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public IntBlockBuilder copyFrom(IntBlock block, int position) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + appendInt(block.getInt(i++)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + appendInt(block.getInt(i++)); + } + endPositionEntry(); + return this; + } + @Override public IntBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java index 6c88da8860ca7..090efd9a31579 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java @@ -217,6 +217,14 @@ sealed interface Builder extends Block.Builder, BlockLoader.LongBuilder permits */ Builder copyFrom(LongBlock block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + */ + Builder copyFrom(LongBlock block, int position); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java index d24ae214da63a..bf25347edd989 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlockBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.LongArray; @@ -85,7 +86,11 @@ public LongBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusi /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom(LongBlock, int)}. + *

*/ + @Override public LongBlockBuilder copyFrom(LongBlock block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -101,21 +106,7 @@ public LongBlockBuilder copyFrom(LongBlock block, int beginInclusive, int endExc private void copyFromBlock(LongBlock block, int beginInclusive, int endExclusive) { for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { - appendLong(block.getLong(i++)); - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p); } } @@ -125,6 +116,37 @@ private void copyFromVector(LongVector vector, int beginInclusive, int endExclus } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public LongBlockBuilder copyFrom(LongBlock block, int position) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + appendLong(block.getLong(i++)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + appendLong(block.getLong(i++)); + } + endPositionEntry(); + return this; + } + @Override public LongBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java index edf54a829deba..de87c08f7ceb1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java @@ -280,6 +280,11 @@ interface Builder extends BlockLoader.Builder, Releasable { /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single position copies use the faster + * {@link IntBlockBuilder#copyFrom(IntBlock, int)}, + * {@link LongBlockBuilder#copyFrom(LongBlock, int)}, etc. + *

*/ Builder copyFrom(Block block, int beginInclusive, int endExclusive); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st index 67e4ac4bb334f..6c1616c370721 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st @@ -288,6 +288,18 @@ $endif$ */ Builder copyFrom($Type$Block block, int beginInclusive, int endExclusive); + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. +$if(BytesRef)$ + * @param scratch Scratch string used to prevent allocation. Share this + between many calls to this function. +$endif$ + */ + Builder copyFrom($Type$Block block, int position$if(BytesRef)$, BytesRef scratch$endif$); + @Override Builder appendNull(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st index 8397a0f5274f1..d60e1de179d20 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BlockBuilder.java.st @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.BytesRefArray; import org.elasticsearch.core.Releasables; $else$ +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.$Array$; @@ -123,7 +124,11 @@ $endif$ /** * Copy the values in {@code block} from {@code beginInclusive} to * {@code endExclusive} into this builder. + *

+ * For single-position copies see {@link #copyFrom($Type$Block, int$if(BytesRef)$, BytesRef scratch$endif$)}. + *

*/ + @Override public $Type$BlockBuilder copyFrom($Type$Block block, int beginInclusive, int endExclusive) { if (endExclusive > block.getPositionCount()) { throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]"); @@ -142,25 +147,7 @@ $if(BytesRef)$ BytesRef scratch = new BytesRef(); $endif$ for (int p = beginInclusive; p < endExclusive; p++) { - if (block.isNull(p)) { - appendNull(); - continue; - } - int count = block.getValueCount(p); - if (count > 1) { - beginPositionEntry(); - } - int i = block.getFirstValueIndex(p); - for (int v = 0; v < count; v++) { -$if(BytesRef)$ - appendBytesRef(block.getBytesRef(i++, scratch)); -$else$ - append$Type$(block.get$Type$(i++)); -$endif$ - } - if (count > 1) { - endPositionEntry(); - } + copyFrom(block, p$if(BytesRef)$, scratch$endif$); } } @@ -177,6 +164,41 @@ $endif$ } } + /** + * Copy the values in {@code block} at {@code position}. If this position + * has a single value, this'll copy a single value. If this positions has + * many values, it'll copy all of them. If this is {@code null}, then it'll + * copy the {@code null}. +$if(BytesRef)$ + * @param scratch Scratch string used to prevent allocation. Share this + between many calls to this function. +$endif$ + *

+ * Note that there isn't a version of this method on {@link Block.Builder} that takes + * {@link Block}. That'd be quite slow, running position by position. And it's important + * to know if you are copying {@link BytesRef}s so you can have the scratch. + *

+ */ + @Override + public $Type$BlockBuilder copyFrom($Type$Block block, int position$if(BytesRef)$, BytesRef scratch$endif$) { + if (block.isNull(position)) { + appendNull(); + return this; + } + int count = block.getValueCount(position); + int i = block.getFirstValueIndex(position); + if (count == 1) { + append$Type$(block.get$Type$(i++$if(BytesRef)$, scratch$endif$)); + return this; + } + beginPositionEntry(); + for (int v = 0; v < count; v++) { + append$Type$(block.get$Type$(i++$if(BytesRef)$, scratch$endif$)); + } + endPositionEntry(); + return this; + } + @Override public $Type$BlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { this.mvOrdering = mvOrdering; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/EvalOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/EvalOperator.java index 349ce7b00ff10..2573baf78b16a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/EvalOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/EvalOperator.java @@ -96,12 +96,18 @@ public Block eval(Page page) { public void close() { } + + @Override + public String toString() { + return CONSTANT_NULL_NAME; + } }; } @Override public String toString() { - return "ConstantNull"; + return CONSTANT_NULL_NAME; } }; + private static final String CONSTANT_NULL_NAME = "ConstantNull"; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java index 679e3441fb45f..1d3c8df914bc6 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.compute.test.RandomBlock; import org.elasticsearch.compute.test.TestBlockFactory; import org.elasticsearch.test.ESTestCase; @@ -92,7 +93,16 @@ private void assertEvens(Block block) { Block.Builder builder = elementType.newBlockBuilder(block.getPositionCount() / 2, blockFactory); List> expected = new ArrayList<>(); for (int i = 0; i < block.getPositionCount(); i += 2) { - builder.copyFrom(block, i, i + 1); + switch (elementType) { + case BOOLEAN -> ((BooleanBlockBuilder) builder).copyFrom((BooleanBlock) block, i); + case BYTES_REF -> ((BytesRefBlockBuilder) builder).copyFrom((BytesRefBlock) block, i, new BytesRef()); + case DOUBLE -> ((DoubleBlockBuilder) builder).copyFrom((DoubleBlock) block, i); + case FLOAT -> ((FloatBlockBuilder) builder).copyFrom((FloatBlock) block, i); + case INT -> ((IntBlockBuilder) builder).copyFrom((IntBlock) block, i); + case LONG -> ((LongBlockBuilder) builder).copyFrom((LongBlock) block, i); + default -> throw new IllegalArgumentException("unsupported type: " + elementType); + } + expected.add(valuesAtPositions(block, i, i + 1).get(0)); } assertBlockValues(builder.build(), expected); diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBooleanEvaluator.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBooleanEvaluator.java new file mode 100644 index 0000000000000..97b4cba0d9938 --- /dev/null +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBooleanEvaluator.java @@ -0,0 +1,225 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.nulls; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}. + * This class is generated. Edit {@code X-InEvaluator.java.st} instead. + */ +abstract sealed class CoalesceBooleanEvaluator implements EvalOperator.ExpressionEvaluator permits + CoalesceBooleanEvaluator.CoalesceBooleanEagerEvaluator, // + CoalesceBooleanEvaluator.CoalesceBooleanLazyEvaluator { + + static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List children) { + List childEvaluators = children.stream().map(toEvaluator::apply).toList(); + if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) { + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceBooleanEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceBooleanEagerEvaluator[values=" + childEvaluators + ']'; + } + }; + } + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceBooleanLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceBooleanLazyEvaluator[values=" + childEvaluators + ']'; + } + }; + } + + protected final DriverContext driverContext; + protected final List evaluators; + + protected CoalesceBooleanEvaluator(DriverContext driverContext, List evaluators) { + this.driverContext = driverContext; + this.evaluators = evaluators; + } + + @Override + public final BooleanBlock eval(Page page) { + return entireBlock(page); + } + + /** + * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to + * {@link #perPosition} evaluation. + *

+ * Entire Block evaluation is the "normal" way to run the compute engine, + * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try + * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and: + *

+ *
    + *
  • If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.
  • + *
  • If the {@linkplain Block} is only nulls we skip it and try the next evaluator.
  • + *
  • If this is the last evaluator we just return it. COALESCE done.
  • + *
  • + * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop + * into a per position evaluator. + *
  • + *
+ */ + private BooleanBlock entireBlock(Page page) { + int lastFullBlockIdx = 0; + while (true) { + BooleanBlock lastFullBlock = (BooleanBlock) evaluators.get(lastFullBlockIdx++).eval(page); + if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) { + return lastFullBlock; + } + if (lastFullBlock.areAllValuesNull()) { + // Result is all nulls and isn't the last result so we don't need any of it. + lastFullBlock.close(); + continue; + } + // The result has some nulls and some non-nulls. + return perPosition(page, lastFullBlock, lastFullBlockIdx); + } + } + + /** + * Evaluate each position of the incoming {@link Page} for COALESCE + * independently. Our attempt to evaluate entire blocks has yielded + * a block that contains some nulls and some non-nulls and we have + * to fill in the nulls with the results of calling the remaining + * evaluators. + *

+ * This must not return warnings caused by + * evaluating positions for which a previous evaluator returned + * non-null. These are positions that, at least from the perspective + * of a compute engine user, don't have to be + * evaluated. Put another way, this must function as though + * {@code COALESCE} were per-position lazy. It can manage that + * any way it likes. + *

+ */ + protected abstract BooleanBlock perPosition(Page page, BooleanBlock lastFullBlock, int firstToEvaluate); + + @Override + public final String toString() { + return getClass().getSimpleName() + "[values=" + evaluators + ']'; + } + + @Override + public final void close() { + Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); + } + + /** + * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails. + * First we evaluate all remaining evaluators, and then we pluck the first non-null + * value from each one. This is much faster than + * {@link CoalesceBooleanLazyEvaluator} but will include spurious warnings if any of the + * evaluators make them so we only use it for evaluators that are + * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly + * in a lazy environment. + */ + static final class CoalesceBooleanEagerEvaluator extends CoalesceBooleanEvaluator { + CoalesceBooleanEagerEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected BooleanBlock perPosition(Page page, BooleanBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + BooleanBlock[] flatten = new BooleanBlock[evaluators.size() - firstToEvaluate + 1]; + try { + flatten[0] = lastFullBlock; + for (int f = 1; f < flatten.length; f++) { + flatten[f] = (BooleanBlock) evaluators.get(firstToEvaluate + f - 1).eval(page); + } + try (BooleanBlock.Builder result = driverContext.blockFactory().newBooleanBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + for (BooleanBlock f : flatten) { + if (false == f.isNull(p)) { + result.copyFrom(f, p); + continue position; + } + } + result.appendNull(); + } + return result.build(); + } + } finally { + Releasables.close(flatten); + } + } + } + + /** + * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails. + * For each position we either: + *
    + *
  • Take the non-null values from the {@code lastFullBlock}
  • + *
  • + * Evaluator the remaining evaluators one at a time, keeping + * the first non-null value. + *
  • + *
+ */ + static final class CoalesceBooleanLazyEvaluator extends CoalesceBooleanEvaluator { + CoalesceBooleanLazyEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected BooleanBlock perPosition(Page page, BooleanBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + try (BooleanBlock.Builder result = driverContext.blockFactory().newBooleanBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + if (lastFullBlock.isNull(p) == false) { + result.copyFrom(lastFullBlock, p, p + 1); + continue; + } + int[] positions = new int[] { p }; + Page limited = new Page( + 1, + IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) + ); + try (Releasable ignored = limited::releaseBlocks) { + for (int e = firstToEvaluate; e < evaluators.size(); e++) { + try (BooleanBlock block = (BooleanBlock) evaluators.get(e).eval(limited)) { + if (false == block.isNull(0)) { + result.copyFrom(block, 0); + continue position; + } + } + } + result.appendNull(); + } + } + return result.build(); + } finally { + lastFullBlock.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBytesRefEvaluator.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBytesRefEvaluator.java new file mode 100644 index 0000000000000..7d6834e765a96 --- /dev/null +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBytesRefEvaluator.java @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.nulls; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}. + * This class is generated. Edit {@code X-InEvaluator.java.st} instead. + */ +abstract sealed class CoalesceBytesRefEvaluator implements EvalOperator.ExpressionEvaluator permits + CoalesceBytesRefEvaluator.CoalesceBytesRefEagerEvaluator, // + CoalesceBytesRefEvaluator.CoalesceBytesRefLazyEvaluator { + + static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List children) { + List childEvaluators = children.stream().map(toEvaluator::apply).toList(); + if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) { + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceBytesRefEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceBytesRefEagerEvaluator[values=" + childEvaluators + ']'; + } + }; + } + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceBytesRefLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceBytesRefLazyEvaluator[values=" + childEvaluators + ']'; + } + }; + } + + protected final DriverContext driverContext; + protected final List evaluators; + + protected CoalesceBytesRefEvaluator(DriverContext driverContext, List evaluators) { + this.driverContext = driverContext; + this.evaluators = evaluators; + } + + @Override + public final BytesRefBlock eval(Page page) { + return entireBlock(page); + } + + /** + * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to + * {@link #perPosition} evaluation. + *

+ * Entire Block evaluation is the "normal" way to run the compute engine, + * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try + * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and: + *

+ *
    + *
  • If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.
  • + *
  • If the {@linkplain Block} is only nulls we skip it and try the next evaluator.
  • + *
  • If this is the last evaluator we just return it. COALESCE done.
  • + *
  • + * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop + * into a per position evaluator. + *
  • + *
+ */ + private BytesRefBlock entireBlock(Page page) { + int lastFullBlockIdx = 0; + while (true) { + BytesRefBlock lastFullBlock = (BytesRefBlock) evaluators.get(lastFullBlockIdx++).eval(page); + if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) { + return lastFullBlock; + } + if (lastFullBlock.areAllValuesNull()) { + // Result is all nulls and isn't the last result so we don't need any of it. + lastFullBlock.close(); + continue; + } + // The result has some nulls and some non-nulls. + return perPosition(page, lastFullBlock, lastFullBlockIdx); + } + } + + /** + * Evaluate each position of the incoming {@link Page} for COALESCE + * independently. Our attempt to evaluate entire blocks has yielded + * a block that contains some nulls and some non-nulls and we have + * to fill in the nulls with the results of calling the remaining + * evaluators. + *

+ * This must not return warnings caused by + * evaluating positions for which a previous evaluator returned + * non-null. These are positions that, at least from the perspective + * of a compute engine user, don't have to be + * evaluated. Put another way, this must function as though + * {@code COALESCE} were per-position lazy. It can manage that + * any way it likes. + *

+ */ + protected abstract BytesRefBlock perPosition(Page page, BytesRefBlock lastFullBlock, int firstToEvaluate); + + @Override + public final String toString() { + return getClass().getSimpleName() + "[values=" + evaluators + ']'; + } + + @Override + public final void close() { + Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); + } + + /** + * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails. + * First we evaluate all remaining evaluators, and then we pluck the first non-null + * value from each one. This is much faster than + * {@link CoalesceBytesRefLazyEvaluator} but will include spurious warnings if any of the + * evaluators make them so we only use it for evaluators that are + * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly + * in a lazy environment. + */ + static final class CoalesceBytesRefEagerEvaluator extends CoalesceBytesRefEvaluator { + CoalesceBytesRefEagerEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected BytesRefBlock perPosition(Page page, BytesRefBlock lastFullBlock, int firstToEvaluate) { + BytesRef scratch = new BytesRef(); + int positionCount = page.getPositionCount(); + BytesRefBlock[] flatten = new BytesRefBlock[evaluators.size() - firstToEvaluate + 1]; + try { + flatten[0] = lastFullBlock; + for (int f = 1; f < flatten.length; f++) { + flatten[f] = (BytesRefBlock) evaluators.get(firstToEvaluate + f - 1).eval(page); + } + try (BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + for (BytesRefBlock f : flatten) { + if (false == f.isNull(p)) { + result.copyFrom(f, p, scratch); + continue position; + } + } + result.appendNull(); + } + return result.build(); + } + } finally { + Releasables.close(flatten); + } + } + } + + /** + * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails. + * For each position we either: + *
    + *
  • Take the non-null values from the {@code lastFullBlock}
  • + *
  • + * Evaluator the remaining evaluators one at a time, keeping + * the first non-null value. + *
  • + *
+ */ + static final class CoalesceBytesRefLazyEvaluator extends CoalesceBytesRefEvaluator { + CoalesceBytesRefLazyEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected BytesRefBlock perPosition(Page page, BytesRefBlock lastFullBlock, int firstToEvaluate) { + BytesRef scratch = new BytesRef(); + int positionCount = page.getPositionCount(); + try (BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + if (lastFullBlock.isNull(p) == false) { + result.copyFrom(lastFullBlock, p, p + 1); + continue; + } + int[] positions = new int[] { p }; + Page limited = new Page( + 1, + IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) + ); + try (Releasable ignored = limited::releaseBlocks) { + for (int e = firstToEvaluate; e < evaluators.size(); e++) { + try (BytesRefBlock block = (BytesRefBlock) evaluators.get(e).eval(limited)) { + if (false == block.isNull(0)) { + result.copyFrom(block, 0, scratch); + continue position; + } + } + } + result.appendNull(); + } + } + return result.build(); + } finally { + lastFullBlock.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceDoubleEvaluator.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceDoubleEvaluator.java new file mode 100644 index 0000000000000..4c01a961ecbee --- /dev/null +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceDoubleEvaluator.java @@ -0,0 +1,225 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.nulls; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}. + * This class is generated. Edit {@code X-InEvaluator.java.st} instead. + */ +abstract sealed class CoalesceDoubleEvaluator implements EvalOperator.ExpressionEvaluator permits + CoalesceDoubleEvaluator.CoalesceDoubleEagerEvaluator, // + CoalesceDoubleEvaluator.CoalesceDoubleLazyEvaluator { + + static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List children) { + List childEvaluators = children.stream().map(toEvaluator::apply).toList(); + if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) { + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceDoubleEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceDoubleEagerEvaluator[values=" + childEvaluators + ']'; + } + }; + } + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceDoubleLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceDoubleLazyEvaluator[values=" + childEvaluators + ']'; + } + }; + } + + protected final DriverContext driverContext; + protected final List evaluators; + + protected CoalesceDoubleEvaluator(DriverContext driverContext, List evaluators) { + this.driverContext = driverContext; + this.evaluators = evaluators; + } + + @Override + public final DoubleBlock eval(Page page) { + return entireBlock(page); + } + + /** + * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to + * {@link #perPosition} evaluation. + *

+ * Entire Block evaluation is the "normal" way to run the compute engine, + * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try + * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and: + *

+ *
    + *
  • If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.
  • + *
  • If the {@linkplain Block} is only nulls we skip it and try the next evaluator.
  • + *
  • If this is the last evaluator we just return it. COALESCE done.
  • + *
  • + * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop + * into a per position evaluator. + *
  • + *
+ */ + private DoubleBlock entireBlock(Page page) { + int lastFullBlockIdx = 0; + while (true) { + DoubleBlock lastFullBlock = (DoubleBlock) evaluators.get(lastFullBlockIdx++).eval(page); + if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) { + return lastFullBlock; + } + if (lastFullBlock.areAllValuesNull()) { + // Result is all nulls and isn't the last result so we don't need any of it. + lastFullBlock.close(); + continue; + } + // The result has some nulls and some non-nulls. + return perPosition(page, lastFullBlock, lastFullBlockIdx); + } + } + + /** + * Evaluate each position of the incoming {@link Page} for COALESCE + * independently. Our attempt to evaluate entire blocks has yielded + * a block that contains some nulls and some non-nulls and we have + * to fill in the nulls with the results of calling the remaining + * evaluators. + *

+ * This must not return warnings caused by + * evaluating positions for which a previous evaluator returned + * non-null. These are positions that, at least from the perspective + * of a compute engine user, don't have to be + * evaluated. Put another way, this must function as though + * {@code COALESCE} were per-position lazy. It can manage that + * any way it likes. + *

+ */ + protected abstract DoubleBlock perPosition(Page page, DoubleBlock lastFullBlock, int firstToEvaluate); + + @Override + public final String toString() { + return getClass().getSimpleName() + "[values=" + evaluators + ']'; + } + + @Override + public final void close() { + Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); + } + + /** + * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails. + * First we evaluate all remaining evaluators, and then we pluck the first non-null + * value from each one. This is much faster than + * {@link CoalesceDoubleLazyEvaluator} but will include spurious warnings if any of the + * evaluators make them so we only use it for evaluators that are + * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly + * in a lazy environment. + */ + static final class CoalesceDoubleEagerEvaluator extends CoalesceDoubleEvaluator { + CoalesceDoubleEagerEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected DoubleBlock perPosition(Page page, DoubleBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + DoubleBlock[] flatten = new DoubleBlock[evaluators.size() - firstToEvaluate + 1]; + try { + flatten[0] = lastFullBlock; + for (int f = 1; f < flatten.length; f++) { + flatten[f] = (DoubleBlock) evaluators.get(firstToEvaluate + f - 1).eval(page); + } + try (DoubleBlock.Builder result = driverContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + for (DoubleBlock f : flatten) { + if (false == f.isNull(p)) { + result.copyFrom(f, p); + continue position; + } + } + result.appendNull(); + } + return result.build(); + } + } finally { + Releasables.close(flatten); + } + } + } + + /** + * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails. + * For each position we either: + *
    + *
  • Take the non-null values from the {@code lastFullBlock}
  • + *
  • + * Evaluator the remaining evaluators one at a time, keeping + * the first non-null value. + *
  • + *
+ */ + static final class CoalesceDoubleLazyEvaluator extends CoalesceDoubleEvaluator { + CoalesceDoubleLazyEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected DoubleBlock perPosition(Page page, DoubleBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + try (DoubleBlock.Builder result = driverContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + if (lastFullBlock.isNull(p) == false) { + result.copyFrom(lastFullBlock, p, p + 1); + continue; + } + int[] positions = new int[] { p }; + Page limited = new Page( + 1, + IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) + ); + try (Releasable ignored = limited::releaseBlocks) { + for (int e = firstToEvaluate; e < evaluators.size(); e++) { + try (DoubleBlock block = (DoubleBlock) evaluators.get(e).eval(limited)) { + if (false == block.isNull(0)) { + result.copyFrom(block, 0); + continue position; + } + } + } + result.appendNull(); + } + } + return result.build(); + } finally { + lastFullBlock.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceIntEvaluator.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceIntEvaluator.java new file mode 100644 index 0000000000000..e90bd4b8e5e35 --- /dev/null +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceIntEvaluator.java @@ -0,0 +1,225 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.nulls; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}. + * This class is generated. Edit {@code X-InEvaluator.java.st} instead. + */ +abstract sealed class CoalesceIntEvaluator implements EvalOperator.ExpressionEvaluator permits + CoalesceIntEvaluator.CoalesceIntEagerEvaluator, // + CoalesceIntEvaluator.CoalesceIntLazyEvaluator { + + static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List children) { + List childEvaluators = children.stream().map(toEvaluator::apply).toList(); + if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) { + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceIntEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceIntEagerEvaluator[values=" + childEvaluators + ']'; + } + }; + } + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceIntLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceIntLazyEvaluator[values=" + childEvaluators + ']'; + } + }; + } + + protected final DriverContext driverContext; + protected final List evaluators; + + protected CoalesceIntEvaluator(DriverContext driverContext, List evaluators) { + this.driverContext = driverContext; + this.evaluators = evaluators; + } + + @Override + public final IntBlock eval(Page page) { + return entireBlock(page); + } + + /** + * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to + * {@link #perPosition} evaluation. + *

+ * Entire Block evaluation is the "normal" way to run the compute engine, + * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try + * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and: + *

+ *
    + *
  • If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.
  • + *
  • If the {@linkplain Block} is only nulls we skip it and try the next evaluator.
  • + *
  • If this is the last evaluator we just return it. COALESCE done.
  • + *
  • + * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop + * into a per position evaluator. + *
  • + *
+ */ + private IntBlock entireBlock(Page page) { + int lastFullBlockIdx = 0; + while (true) { + IntBlock lastFullBlock = (IntBlock) evaluators.get(lastFullBlockIdx++).eval(page); + if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) { + return lastFullBlock; + } + if (lastFullBlock.areAllValuesNull()) { + // Result is all nulls and isn't the last result so we don't need any of it. + lastFullBlock.close(); + continue; + } + // The result has some nulls and some non-nulls. + return perPosition(page, lastFullBlock, lastFullBlockIdx); + } + } + + /** + * Evaluate each position of the incoming {@link Page} for COALESCE + * independently. Our attempt to evaluate entire blocks has yielded + * a block that contains some nulls and some non-nulls and we have + * to fill in the nulls with the results of calling the remaining + * evaluators. + *

+ * This must not return warnings caused by + * evaluating positions for which a previous evaluator returned + * non-null. These are positions that, at least from the perspective + * of a compute engine user, don't have to be + * evaluated. Put another way, this must function as though + * {@code COALESCE} were per-position lazy. It can manage that + * any way it likes. + *

+ */ + protected abstract IntBlock perPosition(Page page, IntBlock lastFullBlock, int firstToEvaluate); + + @Override + public final String toString() { + return getClass().getSimpleName() + "[values=" + evaluators + ']'; + } + + @Override + public final void close() { + Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); + } + + /** + * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails. + * First we evaluate all remaining evaluators, and then we pluck the first non-null + * value from each one. This is much faster than + * {@link CoalesceIntLazyEvaluator} but will include spurious warnings if any of the + * evaluators make them so we only use it for evaluators that are + * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly + * in a lazy environment. + */ + static final class CoalesceIntEagerEvaluator extends CoalesceIntEvaluator { + CoalesceIntEagerEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected IntBlock perPosition(Page page, IntBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + IntBlock[] flatten = new IntBlock[evaluators.size() - firstToEvaluate + 1]; + try { + flatten[0] = lastFullBlock; + for (int f = 1; f < flatten.length; f++) { + flatten[f] = (IntBlock) evaluators.get(firstToEvaluate + f - 1).eval(page); + } + try (IntBlock.Builder result = driverContext.blockFactory().newIntBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + for (IntBlock f : flatten) { + if (false == f.isNull(p)) { + result.copyFrom(f, p); + continue position; + } + } + result.appendNull(); + } + return result.build(); + } + } finally { + Releasables.close(flatten); + } + } + } + + /** + * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails. + * For each position we either: + *
    + *
  • Take the non-null values from the {@code lastFullBlock}
  • + *
  • + * Evaluator the remaining evaluators one at a time, keeping + * the first non-null value. + *
  • + *
+ */ + static final class CoalesceIntLazyEvaluator extends CoalesceIntEvaluator { + CoalesceIntLazyEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected IntBlock perPosition(Page page, IntBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + try (IntBlock.Builder result = driverContext.blockFactory().newIntBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + if (lastFullBlock.isNull(p) == false) { + result.copyFrom(lastFullBlock, p, p + 1); + continue; + } + int[] positions = new int[] { p }; + Page limited = new Page( + 1, + IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) + ); + try (Releasable ignored = limited::releaseBlocks) { + for (int e = firstToEvaluate; e < evaluators.size(); e++) { + try (IntBlock block = (IntBlock) evaluators.get(e).eval(limited)) { + if (false == block.isNull(0)) { + result.copyFrom(block, 0); + continue position; + } + } + } + result.appendNull(); + } + } + return result.build(); + } finally { + lastFullBlock.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceLongEvaluator.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceLongEvaluator.java new file mode 100644 index 0000000000000..53a21ad1198f4 --- /dev/null +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceLongEvaluator.java @@ -0,0 +1,225 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.nulls; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}. + * This class is generated. Edit {@code X-InEvaluator.java.st} instead. + */ +abstract sealed class CoalesceLongEvaluator implements EvalOperator.ExpressionEvaluator permits + CoalesceLongEvaluator.CoalesceLongEagerEvaluator, // + CoalesceLongEvaluator.CoalesceLongLazyEvaluator { + + static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List children) { + List childEvaluators = children.stream().map(toEvaluator::apply).toList(); + if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) { + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceLongEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceLongEagerEvaluator[values=" + childEvaluators + ']'; + } + }; + } + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new CoalesceLongLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "CoalesceLongLazyEvaluator[values=" + childEvaluators + ']'; + } + }; + } + + protected final DriverContext driverContext; + protected final List evaluators; + + protected CoalesceLongEvaluator(DriverContext driverContext, List evaluators) { + this.driverContext = driverContext; + this.evaluators = evaluators; + } + + @Override + public final LongBlock eval(Page page) { + return entireBlock(page); + } + + /** + * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to + * {@link #perPosition} evaluation. + *

+ * Entire Block evaluation is the "normal" way to run the compute engine, + * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try + * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and: + *

+ *
    + *
  • If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.
  • + *
  • If the {@linkplain Block} is only nulls we skip it and try the next evaluator.
  • + *
  • If this is the last evaluator we just return it. COALESCE done.
  • + *
  • + * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop + * into a per position evaluator. + *
  • + *
+ */ + private LongBlock entireBlock(Page page) { + int lastFullBlockIdx = 0; + while (true) { + LongBlock lastFullBlock = (LongBlock) evaluators.get(lastFullBlockIdx++).eval(page); + if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) { + return lastFullBlock; + } + if (lastFullBlock.areAllValuesNull()) { + // Result is all nulls and isn't the last result so we don't need any of it. + lastFullBlock.close(); + continue; + } + // The result has some nulls and some non-nulls. + return perPosition(page, lastFullBlock, lastFullBlockIdx); + } + } + + /** + * Evaluate each position of the incoming {@link Page} for COALESCE + * independently. Our attempt to evaluate entire blocks has yielded + * a block that contains some nulls and some non-nulls and we have + * to fill in the nulls with the results of calling the remaining + * evaluators. + *

+ * This must not return warnings caused by + * evaluating positions for which a previous evaluator returned + * non-null. These are positions that, at least from the perspective + * of a compute engine user, don't have to be + * evaluated. Put another way, this must function as though + * {@code COALESCE} were per-position lazy. It can manage that + * any way it likes. + *

+ */ + protected abstract LongBlock perPosition(Page page, LongBlock lastFullBlock, int firstToEvaluate); + + @Override + public final String toString() { + return getClass().getSimpleName() + "[values=" + evaluators + ']'; + } + + @Override + public final void close() { + Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); + } + + /** + * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails. + * First we evaluate all remaining evaluators, and then we pluck the first non-null + * value from each one. This is much faster than + * {@link CoalesceLongLazyEvaluator} but will include spurious warnings if any of the + * evaluators make them so we only use it for evaluators that are + * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly + * in a lazy environment. + */ + static final class CoalesceLongEagerEvaluator extends CoalesceLongEvaluator { + CoalesceLongEagerEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected LongBlock perPosition(Page page, LongBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + LongBlock[] flatten = new LongBlock[evaluators.size() - firstToEvaluate + 1]; + try { + flatten[0] = lastFullBlock; + for (int f = 1; f < flatten.length; f++) { + flatten[f] = (LongBlock) evaluators.get(firstToEvaluate + f - 1).eval(page); + } + try (LongBlock.Builder result = driverContext.blockFactory().newLongBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + for (LongBlock f : flatten) { + if (false == f.isNull(p)) { + result.copyFrom(f, p); + continue position; + } + } + result.appendNull(); + } + return result.build(); + } + } finally { + Releasables.close(flatten); + } + } + } + + /** + * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails. + * For each position we either: + *
    + *
  • Take the non-null values from the {@code lastFullBlock}
  • + *
  • + * Evaluator the remaining evaluators one at a time, keeping + * the first non-null value. + *
  • + *
+ */ + static final class CoalesceLongLazyEvaluator extends CoalesceLongEvaluator { + CoalesceLongLazyEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected LongBlock perPosition(Page page, LongBlock lastFullBlock, int firstToEvaluate) { + int positionCount = page.getPositionCount(); + try (LongBlock.Builder result = driverContext.blockFactory().newLongBlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + if (lastFullBlock.isNull(p) == false) { + result.copyFrom(lastFullBlock, p, p + 1); + continue; + } + int[] positions = new int[] { p }; + Page limited = new Page( + 1, + IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) + ); + try (Releasable ignored = limited::releaseBlocks) { + for (int e = firstToEvaluate; e < evaluators.size(); e++) { + try (LongBlock block = (LongBlock) evaluators.get(e).eval(limited)) { + if (false == block.isNull(0)) { + result.copyFrom(block, 0); + continue position; + } + } + } + result.appendNull(); + } + } + return result.build(); + } finally { + lastFullBlock.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/Case.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/Case.java index 236e625f7abe1..04da04e1b3927 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/Case.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/Case.java @@ -524,6 +524,9 @@ public Block eval(Page page) { ) { for (int p = 0; p < lhs.getPositionCount(); p++) { if (lhsOrRhs.mask().getBoolean(p)) { + // TODO Copy the per-type specialization that COALESCE has. + // There's also a slowdown because copying from a block checks to see if there are any nulls and that's slow. + // Vectors do not, so this still shows as fairly fast. But not as fast as the per-type unrolling. builder.copyFrom(lhs, p, p + 1); } else { builder.copyFrom(rhs, p, p + 1); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java index 52686430ca5b5..611c7a456864a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java @@ -11,13 +11,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.Nullability; @@ -31,17 +26,29 @@ import org.elasticsearch.xpack.esql.expression.function.Param; import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.planner.PlannerUtils; import java.io.IOException; import java.util.List; -import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN; +import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT; +import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_SHAPE; +import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; +import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS; +import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE; +import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_POINT; +import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_SHAPE; +import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER; +import static org.elasticsearch.xpack.esql.core.type.DataType.IP; +import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD; +import static org.elasticsearch.xpack.esql.core.type.DataType.LONG; import static org.elasticsearch.xpack.esql.core.type.DataType.NULL; +import static org.elasticsearch.xpack.esql.core.type.DataType.VERSION; /** - * Function returning the first non-null value. + * Function returning the first non-null value. {@code COALESCE} runs as though + * it were lazily evaluating each position in each incoming {@link Block}. */ public class Coalesce extends EsqlScalarFunction implements OptionalArgument { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Coalesce", Coalesce::new); @@ -194,70 +201,16 @@ public boolean foldable() { @Override public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { - List childEvaluators = children().stream().map(toEvaluator::apply).toList(); - return new ExpressionEvaluator.Factory() { - @Override - public ExpressionEvaluator get(DriverContext context) { - return new CoalesceEvaluator( - context, - PlannerUtils.toElementType(dataType()), - childEvaluators.stream().map(x -> x.get(context)).toList() - ); - } - - @Override - public String toString() { - return "CoalesceEvaluator[values=" + childEvaluators + ']'; - } + return switch (dataType()) { + case BOOLEAN -> CoalesceBooleanEvaluator.toEvaluator(toEvaluator, children()); + case DOUBLE, COUNTER_DOUBLE -> CoalesceDoubleEvaluator.toEvaluator(toEvaluator, children()); + case INTEGER, COUNTER_INTEGER -> CoalesceIntEvaluator.toEvaluator(toEvaluator, children()); + case LONG, DATE_NANOS, DATETIME, COUNTER_LONG, UNSIGNED_LONG -> CoalesceLongEvaluator.toEvaluator(toEvaluator, children()); + case KEYWORD, TEXT, SEMANTIC_TEXT, CARTESIAN_POINT, CARTESIAN_SHAPE, GEO_POINT, GEO_SHAPE, IP, VERSION -> + CoalesceBytesRefEvaluator.toEvaluator(toEvaluator, children()); + case NULL -> EvalOperator.CONSTANT_NULL_FACTORY; + case UNSUPPORTED, SHORT, BYTE, DATE_PERIOD, OBJECT, DOC_DATA_TYPE, SOURCE, TIME_DURATION, FLOAT, HALF_FLOAT, TSID_DATA_TYPE, + SCALED_FLOAT, PARTIAL_AGG -> throw new UnsupportedOperationException(dataType() + " can't be coalesced"); }; } - - private record CoalesceEvaluator(DriverContext driverContext, ElementType resultType, List evaluators) - implements - EvalOperator.ExpressionEvaluator { - @Override - public Block eval(Page page) { - /* - * We have to evaluate lazily so any errors or warnings that would be - * produced by the right hand side are avoided. And so if anything - * on the right hand side is slow we skip it. - * - * And it'd be good if that lazy evaluation were fast. But this - * implementation isn't. It's fairly simple - running position at - * a time - but it's not at all fast. - */ - int positionCount = page.getPositionCount(); - try (Block.Builder result = resultType.newBlockBuilder(positionCount, driverContext.blockFactory())) { - position: for (int p = 0; p < positionCount; p++) { - int[] positions = new int[] { p }; - Page limited = new Page( - 1, - IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) - ); - try (Releasable ignored = limited::releaseBlocks) { - for (EvalOperator.ExpressionEvaluator eval : evaluators) { - try (Block block = eval.eval(limited)) { - if (false == block.isNull(0)) { - result.copyFrom(block, 0, 1); - continue position; - } - } - } - result.appendNull(); - } - } - return result.build(); - } - } - - @Override - public String toString() { - return "CoalesceEvaluator[values=" + evaluators + ']'; - } - - @Override - public void close() { - Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); - } - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/X-CoalesceEvaluator.java.st b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/X-CoalesceEvaluator.java.st new file mode 100644 index 0000000000000..33841f03f7803 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/X-CoalesceEvaluator.java.st @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.nulls; + +$if(BytesRef)$ +import org.apache.lucene.util.BytesRef; +$endif$ +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.$Type$Block; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}. + * This class is generated. Edit {@code X-InEvaluator.java.st} instead. + */ +abstract sealed class Coalesce$Type$Evaluator implements EvalOperator.ExpressionEvaluator permits + Coalesce$Type$Evaluator.Coalesce$Type$EagerEvaluator, // + Coalesce$Type$Evaluator.Coalesce$Type$LazyEvaluator { + + static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List children) { + List childEvaluators = children.stream().map(toEvaluator::apply).toList(); + if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) { + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new Coalesce$Type$EagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "Coalesce$Type$EagerEvaluator[values=" + childEvaluators + ']'; + } + }; + } + return new ExpressionEvaluator.Factory() { + @Override + public ExpressionEvaluator get(DriverContext context) { + return new Coalesce$Type$LazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList()); + } + + @Override + public String toString() { + return "Coalesce$Type$LazyEvaluator[values=" + childEvaluators + ']'; + } + }; + } + + protected final DriverContext driverContext; + protected final List evaluators; + + protected Coalesce$Type$Evaluator(DriverContext driverContext, List evaluators) { + this.driverContext = driverContext; + this.evaluators = evaluators; + } + + @Override + public final $Type$Block eval(Page page) { + return entireBlock(page); + } + + /** + * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to + * {@link #perPosition} evaluation. + *

+ * Entire Block evaluation is the "normal" way to run the compute engine, + * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try + * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and: + *

+ *
    + *
  • If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.
  • + *
  • If the {@linkplain Block} is only nulls we skip it and try the next evaluator.
  • + *
  • If this is the last evaluator we just return it. COALESCE done.
  • + *
  • + * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop + * into a per position evaluator. + *
  • + *
+ */ + private $Type$Block entireBlock(Page page) { + int lastFullBlockIdx = 0; + while (true) { + $Type$Block lastFullBlock = ($Type$Block) evaluators.get(lastFullBlockIdx++).eval(page); + if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) { + return lastFullBlock; + } + if (lastFullBlock.areAllValuesNull()) { + // Result is all nulls and isn't the last result so we don't need any of it. + lastFullBlock.close(); + continue; + } + // The result has some nulls and some non-nulls. + return perPosition(page, lastFullBlock, lastFullBlockIdx); + } + } + + /** + * Evaluate each position of the incoming {@link Page} for COALESCE + * independently. Our attempt to evaluate entire blocks has yielded + * a block that contains some nulls and some non-nulls and we have + * to fill in the nulls with the results of calling the remaining + * evaluators. + *

+ * This must not return warnings caused by + * evaluating positions for which a previous evaluator returned + * non-null. These are positions that, at least from the perspective + * of a compute engine user, don't have to be + * evaluated. Put another way, this must function as though + * {@code COALESCE} were per-position lazy. It can manage that + * any way it likes. + *

+ */ + protected abstract $Type$Block perPosition(Page page, $Type$Block lastFullBlock, int firstToEvaluate); + + @Override + public final String toString() { + return getClass().getSimpleName() + "[values=" + evaluators + ']'; + } + + @Override + public final void close() { + Releasables.closeExpectNoException(() -> Releasables.close(evaluators)); + } + + /** + * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails. + * First we evaluate all remaining evaluators, and then we pluck the first non-null + * value from each one. This is much faster than + * {@link Coalesce$Type$LazyEvaluator} but will include spurious warnings if any of the + * evaluators make them so we only use it for evaluators that are + * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly + * in a lazy environment. + */ + static final class Coalesce$Type$EagerEvaluator extends Coalesce$Type$Evaluator { + Coalesce$Type$EagerEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected $Type$Block perPosition(Page page, $Type$Block lastFullBlock, int firstToEvaluate) { +$if(BytesRef)$ + BytesRef scratch = new BytesRef(); +$endif$ + int positionCount = page.getPositionCount(); + $Type$Block[] flatten = new $Type$Block[evaluators.size() - firstToEvaluate + 1]; + try { + flatten[0] = lastFullBlock; + for (int f = 1; f < flatten.length; f++) { + flatten[f] = ($Type$Block) evaluators.get(firstToEvaluate + f - 1).eval(page); + } + try ($Type$Block.Builder result = driverContext.blockFactory().new$Type$BlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + for ($Type$Block f : flatten) { + if (false == f.isNull(p)) { + result.copyFrom(f, p$if(BytesRef)$, scratch$endif$); + continue position; + } + } + result.appendNull(); + } + return result.build(); + } + } finally { + Releasables.close(flatten); + } + } + } + + /** + * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails. + * For each position we either: + *
    + *
  • Take the non-null values from the {@code lastFullBlock}
  • + *
  • + * Evaluator the remaining evaluators one at a time, keeping + * the first non-null value. + *
  • + *
+ */ + static final class Coalesce$Type$LazyEvaluator extends Coalesce$Type$Evaluator { + Coalesce$Type$LazyEvaluator(DriverContext driverContext, List evaluators) { + super(driverContext, evaluators); + } + + @Override + protected $Type$Block perPosition(Page page, $Type$Block lastFullBlock, int firstToEvaluate) { +$if(BytesRef)$ + BytesRef scratch = new BytesRef(); +$endif$ + int positionCount = page.getPositionCount(); + try ($Type$Block.Builder result = driverContext.blockFactory().new$Type$BlockBuilder(positionCount)) { + position: for (int p = 0; p < positionCount; p++) { + if (lastFullBlock.isNull(p) == false) { + result.copyFrom(lastFullBlock, p, p + 1); + continue; + } + int[] positions = new int[] { p }; + Page limited = new Page( + 1, + IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new) + ); + try (Releasable ignored = limited::releaseBlocks) { + for (int e = firstToEvaluate; e < evaluators.size(); e++) { + try ($Type$Block block = ($Type$Block) evaluators.get(e).eval(limited)) { + if (false == block.isNull(0)) { + result.copyFrom(block, 0$if(BytesRef)$, scratch$endif$); + continue position; + } + } + } + result.appendNull(); + } + } + return result.build(); + } finally { + lastFullBlock.close(); + } + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceTests.java index 688341ebaa2b7..1235a175294af 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceTests.java @@ -12,8 +12,13 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.compute.test.TestBlockFactory; +import org.elasticsearch.core.Releasables; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.FoldContext; @@ -29,6 +34,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.VaragsTestCaseBuilder; import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesFunctionTestCase; import org.elasticsearch.xpack.esql.planner.Layout; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter; import java.time.ZonedDateTime; @@ -40,6 +46,9 @@ import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; import static org.elasticsearch.xpack.esql.EsqlTestUtils.randomLiteral; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; public class CoalesceTests extends AbstractScalarFunctionTestCase { public CoalesceTests(@Name("TestCase") Supplier testCaseSupplier) { @@ -49,7 +58,7 @@ public CoalesceTests(@Name("TestCase") Supplier testC @ParametersFactory public static Iterable parameters() { List noNullsSuppliers = new ArrayList<>(); - VaragsTestCaseBuilder builder = new VaragsTestCaseBuilder(type -> "Coalesce"); + VaragsTestCaseBuilder builder = new VaragsTestCaseBuilder(type -> "Coalesce" + type + "Eager"); builder.expectString(strings -> strings.filter(v -> v != null).findFirst()); builder.expectLong(longs -> longs.filter(v -> v != null).findFirst()); builder.expectInt(ints -> ints.filter(v -> v != null).findFirst()); @@ -64,7 +73,7 @@ public static Iterable parameters() { new TestCaseSupplier.TypedData(first, DataType.IP, "first"), new TestCaseSupplier.TypedData(second, DataType.IP, "second") ), - "CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", + "CoalesceBytesRefEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", DataType.IP, equalTo(first == null ? second : first) ); @@ -79,7 +88,7 @@ public static Iterable parameters() { new TestCaseSupplier.TypedData(first, DataType.VERSION, "first"), new TestCaseSupplier.TypedData(second, DataType.VERSION, "second") ), - "CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", + "CoalesceBytesRefEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", DataType.VERSION, equalTo(first == null ? second : first) ); @@ -92,7 +101,7 @@ public static Iterable parameters() { new TestCaseSupplier.TypedData(firstDate, DataType.DATETIME, "first"), new TestCaseSupplier.TypedData(secondDate, DataType.DATETIME, "second") ), - "CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", + "CoalesceLongEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", DataType.DATETIME, equalTo(firstDate == null ? secondDate : firstDate) ); @@ -105,7 +114,7 @@ public static Iterable parameters() { new TestCaseSupplier.TypedData(firstDate, DataType.DATE_NANOS, "first"), new TestCaseSupplier.TypedData(secondDate, DataType.DATE_NANOS, "second") ), - "CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", + "CoalesceLongEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]", DataType.DATE_NANOS, equalTo(firstDate == null ? secondDate : firstDate) ); @@ -129,6 +138,20 @@ public static Iterable parameters() { suppliers.add(new TestCaseSupplier(nullCaseName(s, nullUpTo, true), types, () -> nullCase(s.get(), finalNullUpTo, true))); } } + suppliers.add( + new TestCaseSupplier( + List.of(DataType.NULL, DataType.NULL), + () -> new TestCaseSupplier.TestCase( + List.of( + new TestCaseSupplier.TypedData(null, DataType.NULL, "first"), + new TestCaseSupplier.TypedData(null, DataType.NULL, "second") + ), + "ConstantNull", + DataType.NULL, + nullValue() + ) + ) + ); return parameterSuppliersFromTypedData(suppliers); } @@ -167,7 +190,7 @@ protected static void addSpatialCombinations(List suppliers) { TestCaseSupplier.testCaseSupplier( leftDataSupplier, rightDataSupplier, - (l, r) -> equalTo("CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]"), + (l, r) -> equalTo("CoalesceBytesRefEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]"), dataType, (l, r) -> l ) @@ -235,6 +258,69 @@ public void testCoalesceNotNullable() { sub.add(between(0, sub.size()), randomLiteral(sub.get(sub.size() - 1).dataType())); Coalesce exp = build(Source.EMPTY, sub); // Known not to be nullable because it contains a non-null literal - assertThat(exp.nullable(), equalTo(Nullability.FALSE)); + if (testCase.expectedType() == DataType.NULL) { + assertThat(exp.nullable(), equalTo(Nullability.UNKNOWN)); + } else { + assertThat(exp.nullable(), equalTo(Nullability.FALSE)); + } + } + + /** + * Inserts random non-null garbage around the expected data and runs COALESCE. + *

+ * This is important for catching the case where your value is null, but the rest of the block + * isn't null. An off-by-one error in the evaluators can break this in a way that the standard + * tests weren't catching and this does. + *

+ */ + public void testEvaluateWithGarbage() { + DriverContext context = driverContext(); + Expression expression = randomBoolean() ? buildDeepCopyOfFieldExpression(testCase) : buildFieldExpression(testCase); + int positions = between(2, 1024); + List data = testCase.getData(); + Page onePositionPage = row(testCase.getDataValues()); + Block[] blocks = new Block[Math.toIntExact(data.stream().filter(d -> d.isForceLiteral() == false).count())]; + int realPosition = between(0, positions - 1); + try { + int blocksIndex = 0; + for (TestCaseSupplier.TypedData d : data) { + blocks[blocksIndex] = blockWithRandomGarbage( + context.blockFactory(), + d.type(), + onePositionPage.getBlock(blocksIndex), + positions, + realPosition + ); + blocksIndex++; + } + try ( + EvalOperator.ExpressionEvaluator eval = evaluator(expression).get(context); + Block block = eval.eval(new Page(positions, blocks)) + ) { + assertThat(block.getPositionCount(), is(positions)); + assertThat(toJavaObjectUnsignedLongAware(block, realPosition), testCase.getMatcher()); + assertThat("evaluates to tracked block", block.blockFactory(), sameInstance(context.blockFactory())); + } + } finally { + Releasables.close(onePositionPage::releaseBlocks, Releasables.wrap(blocks)); + } + } + + private Block blockWithRandomGarbage( + BlockFactory blockFactory, + DataType type, + Block singlePositionBlock, + int totalPositions, + int insertLocation + ) { + try (Block.Builder builder = PlannerUtils.toElementType(type).newBlockBuilder(totalPositions, blockFactory)) { + for (int p = 0; p < totalPositions; p++) { + Block copyFrom = p == insertLocation + ? singlePositionBlock + : BlockUtils.constantBlock(TestBlockFactory.getNonBreakingInstance(), randomLiteral(type).value(), 1); + builder.copyFrom(copyFrom, 0, 1); + } + return builder.build(); + } } }