Skip to content

Commit 3874506

Browse files
ESQL: Logical Planning on the Lookup Node (#144241)
* Lookup Logical Planning * Address code review comments, UTs Assisted by Cursor/Claude
1 parent 9db4138 commit 3874506

File tree

21 files changed

+786
-95
lines changed

21 files changed

+786
-95
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperator.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public final class LookupQueryOperator implements Operator {
5555
private final IndexSearcher searcher;
5656
private final Warnings warnings;
5757
private final int maxPageSize;
58+
private final boolean emptyResult;
5859

5960
private Page currentInputPage;
6061
private int queryPosition = -1;
@@ -77,7 +78,8 @@ public LookupQueryOperator(
7778
IndexedByShardId<? extends ShardContext> shardContexts,
7879
int shardId,
7980
SearchExecutionContext searchExecutionContext,
80-
Warnings warnings
81+
Warnings warnings,
82+
boolean emptyResult
8183
) {
8284
this.blockFactory = blockFactory;
8385
this.maxPageSize = maxPageSize;
@@ -86,6 +88,7 @@ public LookupQueryOperator(
8688
this.shardContext = shardContexts.get(shardId);
8789
this.shardContext.incRef();
8890
this.searchExecutionContext = searchExecutionContext;
91+
this.emptyResult = emptyResult;
8992
try {
9093
if (shardContext.searcher().getIndexReader() instanceof DirectoryReader directoryReader) {
9194
// This optimization is currently disabled for ParallelCompositeReader
@@ -105,10 +108,14 @@ public void addInput(Page page) {
105108
if (currentInputPage != null) {
106109
throw new IllegalStateException("Operator already has input page, must consume it first");
107110
}
108-
currentInputPage = page;
109-
queryPosition = -1; // Reset query position for new page
110111
pagesReceived++;
111112
rowsReceived += page.getPositionCount();
113+
if (emptyResult) {
114+
page.releaseBlocks();
115+
return;
116+
}
117+
currentInputPage = page;
118+
queryPosition = -1; // Reset query position for new page
112119
}
113120

114121
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperatorTests.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,20 +124,21 @@ public Operator get(DriverContext driverContext) {
124124
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
125125
0,
126126
directoryData.searchExecutionContext,
127-
warnings()
127+
warnings(),
128+
false
128129
);
129130
}
130131

131132
@Override
132133
public String describe() {
133-
return "LookupQueryOperator[maxPageSize=256]";
134+
return "LookupQueryOperator[maxPageSize=256, emptyResult=false]";
134135
}
135136
};
136137
}
137138

138139
@Override
139140
protected Matcher<String> expectedDescriptionOfSimple() {
140-
return equalTo("LookupQueryOperator[maxPageSize=256]");
141+
return equalTo("LookupQueryOperator[maxPageSize=256, emptyResult=false]");
141142
}
142143

143144
@Override
@@ -180,7 +181,8 @@ public void testNoMatchesScenario() throws Exception {
180181
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(noMatchDirectory.reader)),
181182
0,
182183
noMatchDirectory.searchExecutionContext,
183-
warnings()
184+
warnings(),
185+
false
184186
)
185187
) {
186188
// Create input with non-matching terms
@@ -237,7 +239,8 @@ public void testGetOutputNeverNullWhileCanProduceMore() throws Exception {
237239
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
238240
0,
239241
directoryData.searchExecutionContext,
240-
warnings()
242+
warnings(),
243+
false
241244
)
242245
) {
243246
// Create input with many matching terms
@@ -283,7 +286,8 @@ public void testMixedMatchesAndNoMatches() throws Exception {
283286
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
284287
0,
285288
directoryData.searchExecutionContext,
286-
warnings()
289+
warnings(),
290+
false
287291
)
288292
) {
289293
// Mix of matching and non-matching terms
@@ -325,6 +329,48 @@ public void testMixedMatchesAndNoMatches() throws Exception {
325329
}
326330
}
327331

332+
/**
333+
* Test that when emptyResult=true the operator discards all input pages without producing output.
334+
*/
335+
public void testEmptyResultDiscardsInput() {
336+
DriverContext driverContext = driverContext();
337+
QueryList queryList = QueryList.rawTermQueryList(directoryData.field, AliasFilter.EMPTY, 0, ElementType.BYTES_REF);
338+
339+
try (
340+
LookupQueryOperator operator = new LookupQueryOperator(
341+
driverContext.blockFactory(),
342+
LookupQueryOperator.DEFAULT_MAX_PAGE_SIZE,
343+
queryList,
344+
new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)),
345+
0,
346+
directoryData.searchExecutionContext,
347+
warnings(),
348+
true
349+
)
350+
) {
351+
assertTrue("Should need input initially", operator.needsInput());
352+
assertFalse("Should not be finished before finish() is called", operator.isFinished());
353+
354+
// Feed multiple pages with terms that would normally match
355+
for (int p = 0; p < 3; p++) {
356+
try (BytesRefBlock.Builder builder = driverContext.blockFactory().newBytesRefBlockBuilder(10)) {
357+
for (int i = 0; i < 10; i++) {
358+
builder.appendBytesRef(new BytesRef("term-" + i));
359+
}
360+
operator.addInput(new Page(builder.build()));
361+
}
362+
363+
assertNull("Should never produce output when emptyResult=true", operator.getOutput());
364+
assertFalse("Should not be able to produce more data", operator.canProduceMoreDataWithoutExtraInput());
365+
assertTrue("Should still need input (not finished)", operator.needsInput());
366+
}
367+
368+
operator.finish();
369+
assertTrue("Should be finished after finish()", operator.isFinished());
370+
assertNull("Should return null after finish()", operator.getOutput());
371+
}
372+
}
373+
328374
private static Warnings warnings() {
329375
return Warnings.createWarnings(DriverContext.WarningsMode.COLLECT, new TestWarningsSource("test"));
330376
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ public enum Config {
469469

470470
private final Map<Config, Set<String>> includes = new HashMap<>();
471471
private final Map<Config, Set<String>> excludes = new HashMap<>();
472+
private final Map<String, String> constantValues = new HashMap<>();
472473

473474
public TestConfigurableSearchStats include(Config key, String... fields) {
474475
// If this method is called with no fields, it is interpreted to mean include none, so we include a dummy field
@@ -513,6 +514,16 @@ public boolean hasExactSubfield(FieldName field) {
513514
return isConfigationSet(Config.EXACT_SUBFIELD, field.string());
514515
}
515516

517+
public TestConfigurableSearchStats withConstantValue(String field, String value) {
518+
constantValues.put(field, value);
519+
return this;
520+
}
521+
522+
@Override
523+
public String constantValue(FieldName name) {
524+
return constantValues.get(name.string());
525+
}
526+
516527
@Override
517528
public String toString() {
518529
return "TestConfigurableSearchStats{" + "includes=" + includes + ", excludes=" + excludes + '}';

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -127,27 +127,19 @@ public void testJoinOnFourKeys() throws IOException {
127127
new String[] { "one", "two", "three", "four" },
128128
new Integer[] { 1, 2, 3, 4 }, }
129129
),
130-
buildGreaterThanFilter(1L)
130+
1L
131131
);
132132
}
133133

134134
public void testLongKey() throws IOException {
135-
runLookup(
136-
List.of(DataType.LONG),
137-
new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }),
138-
buildGreaterThanFilter(0L)
139-
);
135+
runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), 0L);
140136
}
141137

142138
/**
143139
* LOOKUP multiple results match.
144140
*/
145141
public void testLookupIndexMultiResults() throws IOException {
146-
runLookup(
147-
List.of(DataType.KEYWORD),
148-
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }),
149-
buildGreaterThanFilter(-1L)
150-
);
142+
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), -1L);
151143
}
152144

153145
public void testJoinOnTwoKeysMultiResults() throws IOException {
@@ -236,19 +228,28 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
236228
}
237229
}
238230

239-
private PhysicalPlan buildGreaterThanFilter(long value) {
240-
FieldAttribute filterAttribute = new FieldAttribute(
231+
private static PhysicalPlan buildGreaterThanFilter(long value, FieldAttribute filterAttribute) {
232+
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
233+
EsRelation esRelation = new EsRelation(
241234
Source.EMPTY,
242-
"l",
243-
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
235+
"test",
236+
IndexMode.LOOKUP,
237+
Map.of(),
238+
Map.of(),
239+
Map.of(),
240+
List.of(filterAttribute)
244241
);
245-
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
246-
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of());
247242
Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan);
248243
return new FragmentExec(filter);
249244
}
250245

251-
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException {
246+
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, Long filterValue) throws IOException {
247+
FieldAttribute lAttribute = new FieldAttribute(
248+
Source.EMPTY,
249+
"l",
250+
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
251+
);
252+
PhysicalPlan pushedDownFilter = filterValue != null ? buildGreaterThanFilter(filterValue, lAttribute) : null;
252253
String[] fieldMappers = new String[keyTypes.size() * 2];
253254
for (int i = 0; i < keyTypes.size(); i++) {
254255
fieldMappers[2 * i] = "key" + i;
@@ -403,13 +404,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
403404
ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
404405
"lookup",
405406
"lookup",
406-
List.of(
407-
new FieldAttribute(
408-
Source.EMPTY,
409-
"l",
410-
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
411-
)
412-
),
407+
List.of(lAttribute),
413408
Source.EMPTY,
414409
pushedDownFilter,
415410
Predicates.combineAnd(joinOnConditions),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.compute.lucene.ShardContext;
1919
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
2020
import org.elasticsearch.compute.operator.DriverContext;
21+
import org.elasticsearch.compute.operator.EvalOperator.EvalOperatorFactory;
2122
import org.elasticsearch.compute.operator.FilterOperator;
2223
import org.elasticsearch.compute.operator.Operator;
2324
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
@@ -37,6 +38,7 @@
3738
import org.elasticsearch.index.query.SearchExecutionContext;
3839
import org.elasticsearch.search.internal.AliasFilter;
3940
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
41+
import org.elasticsearch.xpack.esql.core.expression.Alias;
4042
import org.elasticsearch.xpack.esql.core.expression.Attribute;
4143
import org.elasticsearch.xpack.esql.core.expression.Expression;
4244
import org.elasticsearch.xpack.esql.core.expression.Expressions;
@@ -45,6 +47,7 @@
4547
import org.elasticsearch.xpack.esql.core.tree.Source;
4648
import org.elasticsearch.xpack.esql.core.type.DataType;
4749
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
50+
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
4851
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
4952
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
5053
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
@@ -271,6 +274,8 @@ private PhysicalOperation planLookupNode(
271274
);
272275
} else if (node instanceof FieldExtractExec fieldExtractExec) {
273276
return planFieldExtractExec(plannerSettings, fieldExtractExec, source);
277+
} else if (node instanceof EvalExec evalExec) {
278+
return planEvalExec(evalExec, source, foldCtx);
274279
} else if (node instanceof FilterExec filterExec) {
275280
return planFilterExec(filterExec, source, foldCtx);
276281
} else if (node instanceof ProjectExec projectExec) {
@@ -304,7 +309,8 @@ private PhysicalOperation planParameterizedQueryExec(
304309
parameterizedQueryExec.joinOnConditions(),
305310
parameterizedQueryExec.query(),
306311
lookupSource,
307-
queryListFromPlanFactory
312+
queryListFromPlanFactory,
313+
parameterizedQueryExec.emptyResult()
308314
);
309315

310316
return PhysicalOperation.fromSource(sourceFactory, layout).with(enrichQueryFactory, layout);
@@ -408,6 +414,16 @@ public String describe() {
408414
}, layout);
409415
}
410416

417+
private PhysicalOperation planEvalExec(EvalExec evalExec, PhysicalOperation source, FoldContext foldCtx) {
418+
for (Alias field : evalExec.fields()) {
419+
var evaluatorSupplier = EvalMapper.toEvaluator(foldCtx, field.child(), source.layout());
420+
Layout.Builder layout = source.layout().builder();
421+
layout.append(field.toAttribute());
422+
source = source.with(new EvalOperatorFactory(evaluatorSupplier), layout.build());
423+
}
424+
return source;
425+
}
426+
411427
private PhysicalOperation planFilterExec(FilterExec filterExec, PhysicalOperation source, FoldContext foldCtx) {
412428
return source.with(
413429
new FilterOperator.FilterOperatorFactory(EvalMapper.toEvaluator(foldCtx, filterExec.condition(), source.layout())),
@@ -441,7 +457,8 @@ private record LookupQueryOperatorFactory(
441457
@Nullable Expression joinOnConditions,
442458
@Nullable QueryBuilder query,
443459
Source planSource,
444-
QueryListFromPlanFactory queryListFromPlanFactory
460+
QueryListFromPlanFactory queryListFromPlanFactory,
461+
boolean emptyResult
445462
) implements OperatorFactory {
446463
@Override
447464
public Operator get(DriverContext driverContext) {
@@ -468,13 +485,14 @@ public Operator get(DriverContext driverContext) {
468485
shardContexts,
469486
shardId,
470487
searchExecutionContext,
471-
warnings
488+
warnings,
489+
emptyResult
472490
);
473491
}
474492

475493
@Override
476494
public String describe() {
477-
return "LookupQueryOperator[maxPageSize=" + maxPageSize + "]";
495+
return "LookupQueryOperator[maxPageSize=" + maxPageSize + ", emptyResult=" + emptyResult + "]";
478496
}
479497
}
480498

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
5858
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
5959
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
60+
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
6061
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
62+
import org.elasticsearch.xpack.esql.optimizer.LookupLogicalOptimizer;
6163
import org.elasticsearch.xpack.esql.optimizer.LookupPhysicalPlanOptimizer;
6264
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
6365
import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -849,9 +851,10 @@ public static LogicalPlan buildLocalLogicalPlan(
849851
/**
850852
* Builds the output attributes for a {@link ParameterizedQuery}, mirroring how {@link EsRelation}
851853
* exposes all index fields. This ensures the logical verifier can validate that all field references
852-
* in the plan are satisfied. At the physical level, {@code ReplaceSourceAttributes}
854+
* in the plan are satisfied. At the physical level,
855+
* {@link org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes ReplaceSourceAttributes}
853856
* strips the output back down to just {@code [_doc, _positions]}, and {@code InsertFieldExtraction}
854-
* adds the needed fields back — the same pattern used for {@code EsRelation / ReplaceSourceAttributes}.
857+
* adds the needed fields back — the same pattern used for {@code EsRelation}.
855858
*/
856859
private static List<Attribute> buildParameterizedQueryOutput(
857860
FieldAttribute docAttribute,
@@ -879,7 +882,7 @@ private static List<Attribute> buildParameterizedQueryOutput(
879882

880883
/**
881884
* Builds the physical plan for the lookup node by running:
882-
* LocalMapper.map -> LookupPhysicalPlanOptimizer.
885+
* LookupLogicalOptimizer -> LocalMapper.map -> LookupPhysicalPlanOptimizer.
883886
* The caller is responsible for building the logical plan via {@link #buildLocalLogicalPlan}.
884887
*/
885888
public static PhysicalPlan createLookupPhysicalPlan(
@@ -890,7 +893,9 @@ public static PhysicalPlan createLookupPhysicalPlan(
890893
SearchStats searchStats,
891894
EsqlFlags flags
892895
) {
893-
PhysicalPlan physicalPlan = LocalMapper.INSTANCE.map(logicalPlan);
896+
LogicalPlan optimizedLogical = new LookupLogicalOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats))
897+
.localOptimize(logicalPlan);
898+
PhysicalPlan physicalPlan = LocalMapper.INSTANCE.map(optimizedLogical);
894899
LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext(
895900
plannerSettings,
896901
flags,

0 commit comments

Comments
 (0)