Skip to content

Commit d14ebcd

Browse files
EsQueryExec refactor
1 parent 5112eb4 commit d14ebcd

File tree

10 files changed

+133
-215
lines changed

10 files changed

+133
-215
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ private static PhysicalPlan rewrite(
126126
queryExec.indexMode(),
127127
queryExec.indexNameWithModes(),
128128
queryExec.output(),
129-
query,
130129
queryExec.limit(),
131130
queryExec.sorts(),
132-
queryExec.estimatedRowSize()
131+
queryExec.estimatedRowSize(),
132+
List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of()))
133133
);
134134
// If the eval contains other aliases, not just field attributes, we need to keep them in the plan
135135
PhysicalPlan plan = evalFields.isEmpty() ? queryExec : new EvalExec(filterExec.source(), queryExec, evalFields);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTags.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,6 @@ private static PhysicalPlan planRoundTo(RoundTo roundTo, EvalExec evalExec, EsQu
426426
queryExec.indexMode(),
427427
queryExec.indexNameWithModes(),
428428
newAttributes,
429-
queryExec.query(),
430429
queryExec.limit(),
431430
queryExec.sorts(),
432431
queryExec.estimatedRowSize(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
3737
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
3838
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
39-
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
4039
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
4140
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
4241
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
@@ -105,7 +104,6 @@ public static List<NamedWriteableRegistry.Entry> physical() {
105104
CompletionExec.ENTRY,
106105
DissectExec.ENTRY,
107106
EnrichExec.ENTRY,
108-
EsQueryExec.ENTRY,
109107
EsSourceExec.ENTRY,
110108
EvalExec.ENTRY,
111109
ExchangeExec.ENTRY,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java

Lines changed: 56 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10-
import org.elasticsearch.common.Strings;
11-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1210
import org.elasticsearch.common.io.stream.StreamInput;
1311
import org.elasticsearch.common.io.stream.StreamOutput;
1412
import org.elasticsearch.index.IndexMode;
@@ -28,32 +26,19 @@
2826
import org.elasticsearch.xpack.esql.core.type.DataType;
2927
import org.elasticsearch.xpack.esql.core.type.EsField;
3028
import org.elasticsearch.xpack.esql.expression.Order;
31-
import org.elasticsearch.xpack.esql.index.EsIndex;
32-
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
33-
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
3429

3530
import java.io.IOException;
3631
import java.util.List;
3732
import java.util.Map;
3833
import java.util.Objects;
3934

40-
import static org.elasticsearch.TransportVersions.ESQL_SKIP_ES_INDEX_SERIALIZATION;
41-
4235
public class EsQueryExec extends LeafExec implements EstimatesRowSize {
43-
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
44-
PhysicalPlan.class,
45-
"EsQueryExec",
46-
EsQueryExec::readFrom
47-
);
48-
4936
public static final EsField DOC_ID_FIELD = new EsField("_doc", DataType.DOC_DATA_TYPE, Map.of(), false);
50-
public static final List<Sort> NO_SORTS = List.of(); // only exists to mimic older serialization, but we no longer serialize sorts
5137

5238
private final String indexPattern;
5339
private final IndexMode indexMode;
5440
private final Map<String, IndexMode> indexNameWithModes;
5541
private final List<Attribute> attrs;
56-
private final QueryBuilder query;
5742
private final Expression limit;
5843
private final List<Sort> sorts;
5944

@@ -128,21 +113,17 @@ public EsQueryExec(
128113
List<Attribute> attributes,
129114
QueryBuilder query
130115
) {
131-
this(source, indexPattern, indexMode, indexNameWithModes, attributes, query, null, null, null, null);
132-
}
133-
134-
public EsQueryExec(
135-
Source source,
136-
String indexPattern,
137-
IndexMode indexMode,
138-
Map<String, IndexMode> indexNameWithModes,
139-
List<Attribute> attrs,
140-
QueryBuilder query,
141-
Expression limit,
142-
List<Sort> sorts,
143-
Integer estimatedRowSize
144-
) {
145-
this(source, indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, estimatedRowSize, null);
116+
this(
117+
source,
118+
indexPattern,
119+
indexMode,
120+
indexNameWithModes,
121+
attributes,
122+
null,
123+
null,
124+
null,
125+
List.of(new QueryBuilderAndTags(query, List.of()))
126+
);
146127
}
147128

148129
public EsQueryExec(
@@ -151,7 +132,6 @@ public EsQueryExec(
151132
IndexMode indexMode,
152133
Map<String, IndexMode> indexNameWithModes,
153134
List<Attribute> attrs,
154-
QueryBuilder query,
155135
Expression limit,
156136
List<Sort> sorts,
157137
Integer estimatedRowSize,
@@ -162,69 +142,21 @@ public EsQueryExec(
162142
this.indexMode = indexMode;
163143
this.indexNameWithModes = indexNameWithModes;
164144
this.attrs = attrs;
165-
this.query = query;
166145
this.limit = limit;
167146
this.sorts = sorts;
168147
this.estimatedRowSize = estimatedRowSize;
148+
// cannot keep the ctor with QueryBuilder as it has the same number of arguments as this ctor, EsqlNodeSubclassTests will fail
169149
this.queryBuilderAndTags = queryBuilderAndTags;
170150
}
171151

172-
/**
173-
* The matching constructor is used during physical plan optimization and needs valid sorts. But we no longer serialize sorts.
174-
* If this cluster node is talking to an older instance it might receive a plan with sorts, but it will ignore them.
175-
*/
176-
private static EsQueryExec readFrom(StreamInput in) throws IOException {
177-
var source = Source.readFrom((PlanStreamInput) in);
178-
String indexPattern;
179-
Map<String, IndexMode> indexNameWithModes;
180-
if (in.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) {
181-
indexPattern = in.readString();
182-
indexNameWithModes = in.readMap(IndexMode::readFrom);
183-
} else {
184-
var index = EsIndex.readFrom(in);
185-
indexPattern = index.name();
186-
indexNameWithModes = index.indexNameWithModes();
187-
}
188-
var indexMode = EsRelation.readIndexMode(in);
189-
var attrs = in.readNamedWriteableCollectionAsList(Attribute.class);
190-
var query = in.readOptionalNamedWriteable(QueryBuilder.class);
191-
var limit = in.readOptionalNamedWriteable(Expression.class);
192-
in.readOptionalCollectionAsList(EsQueryExec::readSort);
193-
var rowSize = in.readOptionalVInt();
194-
// Ignore sorts from the old serialization format
195-
// queryBuilderAndTags is built on data node, serialization is not needed
196-
return new EsQueryExec(source, indexPattern, indexMode, indexNameWithModes, attrs, query, limit, NO_SORTS, rowSize, null);
197-
}
198-
199-
private static Sort readSort(StreamInput in) throws IOException {
200-
return FieldSort.readFrom(in);
201-
}
202-
203-
private static void writeSort(StreamOutput out, Sort sort) {
204-
throw new IllegalStateException("sorts are no longer serialized");
205-
}
206-
207152
@Override
208153
public void writeTo(StreamOutput out) throws IOException {
209-
Source.EMPTY.writeTo(out);
210-
if (out.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) {
211-
out.writeString(indexPattern);
212-
out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out));
213-
} else {
214-
new EsIndex(indexPattern, Map.of(), indexNameWithModes).writeTo(out);
215-
}
216-
EsRelation.writeIndexMode(out, indexMode());
217-
out.writeNamedWriteableCollection(output());
218-
out.writeOptionalNamedWriteable(query());
219-
out.writeOptionalNamedWriteable(limit());
220-
out.writeOptionalCollection(NO_SORTS, EsQueryExec::writeSort);
221-
out.writeOptionalVInt(estimatedRowSize());
222-
// queryBuilderAndTags is built on data node, serialization is not needed
154+
throw new UnsupportedOperationException("not serialized");
223155
}
224156

225157
@Override
226158
public String getWriteableName() {
227-
return ENTRY.name;
159+
throw new UnsupportedOperationException("not serialized");
228160
}
229161

230162
public static boolean isSourceAttribute(Attribute attr) {
@@ -249,7 +181,6 @@ protected NodeInfo<EsQueryExec> info() {
249181
indexMode,
250182
indexNameWithModes,
251183
attrs,
252-
query,
253184
limit,
254185
sorts,
255186
estimatedRowSize,
@@ -269,8 +200,13 @@ public Map<String, IndexMode> indexNameWithModes() {
269200
return indexNameWithModes;
270201
}
271202

203+
/**
204+
* query is merged into queryBuilderAndTags, keep this method as it is called by too many places.
205+
* If this method is called, the caller looks for the original queryBuilder, before {@code ReplaceRoundToWithQueryAndTags} converts it
206+
* to multiple queries with tags.
207+
*/
272208
public QueryBuilder query() {
273-
return query;
209+
return queryWithoutTag();
274210
}
275211

276212
@Override
@@ -312,7 +248,7 @@ public PhysicalPlan estimateRowSize(State state) {
312248
}
313249
return Objects.equals(this.estimatedRowSize, size)
314250
? this
315-
: new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, size, queryBuilderAndTags);
251+
: new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, limit, sorts, size, queryBuilderAndTags);
316252
}
317253

318254
public EsQueryExec withLimit(Expression limit) {
@@ -324,7 +260,6 @@ public EsQueryExec withLimit(Expression limit) {
324260
indexMode,
325261
indexNameWithModes,
326262
attrs,
327-
query,
328263
limit,
329264
sorts,
330265
estimatedRowSize,
@@ -349,28 +284,32 @@ public EsQueryExec withSorts(List<Sort> sorts) {
349284
indexMode,
350285
indexNameWithModes,
351286
attrs,
352-
query,
353287
limit,
354288
sorts,
355289
estimatedRowSize,
356290
queryBuilderAndTags
357291
);
358292
}
359293

294+
/**
295+
* query is merged into queryBuilderAndTags, keep this method as it is called by too many places.
296+
* If this method is called, the caller looks for the original queryBuilder, before {@code ReplaceRoundToWithQueryAndTags} converts it
297+
* to multiple queries with tags.
298+
*/
360299
public EsQueryExec withQuery(QueryBuilder query) {
361-
return Objects.equals(this.query, query)
300+
QueryBuilder thisQuery = queryWithoutTag();
301+
return Objects.equals(thisQuery, query)
362302
? this
363303
: new EsQueryExec(
364304
source(),
365305
indexPattern,
366306
indexMode,
367307
indexNameWithModes,
368308
attrs,
369-
query,
370309
limit,
371310
sorts,
372311
estimatedRowSize,
373-
queryBuilderAndTags
312+
List.of(new QueryBuilderAndTags(query, List.of()))
374313
);
375314
}
376315

@@ -383,9 +322,35 @@ public boolean canSubstituteRoundToWithQueryBuilderAndTags() {
383322
return indexMode != IndexMode.TIME_SERIES && (sorts == null || sorts.isEmpty());
384323
}
385324

325+
/**
326+
* Returns the original queryBuilder before {@code ReplaceRoundToWithQueryAndTags} converts it to multiple queryBuilder with tags.
327+
* If we reach here, the caller is looking for the original query before the rule converts it. If there are multiple queries in
328+
* queryBuilderAndTags or if the single query in queryBuilderAndTags already has a tag, that means
329+
* {@code ReplaceRoundToWithQueryAndTags} already applied to the original query, the original query cannot be retrieved any more,
330+
* exception will be thrown.
331+
*/
332+
private QueryBuilder queryWithoutTag() {
333+
QueryBuilder queryWithoutTag;
334+
if (queryBuilderAndTags == null || queryBuilderAndTags.isEmpty()) {
335+
return null;
336+
} else if (queryBuilderAndTags.size() == 1) {
337+
QueryBuilderAndTags firstQuery = this.queryBuilderAndTags.get(0);
338+
if (firstQuery.tags().isEmpty()) {
339+
queryWithoutTag = firstQuery.query();
340+
} else {
341+
throw new UnsupportedOperationException("query is converted to query with tags: " + "[" + firstQuery + "]");
342+
}
343+
} else {
344+
throw new UnsupportedOperationException(
345+
"query is converted to multiple queries and tags: " + "[" + this.queryBuilderAndTags + "]"
346+
);
347+
}
348+
return queryWithoutTag;
349+
}
350+
386351
@Override
387352
public int hashCode() {
388-
return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, queryBuilderAndTags);
353+
return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs, limit, sorts, queryBuilderAndTags);
389354
}
390355

391356
@Override
@@ -403,7 +368,6 @@ public boolean equals(Object obj) {
403368
&& Objects.equals(indexMode, other.indexMode)
404369
&& Objects.equals(indexNameWithModes, other.indexNameWithModes)
405370
&& Objects.equals(attrs, other.attrs)
406-
&& Objects.equals(query, other.query)
407371
&& Objects.equals(limit, other.limit)
408372
&& Objects.equals(sorts, other.sorts)
409373
&& Objects.equals(estimatedRowSize, other.estimatedRowSize)
@@ -419,9 +383,6 @@ public String nodeString() {
419383
+ "indexMode["
420384
+ indexMode
421385
+ "], "
422-
+ "query["
423-
+ (query != null ? Strings.toString(query, false, true) : "")
424-
+ "]"
425386
+ NodeUtils.limitedToString(attrs)
426387
+ ", limit["
427388
+ (limit != null ? limit.toString() : "")

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
289289
for (Sort sort : sorts) {
290290
sortBuilders.add(sort.sortBuilder());
291291
}
292-
// LuceneTopNSourceOperator does not support QueryAndTags
292+
// LuceneTopNSourceOperator does not support QueryAndTags, if there are multiple queries or if the single query has tags,
293+
// UnsupportedOperationException will be thrown by esQueryExec.query()
293294
luceneFactory = new LuceneTopNSourceOperator.Factory(
294295
shardContexts,
295296
querySupplier(esQueryExec.query()),
@@ -303,9 +304,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
303304
} else {
304305
luceneFactory = new LuceneSourceOperator.Factory(
305306
shardContexts,
306-
esQueryExec.queryBuilderAndTags() != null && esQueryExec.queryBuilderAndTags().isEmpty() == false
307-
? querySupplier(esQueryExec.queryBuilderAndTags())
308-
: querySupplier(esQueryExec.query()),
307+
querySupplier(esQueryExec.queryBuilderAndTags()),
309308
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
310309
context.queryPragmas().taskConcurrency(),
311310
context.pageSize(rowEstimatedSize),

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,17 @@ public TestPhysicalPlanBuilder limit(int limit) {
582582

583583
public TopNExec build() {
584584
List<Attribute> attributes = new ArrayList<>(fields.values());
585-
PhysicalPlan child = new EsQueryExec(Source.EMPTY, this.index, indexMode, Map.of(), attributes, null, null, List.of(), 0);
585+
PhysicalPlan child = new EsQueryExec(
586+
Source.EMPTY,
587+
this.index,
588+
indexMode,
589+
Map.of(),
590+
attributes,
591+
null,
592+
List.of(),
593+
0,
594+
List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of()))
595+
);
586596
if (aliases.isEmpty() == false) {
587597
child = new EvalExec(Source.EMPTY, child, aliases);
588598
}

0 commit comments

Comments
 (0)