|
115 | 115 | import org.elasticsearch.xpack.esql.plan.physical.ShowExec; |
116 | 116 | import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; |
117 | 117 | import org.elasticsearch.xpack.esql.plan.physical.TopNExec; |
| 118 | +import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec; |
118 | 119 | import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec; |
119 | 120 | import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext; |
120 | 121 | import org.elasticsearch.xpack.esql.plugin.QueryPragmas; |
@@ -259,9 +260,12 @@ private PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlannerContext c |
259 | 260 | return planRerank(rerank, context); |
260 | 261 | } else if (node instanceof ChangePointExec changePoint) { |
261 | 262 | return planChangePoint(changePoint, context); |
| 263 | + } else if (node instanceof CompletionExec completion) { |
| 264 | + return planCompletion(completion, context); |
262 | 265 | } else if (node instanceof SampleExec Sample) { |
263 | 266 | return planSample(Sample, context); |
264 | 267 | } |
| 268 | + |
265 | 269 | // source nodes |
266 | 270 | else if (node instanceof EsQueryExec esQuery) { |
267 | 271 | return planEsQueryNode(esQuery, context); |
@@ -296,6 +300,19 @@ else if (node instanceof OutputExec outputExec) { |
296 | 300 | throw new EsqlIllegalArgumentException("unknown physical plan node [" + node.nodeName() + "]"); |
297 | 301 | } |
298 | 302 |
|
| 303 | + private PhysicalOperation planCompletion(CompletionExec completion, LocalExecutionPlannerContext context) { |
| 304 | + PhysicalOperation source = plan(completion.child(), context); |
| 305 | + String inferenceId = BytesRefs.toString(completion.inferenceId().fold(context.foldCtx())); |
| 306 | + Layout outputLayout = source.layout.builder().append(completion.targetField()).build(); |
| 307 | + EvalOperator.ExpressionEvaluator.Factory promptEvaluatorFactory = EvalMapper.toEvaluator( |
| 308 | + context.foldCtx(), |
| 309 | + completion.prompt(), |
| 310 | + source.layout |
| 311 | + ); |
| 312 | + |
| 313 | + return source.with(new CompletionOperator.Factory(inferenceRunner, inferenceId, promptEvaluatorFactory), outputLayout); |
| 314 | + } |
| 315 | + |
299 | 316 | private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecutionPlannerContext context) { |
300 | 317 | PhysicalOperation source = plan(rrf.child(), context); |
301 | 318 |
|
|
0 commit comments