Skip to content

Commit f1ac221

Browse files
authored
ESQL: introduce a pre-mapping logical plan processing step (#121260) (#121382)
This adds a pre-mapping logical plan processing step, occurring after the logical optimisation, but before mapping it to a physical plan. This step can perform async actions, if needed, and involves using a new `TransportActionServices` record with all available services. Furthermore, the query rewriting step part of the `FullTextFunction`s planning (occurring on the coordinator only) is refactored a bit to update the queries in-place. The verification done by `Match` and `Term` involving checking on the argument type is also now pulled back from post-optimisation to post-analysis. Their respective tests are moved accordingly as well. (cherry picked from commit 0393e56)
1 parent a5dcd14 commit f1ac221

File tree

21 files changed

+311
-308
lines changed

21 files changed

+311
-308
lines changed

docs/changelog/121260.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121260
2+
summary: Introduce a pre-mapping logical plan processing step
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/Holder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ public void set(T value) {
2626
this.value = value;
2727
}
2828

29+
/**
30+
* Sets a value in the holder, but only if none has already been set.
31+
* @param value the new value to set.
32+
*/
33+
public void setIfAbsent(T value) {
34+
if (this.value == null) {
35+
this.value = value;
36+
}
37+
}
38+
2939
public T get() {
3040
return value;
3141
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.apache.lucene.sandbox.document.HalfFloatPoint;
1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.ExceptionsHelper;
14+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
15+
import org.elasticsearch.cluster.service.ClusterService;
1416
import org.elasticsearch.common.Strings;
1517
import org.elasticsearch.common.breaker.CircuitBreaker;
1618
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@@ -31,9 +33,11 @@
3133
import org.elasticsearch.geo.ShapeTestUtils;
3234
import org.elasticsearch.index.IndexMode;
3335
import org.elasticsearch.license.XPackLicenseState;
36+
import org.elasticsearch.search.SearchService;
3437
import org.elasticsearch.tasks.TaskCancelledException;
3538
import org.elasticsearch.test.ESTestCase;
3639
import org.elasticsearch.transport.RemoteTransportException;
40+
import org.elasticsearch.transport.TransportService;
3741
import org.elasticsearch.xcontent.json.JsonXContent;
3842
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3943
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
@@ -72,8 +76,8 @@
7276
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
7377
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
7478
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
79+
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
7580
import org.elasticsearch.xpack.esql.session.Configuration;
76-
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
7781
import org.elasticsearch.xpack.esql.stats.SearchStats;
7882
import org.elasticsearch.xpack.esql.telemetry.Metrics;
7983
import org.elasticsearch.xpack.versionfield.Version;
@@ -140,6 +144,7 @@
140144
import static org.hamcrest.Matchers.instanceOf;
141145
import static org.junit.Assert.assertNotNull;
142146
import static org.junit.Assert.assertNull;
147+
import static org.mockito.Mockito.mock;
143148

144149
public final class EsqlTestUtils {
145150

@@ -360,7 +365,14 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
360365

361366
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
362367

363-
public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver();
368+
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
369+
mock(TransportService.class),
370+
mock(SearchService.class),
371+
null,
372+
mock(ClusterService.class),
373+
mock(IndexNameExpressionResolver.class),
374+
null
375+
);
364376

365377
private EsqlTestUtils() {}
366378

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/MockQueryBuilderResolver.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/MatchFunctionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public void testWhereMatchWithRow() {
246246
var error = expectThrows(ElasticsearchException.class, () -> run(query));
247247
assertThat(
248248
error.getMessage(),
249-
containsString("[MATCH] function cannot operate on [\"a brown fox\"], which is not a field from an index mapping")
249+
containsString("line 2:15: [MATCH] function cannot operate on [content], which is not a field from an index mapping")
250250
);
251251
}
252252

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/MatchOperatorIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public void testWhereMatchWithRow() {
230230
var error = expectThrows(ElasticsearchException.class, () -> run(query));
231231
assertThat(
232232
error.getMessage(),
233-
containsString("[:] operator cannot operate on [\"a brown fox\"], which is not a field from an index mapping")
233+
containsString("line 2:9: [:] operator cannot operate on [content], which is not a field from an index mapping")
234234
);
235235
}
236236

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
2222
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
2323
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
24+
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
2425
import org.elasticsearch.xpack.esql.session.Configuration;
2526
import org.elasticsearch.xpack.esql.session.EsqlSession;
2627
import org.elasticsearch.xpack.esql.session.IndexResolver;
27-
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
2828
import org.elasticsearch.xpack.esql.session.Result;
2929
import org.elasticsearch.xpack.esql.telemetry.Metrics;
3030
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
@@ -62,7 +62,7 @@ public void esql(
6262
EsqlExecutionInfo executionInfo,
6363
IndicesExpressionGrouper indicesExpressionGrouper,
6464
EsqlSession.PlanRunner planRunner,
65-
QueryBuilderResolver queryBuilderResolver,
65+
TransportActionServices services,
6666
ActionListener<Result> listener
6767
) {
6868
final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
@@ -78,7 +78,7 @@ public void esql(
7878
verifier,
7979
planTelemetry,
8080
indicesExpressionGrouper,
81-
queryBuilderResolver
81+
services
8282
);
8383
QueryMetric clientId = QueryMetric.fromString("rest");
8484
metrics.total(clientId);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.fulltext;
99

10-
import org.apache.lucene.util.BytesRef;
10+
import org.elasticsearch.common.lucene.BytesRefs;
1111
import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator;
1212
import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator.ShardConfig;
1313
import org.elasticsearch.compute.operator.EvalOperator;
@@ -110,11 +110,7 @@ public Expression query() {
110110
*/
111111
public Object queryAsObject() {
112112
Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */);
113-
if (queryAsObject instanceof BytesRef bytesRef) {
114-
return bytesRef.utf8ToString();
115-
}
116-
117-
return queryAsObject;
113+
return BytesRefs.toString(queryAsObject);
118114
}
119115

120116
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/Match.java

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.unit.Fuzziness;
1616
import org.elasticsearch.index.query.QueryBuilder;
17-
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
17+
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
1818
import org.elasticsearch.xpack.esql.common.Failure;
1919
import org.elasticsearch.xpack.esql.common.Failures;
2020
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.esql.core.type.DataType;
3131
import org.elasticsearch.xpack.esql.core.type.DataTypeConverter;
3232
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
33+
import org.elasticsearch.xpack.esql.core.util.Check;
3334
import org.elasticsearch.xpack.esql.core.util.NumericUtils;
3435
import org.elasticsearch.xpack.esql.expression.function.Example;
3536
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.xpack.esql.expression.function.Param;
3940
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
4041
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
42+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
4143
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
4244
import org.elasticsearch.xpack.esql.querydsl.query.MatchQuery;
4345
import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
@@ -48,6 +50,7 @@
4850
import java.util.Map;
4951
import java.util.Objects;
5052
import java.util.Set;
53+
import java.util.function.BiConsumer;
5154

5255
import static java.util.Map.entry;
5356
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
@@ -88,7 +91,7 @@
8891
/**
8992
* Full text function that performs a {@link org.elasticsearch.xpack.esql.querydsl.query.MatchQuery} .
9093
*/
91-
public class Match extends FullTextFunction implements OptionalArgument, PostOptimizationVerificationAware {
94+
public class Match extends FullTextFunction implements OptionalArgument, PostAnalysisPlanVerificationAware {
9295

9396
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Match", Match::readFrom);
9497
public static final Set<DataType> FIELD_DATA_TYPES = Set.of(
@@ -429,23 +432,23 @@ public Expression replaceQueryBuilder(QueryBuilder queryBuilder) {
429432
}
430433

431434
@Override
432-
public void postOptimizationVerification(Failures failures) {
433-
Expression fieldExpression = field();
434-
// Field may be converted to other data type (field_name :: data_type), so we need to check the original field
435-
if (fieldExpression instanceof AbstractConvertFunction convertFunction) {
436-
fieldExpression = convertFunction.field();
437-
}
438-
if (fieldExpression instanceof FieldAttribute == false) {
439-
failures.add(
440-
Failure.fail(
441-
field,
442-
"[{}] {} cannot operate on [{}], which is not a field from an index mapping",
443-
functionName(),
444-
functionType(),
445-
field.sourceText()
446-
)
447-
);
448-
}
435+
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
436+
return (plan, failures) -> {
437+
super.postAnalysisPlanVerification().accept(plan, failures);
438+
plan.forEachExpression(Match.class, m -> {
439+
if (m.fieldAsFieldAttribute() == null) {
440+
failures.add(
441+
Failure.fail(
442+
m.field(),
443+
"[{}] {} cannot operate on [{}], which is not a field from an index mapping",
444+
functionName(),
445+
functionType(),
446+
m.field().sourceText()
447+
)
448+
);
449+
}
450+
});
451+
};
449452
}
450453

451454
@Override
@@ -476,22 +479,24 @@ public Object queryAsObject() {
476479

477480
@Override
478481
protected Query translate(TranslatorHandler handler) {
482+
var fieldAttribute = fieldAsFieldAttribute();
483+
Check.notNull(fieldAttribute, "Match must have a field attribute as the first argument");
484+
String fieldName = fieldAttribute.name();
485+
if (fieldAttribute.field() instanceof MultiTypeEsField multiTypeEsField) {
486+
// If we have multiple field types, we allow the query to be done, but getting the underlying field name
487+
fieldName = multiTypeEsField.getName();
488+
}
489+
// Make query lenient so mixed field types can be queried when a field type is incompatible with the value provided
490+
return new MatchQuery(source(), fieldName, queryAsObject(), matchQueryOptions());
491+
}
492+
493+
private FieldAttribute fieldAsFieldAttribute() {
479494
Expression fieldExpression = field;
480495
// Field may be converted to other data type (field_name :: data_type), so we need to check the original field
481496
if (fieldExpression instanceof AbstractConvertFunction convertFunction) {
482497
fieldExpression = convertFunction.field();
483498
}
484-
if (fieldExpression instanceof FieldAttribute fieldAttribute) {
485-
String fieldName = fieldAttribute.name();
486-
if (fieldAttribute.field() instanceof MultiTypeEsField multiTypeEsField) {
487-
// If we have multiple field types, we allow the query to be done, but getting the underlying field name
488-
fieldName = multiTypeEsField.getName();
489-
}
490-
// Make query lenient so mixed field types can be queried when a field type is incompatible with the value provided
491-
return new MatchQuery(source(), fieldName, queryAsObject(), matchQueryOptions());
492-
}
493-
494-
throw new IllegalArgumentException("Match must have a field attribute as the first argument");
499+
return fieldExpression instanceof FieldAttribute fieldAttribute ? fieldAttribute : null;
495500
}
496501

497502
@Override
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.fulltext;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ResolvedIndices;
12+
import org.elasticsearch.index.query.QueryBuilder;
13+
import org.elasticsearch.index.query.QueryRewriteContext;
14+
import org.elasticsearch.index.query.Rewriteable;
15+
import org.elasticsearch.xpack.esql.core.util.Holder;
16+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
17+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
18+
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
19+
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
20+
import org.elasticsearch.xpack.esql.session.IndexResolver;
21+
22+
import java.io.IOException;
23+
import java.util.HashSet;
24+
import java.util.Set;
25+
26+
/**
27+
* Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
28+
* will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator.
29+
* {@link QueryBuilderResolver#resolveQueryBuilders(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by
30+
* replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s.
31+
*/
32+
public final class QueryBuilderResolver {
33+
34+
private QueryBuilderResolver() {}
35+
36+
public static void resolveQueryBuilders(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
37+
var hasFullTextFunctions = plan.anyMatch(p -> {
38+
Holder<Boolean> hasFullTextFunction = new Holder<>(false);
39+
p.forEachExpression(FullTextFunction.class, unused -> hasFullTextFunction.set(true));
40+
return hasFullTextFunction.get();
41+
});
42+
if (hasFullTextFunctions) {
43+
Rewriteable.rewriteAndFetch(
44+
new FullTextFunctionsRewritable(plan),
45+
queryRewriteContext(services, indexNames(plan)),
46+
listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan))
47+
);
48+
} else {
49+
listener.onResponse(plan);
50+
}
51+
}
52+
53+
private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set<String> indexNames) {
54+
ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
55+
indexNames.toArray(String[]::new),
56+
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
57+
services.clusterService().state(),
58+
services.indexNameExpressionResolver(),
59+
services.transportService().getRemoteClusterService(),
60+
System.currentTimeMillis()
61+
);
62+
63+
return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null);
64+
}
65+
66+
private static Set<String> indexNames(LogicalPlan plan) {
67+
Set<String> indexNames = new HashSet<>();
68+
plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.concreteIndices()));
69+
return indexNames;
70+
}
71+
72+
private record FullTextFunctionsRewritable(LogicalPlan plan) implements Rewriteable<QueryBuilderResolver.FullTextFunctionsRewritable> {
73+
@Override
74+
public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
75+
Holder<IOException> exceptionHolder = new Holder<>();
76+
Holder<Boolean> updated = new Holder<>(false);
77+
LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> {
78+
QueryBuilder builder = f.queryBuilder(), initial = builder;
79+
builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder() : builder;
80+
try {
81+
builder = builder.rewrite(ctx);
82+
} catch (IOException e) {
83+
exceptionHolder.setIfAbsent(e);
84+
}
85+
var rewritten = builder != initial;
86+
updated.set(updated.get() || rewritten);
87+
return rewritten ? f.replaceQueryBuilder(builder) : f;
88+
});
89+
if (exceptionHolder.get() != null) {
90+
throw exceptionHolder.get();
91+
}
92+
return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this;
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)