Skip to content

Commit a437da3

Browse files
committed
Create a ScoreOperator that can be planned via the LocalExecutionPlanner when an Filter is being planned
1 parent e4eb86d commit a437da3

File tree

3 files changed

+39
-3
lines changed

3 files changed

+39
-3
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ScoreOperator.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.compute.data.Block;
1011
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.DocVector;
1113
import org.elasticsearch.compute.data.DoubleBlock;
14+
import org.elasticsearch.compute.data.DoubleVector;
1215
import org.elasticsearch.compute.data.Page;
1316
import org.elasticsearch.core.Releasable;
1417
import org.elasticsearch.core.Releasables;
@@ -42,8 +45,25 @@ public ScoreOperator(BlockFactory blockFactory, ExpressionScorer scorer) {
4245

4346
@Override
4447
protected Page process(Page page) {
45-
DoubleBlock block = scorer.score(page);
46-
return page.appendBlock(block);
48+
assert page.getBlockCount() >= 2 : "Expected at least 2 blocks, got " + page.getBlockCount();
49+
assert page.getBlock(0).asVector() instanceof DocVector : "Expected a DocVector, got " + page.getBlock(0).asVector();
50+
assert page.getBlock(1).asVector() instanceof DoubleVector : "Expected a DoubleVector, got " + page.getBlock(1).asVector();
51+
52+
Block[] blocks = new Block[page.getBlockCount()];
53+
blocks[0] = page.getBlock(0);
54+
try (DoubleBlock evalScores = scorer.score(page); DoubleBlock existingScores = page.getBlock(1)) {
55+
// TODO Optimize for constant zero scores?
56+
int rowCount = page.getPositionCount();
57+
DoubleVector.Builder builder = blockFactory.newDoubleVectorFixedBuilder(rowCount);
58+
for (int i = 0; i < rowCount; i++) {
59+
builder.appendDouble(existingScores.getDouble(i) + evalScores.getDouble(i));
60+
}
61+
blocks[1] = builder.build().asBlock();
62+
}
63+
for (int i = 2; i < blocks.length; i++) {
64+
blocks[i] = page.getBlock(i);
65+
}
66+
return new Page(blocks);
4767
}
4868

4969
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
3131
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
3232
import org.elasticsearch.compute.operator.RowInTableLookupOperator;
33+
import org.elasticsearch.compute.operator.ScoreOperator;
3334
import org.elasticsearch.compute.operator.ShowOperator;
3435
import org.elasticsearch.compute.operator.SinkOperator;
3536
import org.elasticsearch.compute.operator.SinkOperator.SinkOperatorFactory;
@@ -94,6 +95,7 @@
9495
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
9596
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
9697
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
98+
import org.elasticsearch.xpack.esql.score.ScoreMapper;
9799
import org.elasticsearch.xpack.esql.session.Configuration;
98100

99101
import java.util.ArrayList;
@@ -676,10 +678,18 @@ private PhysicalOperation planProject(ProjectExec project, LocalExecutionPlanner
676678
private PhysicalOperation planFilter(FilterExec filter, LocalExecutionPlannerContext context) {
677679
PhysicalOperation source = plan(filter.child(), context);
678680
// TODO: should this be extracted into a separate eval block?
679-
return source.with(
681+
PhysicalOperation filterOperation = source.with(
680682
new FilterOperatorFactory(EvalMapper.toEvaluator(context.foldCtx(), filter.condition(), source.layout, shardContexts)),
681683
source.layout
682684
);
685+
if (PlannerUtils.usesScoring(filter)) {
686+
// Add scorer operator to blend the filter expression scores on the overall scores
687+
filterOperation = filterOperation.with(
688+
new ScoreOperator.ScoreOperatorFactory(ScoreMapper.toScorer(filter.condition(), shardContexts)),
689+
filterOperation.layout
690+
);
691+
}
692+
return filterOperation;
683693
}
684694

685695
private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContext context) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
2222
import org.elasticsearch.xpack.esql.core.expression.Expression;
2323
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
24+
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
2425
import org.elasticsearch.xpack.esql.core.tree.Node;
2526
import org.elasticsearch.xpack.esql.core.tree.Source;
2627
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -31,6 +32,7 @@
3132
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
3233
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
3334
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
35+
import org.elasticsearch.xpack.esql.plan.QueryPlan;
3436
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
3537
import org.elasticsearch.xpack.esql.plan.logical.Filter;
3638
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -307,4 +309,8 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field
307309
new NoopCircuitBreaker("noop-esql-breaker"),
308310
BigArrays.NON_RECYCLING_INSTANCE
309311
);
312+
313+
public static boolean usesScoring(QueryPlan<?> plan) {
314+
return plan.output().stream().anyMatch(attr -> attr instanceof MetadataAttribute ma && ma.name().equals(MetadataAttribute.SCORE));
315+
}
310316
}

0 commit comments

Comments
 (0)