diff --git a/server/src/main/resources/transport/definitions/referable/esql_fuse_linear_operator_status.csv b/server/src/main/resources/transport/definitions/referable/esql_fuse_linear_operator_status.csv
new file mode 100644
index 0000000000000..833feabb55a10
--- /dev/null
+++ b/server/src/main/resources/transport/definitions/referable/esql_fuse_linear_operator_status.csv
@@ -0,0 +1 @@
+9171000
diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv
index 2c15e0254cbe8..f69c2d7fac9a6 100644
--- a/server/src/main/resources/transport/upper_bounds/9.2.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
-transform_check_for_dangling_tasks,9170000
+esql_fuse_linear_operator_status,9171000
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperator.java
index 08a6d47e64f91..300668163c39e 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperator.java
@@ -8,6 +8,10 @@
 package org.elasticsearch.compute.operator.fuse;

 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.DoubleVector;
@@ -15,7 +19,11 @@
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentBuilder;

+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
@@ -60,6 +68,12 @@ public String describe() {
     private final Deque<Page> outputPages;
     private boolean finished;

+    private long emitNanos;
+    private int pagesReceived = 0;
+    private int pagesProcessed = 0;
+    private long rowsReceived = 0;
+    private long rowsEmitted = 0;
+
     public LinearScoreEvalOperator(int discriminatorPosition, int scorePosition, LinearConfig config) {
         this.scorePosition = scorePosition;
         this.discriminatorPosition = discriminatorPosition;
@@ -79,6 +93,8 @@ public boolean needsInput() {
     @Override
     public void addInput(Page page) {
         inputPages.add(page);
+        pagesReceived++;
+        rowsReceived += page.getPositionCount();
     }

     @Override
@@ -90,35 +106,58 @@ public void finish() {
     }

     private void createOutputPages() {
+        final var emitStart = System.nanoTime();
         normalizer.preprocess(inputPages, scorePosition, discriminatorPosition);
+        try {
+            while (inputPages.isEmpty() == false) {
+                Page inputPage = inputPages.peek();
+                processInputPage(inputPage);
+                inputPages.removeFirst();
+                pagesProcessed += 1;
+            }
+        } finally {
+            emitNanos = System.nanoTime() - emitStart;
+            Releasables.close(inputPages);
+        }
+    }

-        while (inputPages.isEmpty() == false) {
-            Page inputPage = inputPages.peek();
+    private void processInputPage(Page inputPage) {
+        BytesRefBlock discriminatorBlock = inputPage.getBlock(discriminatorPosition);
+        DoubleVectorBlock initialScoreBlock = inputPage.getBlock(scorePosition);

-            BytesRefBlock discriminatorBlock = inputPage.getBlock(discriminatorPosition);
-            DoubleVectorBlock initialScoreBlock = inputPage.getBlock(scorePosition);
+        Page newPage = null;
+        Block scoreBlock = null;
+        DoubleVector.Builder scores = null;

-            DoubleVector.Builder scores = discriminatorBlock.blockFactory().newDoubleVectorBuilder(discriminatorBlock.getPositionCount());
+        try {
+            scores = discriminatorBlock.blockFactory().newDoubleVectorBuilder(discriminatorBlock.getPositionCount());

             for (int i = 0; i < inputPage.getPositionCount(); i++) {
                 String discriminator = discriminatorBlock.getBytesRef(i, new BytesRef()).utf8ToString();

                 var weight = config.weights().get(discriminator) == null ? 1.0 : config.weights().get(discriminator);
-                Double score = initialScoreBlock.getDouble(i);
+                double score = initialScoreBlock.getDouble(i);
                 scores.appendDouble(weight * normalizer.normalize(score, discriminator));
             }
-            Block scoreBlock = scores.build().asBlock();
-            inputPage = inputPage.appendBlock(scoreBlock);
-            int[] projections = new int[inputPage.getBlockCount() - 1];
+            scoreBlock = scores.build().asBlock();
+            newPage = inputPage.appendBlock(scoreBlock);
+
+            int[] projections = new int[newPage.getBlockCount() - 1];

-            for (int i = 0; i < inputPage.getBlockCount() - 1; i++) {
-                projections[i] = i == scorePosition ? inputPage.getBlockCount() - 1 : i;
+            for (int i = 0; i < newPage.getBlockCount() - 1; i++) {
+                projections[i] = i == scorePosition ? newPage.getBlockCount() - 1 : i;
+            }
+
+            outputPages.add(newPage.projectBlocks(projections));
+        } finally {
+            if (newPage != null) {
+                newPage.releaseBlocks();
+            }
+            if (scoreBlock == null && scores != null) {
+                Releasables.close(scores);
             }
-            inputPages.removeFirst();
-            outputPages.add(inputPage.projectBlocks(projections));
-            inputPage.releaseBlocks();
         }
     }
@@ -132,7 +171,11 @@ public Page getOutput() {
         if (finished == false || outputPages.isEmpty()) {
             return null;
         }
-        return outputPages.removeFirst();
+
+        Page page = outputPages.removeFirst();
+        rowsEmitted += page.getPositionCount();
+
+        return page;
     }

     @Override
@@ -156,6 +199,69 @@ public String toString() {
             + "]";
     }

+    @Override
+    public Operator.Status status() {
+        return new Status(emitNanos, pagesReceived, pagesProcessed, rowsReceived, rowsEmitted);
+    }
+
+    public record Status(long emitNanos, int pagesReceived, int pagesProcessed, long rowsReceived, long rowsEmitted)
+        implements
+            Operator.Status {
+
+        public static final TransportVersion ESQL_FUSE_LINEAR_OPERATOR_STATUS = TransportVersion.fromName(
+            "esql_fuse_linear_operator_status"
+        );
+
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "linearScoreEval",
+            Status::new
+        );
+
+        Status(StreamInput streamInput) throws IOException {
+            this(streamInput.readLong(), streamInput.readInt(), streamInput.readInt(), streamInput.readLong(), streamInput.readLong());
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        @Override
+        public boolean supportsVersion(TransportVersion version) {
+            return version.supports(ESQL_FUSE_LINEAR_OPERATOR_STATUS);
+        }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            assert false : "must not be called when overriding supportsVersion";
+            throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeLong(emitNanos);
+            out.writeInt(pagesReceived);
+            out.writeInt(pagesProcessed);
+            out.writeLong(rowsReceived);
+            out.writeLong(rowsEmitted);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("emit_nanos", emitNanos);
+            if (builder.humanReadable()) {
+                builder.field("emit_time", TimeValue.timeValueNanos(emitNanos));
+            }
+            builder.field("pages_received", pagesReceived);
+            builder.field("pages_processed", pagesProcessed);
+            builder.field("rows_received", rowsReceived);
+            builder.field("rows_emitted", rowsEmitted);
+            return builder.endObject();
+        }
+    }
+
     private Normalizer createNormalizer(LinearConfig.Normalizer normalizer) {
         return switch (normalizer) {
             case NONE -> new NoneNormalizer();
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperator.java
index 4cd88be59a462..519f04bcf60b7 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperator.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.core.Releasables;

 import java.util.HashMap;
@@ -73,18 +74,30 @@ protected Page process(Page page) {
             scores.appendDouble(1.0 / (config.rankConstant() + rank) * weight);
         }
-        Block scoreBlock = scores.build().asBlock();
-        page = page.appendBlock(scoreBlock);
+        Page newPage = null;
+        Block scoreBlock = null;

-        int[] projections = new int[page.getBlockCount() - 1];
-
-        for (int i = 0; i < page.getBlockCount() - 1; i++) {
-            projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
-        }
         try {
-            return page.projectBlocks(projections);
+            scoreBlock = scores.build().asBlock();
+            newPage = page.appendBlock(scoreBlock);
+
+            int[] projections = new int[newPage.getBlockCount() - 1];
+
+            for (int i = 0; i < newPage.getBlockCount() - 1; i++) {
+                projections[i] = i == scorePosition ? newPage.getBlockCount() - 1 : i;
+            }
+            return newPage.projectBlocks(projections);
         } finally {
-            page.releaseBlocks();
+            if (newPage != null) {
+                newPage.releaseBlocks();
+            } else {
+                // we never got to a point where the new page was constructed, so we need to release the initial one
+                page.releaseBlocks();
+            }
+            if (scoreBlock == null) {
+                // we never built scoreBlock, so we need to release the scores builder
+                Releasables.close(scores);
+            }
         }
     }
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/FuseOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/FuseOperatorTestCase.java
new file mode 100644
index 0000000000000..de42c8106212c
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/FuseOperatorTestCase.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.fuse;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.TriConsumer;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleVectorBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.compute.test.AbstractBlockSourceOperator;
+import org.elasticsearch.compute.test.OperatorTestCase;
+import org.elasticsearch.core.Releasables;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class FuseOperatorTestCase extends OperatorTestCase {
+    protected int blocksCount;
+    protected int discriminatorPosition;
+    protected int scorePosition;
+    protected int discriminatorCount;
+
+    @Before
+    public void initialize() {
+        discriminatorPosition = randomIntBetween(1, 20);
+        scorePosition = randomIntBetween(discriminatorPosition + 1, 50);
+        blocksCount = randomIntBetween(scorePosition + 1, 100);
+        discriminatorCount = randomIntBetween(1, 20);
+    }
+
+    protected void assertOutput(List<Page> input, List<Page> results, TriConsumer<String, Double, Double> assertScore) {
+        assertEquals(input.size(), results.size());
+
+        for (int i = 0; i < results.size(); i++) {
+            Page resultPage = results.get(i);
+            Page initialPage = input.get(i);
+
+            assertEquals(initialPage.getPositionCount(), resultPage.getPositionCount());
+            assertEquals(resultPage.getBlockCount(), blocksCount);
+
+            BytesRefBlock discriminatorBlock = resultPage.getBlock(discriminatorPosition);
+            DoubleVectorBlock actualScoreBlock = resultPage.getBlock(scorePosition);
+            DoubleVectorBlock initialScoreBlock = initialPage.getBlock(scorePosition);
+
+            for (int j = 0; j < resultPage.getPositionCount(); j++) {
+                String discriminator = discriminatorBlock.getBytesRef(j, new BytesRef()).utf8ToString();
+                double actualScore = actualScoreBlock.getDouble(j);
+                double initialScore = initialScoreBlock.getDouble(j);
+                assertScore.apply(discriminator, actualScore, initialScore);
+            }
+        }
+    }
+
+    @Override
+    protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
+        return new AbstractBlockSourceOperator(blockFactory, 8 * 1024) {
+            @Override
+            protected int remaining() {
+                return size - currentPosition;
+            }
+
+            @Override
+            protected Page createPage(int positionOffset, int length) {
+                length = Integer.min(length, remaining());
+                Block[] blocks = new Block[blocksCount];
+
+                try {
+                    for (int b = 0; b < blocksCount; b++) {
+                        if (b == scorePosition) {
+                            try (var builder = blockFactory.newDoubleBlockBuilder(length)) {
+                                for (int i = 0; i < length; i++) {
+                                    builder.appendDouble(randomDouble());
+                                }
+                                blocks[b] = builder.build();
+                            }
+                        } else {
+                            try (var builder = blockFactory.newBytesRefBlockBuilder(length)) {
+                                for (int i = 0; i < length; i++) {
+                                    String stringInput = b == discriminatorPosition
+                                        ? "fork" + randomIntBetween(0, discriminatorCount)
+                                        : randomAlphaOfLength(10);
+
+                                    builder.appendBytesRef(new BytesRef(stringInput));
+                                }
+                                blocks[b] = builder.build();
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    Releasables.closeExpectNoException(blocks);
+                    throw e;
+                }
+
+                currentPosition += length;
+                return new Page(blocks);
+            }
+        };
+    }
+
+    protected Map<String, Double> randomWeights() {
+        Map<String, Double> weights = new HashMap<>();
+        for (int i = 0; i < discriminatorCount; i++) {
+            if (randomBoolean()) {
+                weights.put("fork" + i, randomDouble());
+            }
+        }
+        return weights;
+    }
+}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperatorTests.java
new file mode 100644
index 0000000000000..4fdea6353332b
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperatorTests.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.fuse;
+
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.Operator;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class LinearScoreEvalOperatorTests extends FuseOperatorTestCase {
+    private LinearConfig config;
+
+    @Before
+    public void setup() {
+        config = randomConfig();
+    }
+
+    @Override
+    protected void assertSimpleOutput(List<Page> input, List<Page> results) {
+        assertOutput(input, results, (discriminator, actualScore, initialScore) -> {
+            var weight = config.weights().getOrDefault(discriminator, 1.0);
+            assertEquals(actualScore, initialScore * weight, 0.00);
+        });
+    }
+
+    @Override
+    protected Operator.OperatorFactory simple(SimpleOptions options) {
+        return new LinearScoreEvalOperator.Factory(discriminatorPosition, scorePosition, config);
+    }
+
+    @Override
+    protected Matcher<String> expectedDescriptionOfSimple() {
+        return equalTo(
+            "LinearScoreEvalOperator[discriminatorPosition="
+                + discriminatorPosition
+                + ", scorePosition="
+                + scorePosition
+                + ", config="
+                + config
+                + "]"
+        );
+    }
+
+    @Override
+    protected Matcher<String> expectedToStringOfSimple() {
+        return equalTo(
+            "LinearScoreEvalOperator[discriminatorPosition="
+                + discriminatorPosition
+                + ", scorePosition="
+                + scorePosition
+                + ", config="
+                + config
+                + "]"
+        );
+    }
+
+    private LinearConfig randomConfig() {
+        return new LinearConfig(LinearConfig.Normalizer.NONE, randomWeights());
+    }
+}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperatorTests.java
new file mode 100644
index 0000000000000..7f0347897a814
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperatorTests.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.fuse;
+
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.Operator;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RrfScoreEvalOperatorTests extends FuseOperatorTestCase {
+    protected RrfConfig config;
+
+    @Before
+    public void setup() {
+        config = randomConfig();
+    }
+
+    @Override
+    protected void assertSimpleOutput(List<Page> input, List<Page> results) {
+        Map<String, Integer> counts = new HashMap<>();
+
+        assertOutput(input, results, (discriminator, actualScore, initialScore) -> {
+            var rank = counts.getOrDefault(discriminator, 1);
+            var weight = config.weights().getOrDefault(discriminator, 1.0);
+            assertEquals(actualScore, 1.0 / (config.rankConstant() + rank) * weight, 0.0d);
+            counts.put(discriminator, rank + 1);
+        });
+    }
+
+    @Override
+    protected Operator.OperatorFactory simple(SimpleOptions options) {
+        return new RrfScoreEvalOperator.Factory(discriminatorPosition, scorePosition, config);
+    }
+
+    @Override
+    protected Matcher<String> expectedDescriptionOfSimple() {
+        return equalTo(
+            "RrfScoreEvalOperator[discriminatorPosition="
+                + discriminatorPosition
+                + ", scorePosition="
+                + scorePosition
+                + ", rrfConfig="
+                + config
+                + "]"
+        );
+    }
+
+    @Override
+    protected Matcher<String> expectedToStringOfSimple() {
+        return equalTo("RrfScoreEvalOperator");
+    }
+
+    private RrfConfig randomConfig() {
+        return new RrfConfig((double) randomIntBetween(1, 100), randomWeights());
+    }
+}
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec
index 7604ba1b7dc6c..304f9fa8009b2 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec
@@ -4,7 +4,7 @@ simpleFuse
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: match_operator_colon

 FROM employees METADATA _id, _index, _score
@@ -23,7 +23,7 @@ _score:double | _fork:keyword | emp_no:integer

 fuseWithMatchAndScore
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: match_operator_colon

 FROM books METADATA _id, _index, _score
@@ -46,7 +46,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithDisjunctionAndPostFilter
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: match_operator_colon

 FROM books METADATA _id, _index, _score
@@ -69,7 +69,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithStats
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: match_operator_colon

 FROM books METADATA _id, _index, _score
@@ -89,7 +89,7 @@ count_fork:long | _fork:keyword

 fuseWithMultipleForkBranches
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: match_operator_colon

 FROM books METADATA _id, _index, _score
@@ -116,7 +116,7 @@ _score:double | author:keyword | title:keyword | _fork

 fuseWithSemanticSearch
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -137,7 +137,7 @@ _fork:keyword | _score:double | _id:keyword | semantic_text_field:keyword

 fuseWithSimpleRrf
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -161,7 +161,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithRrfAndRankConstant
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -185,7 +185,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithRrfAndWeights
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -209,7 +209,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithRrfRankConstantAndWeights
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -233,7 +233,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithRrfAndScoreColumn
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -257,7 +257,7 @@ my_score:double | _fork:keyword | _id:keyword

 fuseWithRrfAndDiscriminatorColumn
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -283,7 +283,7 @@ _score:double | new_fork:keyword | _id:keyword

 fuseWithRrfAndKeyColumns
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -309,7 +309,7 @@ _score:double | _fork:keyword | new_id:keyword

 fuseWithRrfAllOptionsScoreKeyAndGroupColumns
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -337,7 +337,7 @@ new_score:double | new_fork:keyword | new_id:keyword

 fuseWithSimpleLinear
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -362,7 +362,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithLinearAndL2Norm
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -387,7 +387,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithLinearAndMinMax
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -411,7 +411,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithLinearAndWeights
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -435,7 +435,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithLinearAndPartialWeights
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -459,7 +459,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithLinearWeightsAndMinMax
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -484,7 +484,7 @@ _score:double | _fork:keyword | _id:keyword

 fuseWithLinearAndScoreColumn
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -509,7 +509,7 @@ my_score:double | _fork:keyword | _id:keyword

 fuseWithLinearAndDiscriminatorColumn
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -535,7 +535,7 @@ _score:double | new_fork:keyword | _id:keyword

 fuseWithLinearAndKeyColumns
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -560,7 +560,7 @@ _score:double | _fork:keyword | new_id:keyword

 fuseWithLinearAllOptionsScoreGroupAndKeyColumns
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
@@ -589,7 +589,7 @@ new_score:double | new_fork:keyword | new_id:keyword

 fuseWithRowAndRRF
-required_capability: fuse_v4
+required_capability: fuse_v5

 ROW my_score = [0, 1, 2, 3, 4]::double, _index = "my_index", _fork = "foo"
 | MV_EXPAND my_score
@@ -612,7 +612,7 @@ id_4.0 | 0.01538

 fuseWithRowLinearAndWeights
-required_capability: fuse_v4
+required_capability: fuse_v5

 ROW my_score = [0, 1, 2, 3, 4]::double, _index = "my_index", _fork = "foo"
 | MV_EXPAND my_score
@@ -634,7 +634,7 @@ id_0.0 | 0.0

 fuseWithRowLinearAndMinMax
-required_capability: fuse_v4
+required_capability: fuse_v5

 ROW my_score = [0, 1, 2, 3, 4]::double, _index = "my_index", _fork = "foo"
 | MV_EXPAND my_score
@@ -657,7 +657,7 @@ id_0.0 | 0.0

 fuseWithRowLinearAndL2Norm
-required_capability: fuse_v4
+required_capability: fuse_v5

 ROW my_score = [0, 1, 2, 3, 4]::double, _index = "my_index", _fork = "foo"
 | MV_EXPAND my_score
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rerank.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rerank.csv-spec
index 13c365286e53f..145095a1fe4f0 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rerank.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rerank.csv-spec
@@ -204,7 +204,7 @@ book_no:keyword | title:text | author

 reranker after FUSE
 required_capability: fork_v9
-required_capability: fuse_v4
+required_capability: fuse_v5
 required_capability: match_operator_colon
 required_capability: rerank
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseIT.java
index af30c0f5daeca..953678f5a2cf9 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseIT.java
@@ -28,7 +28,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

     @Before
     public void setupIndex() {
-        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         createAndPopulateIndex();
     }
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseWithInvalidLicenseIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseWithInvalidLicenseIT.java
index 643ef4cbe946b..8d1157318e4ef 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseWithInvalidLicenseIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/FuseWithInvalidLicenseIT.java
@@ -31,7 +31,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

     @Before
     public void setupIndex() {
-        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         var indexName = "test";
         var client = client().admin().indices();
         var CreateRequest = client.prepareCreate(indexName)
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 68713c7a77701..35b0e1a5b839a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -1368,7 +1368,7 @@ public enum Cap {
         /**
          * FUSE command
          */
-        FUSE_V4(Build.current().isSnapshot()),
+        FUSE_V5(Build.current().isSnapshot()),

         /**
          * Support improved behavior for LIKE operator when used with index fields.
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
index c026718bf178a..54739a6a7d63f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
@@ -35,6 +35,7 @@
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
+import org.elasticsearch.compute.operator.fuse.LinearScoreEvalOperator;
 import org.elasticsearch.compute.operator.topn.TopNOperatorStatus;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.features.NodeFeature;
@@ -344,6 +345,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
         entries.add(EnrichLookupOperator.Status.ENTRY);
         entries.add(LookupFromIndexOperator.Status.ENTRY);
         entries.add(SampleOperator.Status.ENTRY);
+        entries.add(LinearScoreEvalOperator.Status.ENTRY);
+
         entries.add(ExpressionQueryBuilder.ENTRY);
         entries.add(PlanStreamWrapperQueryBuilder.ENTRY);
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
index 896f9d7036b1f..f312576baedde 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
@@ -3625,7 +3625,7 @@ public void testForkError() {
     }

     public void testValidFuse() {
-        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());

         LogicalPlan plan = analyze("""
             from test metadata _id, _index, _score
@@ -3649,7 +3649,7 @@ public void testValidFuse() {
     }

     public void testFuseError() {
-        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("requires FUSE capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());

         var e = expectThrows(VerificationException.class, () -> analyze("""
             from test
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
index 70a4b1a4bd403..c416248eb4506 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
@@ -2569,7 +2569,7 @@ public void testInvalidTBucketCalls() {
     }

     public void testFuse() {
-        assumeTrue("FUSE requires corresponding capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE requires corresponding capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());

         String queryPrefix = "from test metadata _score, _index, _id | fork (where true) (where true)";
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
index 58bac20ce90cf..f29df6d9d4252 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
@@ -4218,7 +4218,7 @@ static Alias alias(String name, Expression value) {
     }

     public void testValidFuse() {
-        assumeTrue("FUSE requires corresponding capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE requires corresponding capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());

         LogicalPlan plan = statement("""
             FROM foo* METADATA _id, _index, _score
@@ -4318,7 +4318,7 @@ public void testValidFuse() {
     }

     public void testInvalidFuse() {
-        assumeTrue("FUSE requires corresponding capability", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE requires corresponding capability", EsqlCapabilities.Cap.FUSE_V5.isEnabled());

         String queryPrefix = "from test metadata _score, _index, _id | fork (where true) (where true)";
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java
index b7dcad059824c..4185a2da4e84d 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java
@@ -2225,7 +2225,7 @@ public void testForkRef4() {
     }

     public void testRerankerAfterFuse() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM books METADATA _id, _index, _score
@@ -2240,7 +2240,7 @@ public void testRerankerAfterFuse() {
     }

     public void testSimpleFuse() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM employees METADATA _id, _index, _score
@@ -2253,7 +2253,7 @@ public void testSimpleFuse() {
     }

     public void testFuseWithMatchAndScore() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM books METADATA _id, _index, _score
@@ -2267,7 +2267,7 @@ public void testFuseWithMatchAndScore() {
     }

     public void testFuseWithDisjunctionAndPostFilter() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM books METADATA _id, _index, _score
@@ -2282,7 +2282,7 @@ public void testFuseWithDisjunctionAndPostFilter() {
     }

     public void testFuseWithStats() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM books METADATA _id, _index, _score
@@ -2295,7 +2295,7 @@ public void testFuseWithStats() {
     }

     public void testFuseWithMultipleForkBranches() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM books METADATA _id, _index, _score
@@ -2312,7 +2312,7 @@ public void testFuseWithMultipleForkBranches() {
     }

     public void testFuseWithSemanticSearch() {
-        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V4.isEnabled());
+        assumeTrue("FUSE required", EsqlCapabilities.Cap.FUSE_V5.isEnabled());
         assertTrue("FORK required", EsqlCapabilities.Cap.FORK_V9.isEnabled());
         assertFieldNames("""
             FROM semantic_text METADATA _id, _score, _index