Skip to content

Commit 03c6110

Browse files
committed
Physical plan planning.
1 parent 7322c0c commit 03c6110

File tree

4 files changed

+46
-4
lines changed

4 files changed

+46
-4
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/embedding/DenseEmbeddingOperator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public DenseEmbeddingOperator(
4343
DriverContext driverContext,
4444
InferenceRunner inferenceRunner,
4545
ThreadPool threadPool,
46-
int dimensions,
4746
String inferenceId,
47+
int dimensions,
4848
ExpressionEvaluator inputEvaluator
4949
) {
5050
super(driverContext, inferenceRunner, BulkInferenceExecutionConfig.DEFAULT, threadPool, inferenceId);
@@ -100,8 +100,8 @@ protected DenseEmbeddingOperatorOutputBuilder outputBuilder(Page input) {
100100
*/
101101
public record Factory(
102102
InferenceRunner inferenceRunner,
103-
int dimensions,
104103
String inferenceId,
104+
int dimensions,
105105
ExpressionEvaluator.Factory inputEvaluatorFactory
106106
) implements OperatorFactory {
107107

@@ -116,8 +116,8 @@ public Operator get(DriverContext driverContext) {
116116
driverContext,
117117
inferenceRunner,
118118
inferenceRunner.threadPool(),
119-
dimensions,
120119
inferenceId,
120+
dimensions,
121121
inputEvaluatorFactory().get(driverContext)
122122
);
123123
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.elasticsearch.xpack.esql.inference.InferenceRunner;
8989
import org.elasticsearch.xpack.esql.inference.XContentRowEncoder;
9090
import org.elasticsearch.xpack.esql.inference.completion.CompletionOperator;
91+
import org.elasticsearch.xpack.esql.inference.embedding.DenseEmbeddingOperator;
9192
import org.elasticsearch.xpack.esql.inference.rerank.RerankOperator;
9293
import org.elasticsearch.xpack.esql.plan.logical.Fork;
9394
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
@@ -119,6 +120,7 @@
119120
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
120121
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
121122
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
123+
import org.elasticsearch.xpack.esql.plan.physical.inference.embedding.DenseVectorEmbeddingExec;
122124
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
123125
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
124126
import org.elasticsearch.xpack.esql.score.ScoreMapper;
@@ -266,6 +268,8 @@ private PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlannerContext c
266268
return planChangePoint(changePoint, context);
267269
} else if (node instanceof CompletionExec completion) {
268270
return planCompletion(completion, context);
271+
} else if (node instanceof DenseVectorEmbeddingExec embedding) {
272+
return planDenseVectorEmbedding(embedding, context);
269273
} else if (node instanceof SampleExec Sample) {
270274
return planSample(Sample, context);
271275
}
@@ -319,6 +323,31 @@ private PhysicalOperation planCompletion(CompletionExec completion, LocalExecuti
319323
return source.with(new CompletionOperator.Factory(inferenceRunner, inferenceId, promptEvaluatorFactory), outputLayout);
320324
}
321325

326+
private PhysicalOperation planDenseVectorEmbedding(DenseVectorEmbeddingExec embedding, LocalExecutionPlannerContext context) {
327+
PhysicalOperation source = plan(embedding.child(), context);
328+
String inferenceId = BytesRefs.toString(embedding.inferenceId().fold(context.foldCtx()));
329+
330+
int dimensions;
331+
if (embedding.dimensions() instanceof Literal literal) {
332+
Object val = literal.value() instanceof BytesRef br ? BytesRefs.toString(br) : literal.value();
333+
dimensions = stringToInt(val.toString());
334+
} else {
335+
throw new EsqlIllegalArgumentException("dimensions only supported with literal values");
336+
}
337+
338+
Layout outputLayout = source.layout.builder().append(embedding.targetField()).build();
339+
EvalOperator.ExpressionEvaluator.Factory inputEvaluatorFactory = EvalMapper.toEvaluator(
340+
context.foldCtx(),
341+
embedding.input(),
342+
source.layout
343+
);
344+
345+
return source.with(
346+
new DenseEmbeddingOperator.Factory(inferenceRunner, inferenceId, dimensions, inputEvaluatorFactory),
347+
outputLayout
348+
);
349+
}
350+
322351
private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecutionPlannerContext context) {
323352
PhysicalOperation source = plan(rrf.child(), context);
324353

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
3030
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
3131
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
32+
import org.elasticsearch.xpack.esql.plan.logical.inference.embedding.DenseVectorEmbedding;
3233
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
3334
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
3435
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
@@ -47,6 +48,7 @@
4748
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
4849
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
4950
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
51+
import org.elasticsearch.xpack.esql.plan.physical.inference.embedding.DenseVectorEmbeddingExec;
5052
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
5153

5254
import java.util.List;
@@ -106,6 +108,17 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) {
106108
return new CompletionExec(completion.source(), child, completion.inferenceId(), completion.prompt(), completion.targetField());
107109
}
108110

111+
if (p instanceof DenseVectorEmbedding embedding) {
112+
return new DenseVectorEmbeddingExec(
113+
embedding.source(),
114+
child,
115+
embedding.inferenceId(),
116+
embedding.dimensions(),
117+
embedding.input(),
118+
embedding.embeddingField()
119+
);
120+
}
121+
109122
if (p instanceof Enrich enrich) {
110123
return new EnrichExec(
111124
enrich.source(),

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/embedding/DenseEmbeddingOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public DenseEmbeddingOperatorTests(
8080

8181
@Override
8282
protected Operator.OperatorFactory simple(SimpleOptions options) {
83-
return new DenseEmbeddingOperator.Factory(mockedSimpleInferenceRunner(), dimensions, SIMPLE_INFERENCE_ID, evaluatorFactory(0));
83+
return new DenseEmbeddingOperator.Factory(mockedSimpleInferenceRunner(), SIMPLE_INFERENCE_ID, dimensions, evaluatorFactory(0));
8484
}
8585

8686
@Override

0 commit comments

Comments
 (0)