Skip to content

Commit 65e332e

Browse files
committed
WIP: Config refactor, moved to EsqlSession, and added transport version for it
1 parent 05129b0 commit 65e332e

File tree

8 files changed

+218
-102
lines changed

8 files changed

+218
-102
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616
import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
1717
import org.elasticsearch.xpack.esql.analysis.Verifier;
1818
import org.elasticsearch.xpack.esql.common.Failures;
19-
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
2019
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
2120
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
22-
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
23-
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
24-
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
25-
import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
2621
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2722
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
23+
import org.elasticsearch.xpack.esql.plugin.EsqlQueryClusterSettings;
2824
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
2925
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
30-
import org.elasticsearch.xpack.esql.session.Configuration;
3126
import org.elasticsearch.xpack.esql.session.EsqlSession;
3227
import org.elasticsearch.xpack.esql.session.IndexResolver;
3328
import org.elasticsearch.xpack.esql.session.Result;
@@ -72,8 +67,7 @@ public PlanExecutor(
7267
public void esql(
7368
EsqlQueryRequest request,
7469
String sessionId,
75-
Configuration cfg,
76-
FoldContext foldContext,
70+
EsqlQueryClusterSettings esqlQueryClusterSettings,
7771
EnrichPolicyResolver enrichPolicyResolver,
7872
EsqlExecutionInfo executionInfo,
7973
IndicesExpressionGrouper indicesExpressionGrouper,
@@ -84,13 +78,11 @@ public void esql(
8478
final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
8579
final var session = new EsqlSession(
8680
sessionId,
87-
cfg,
81+
esqlQueryClusterSettings,
8882
indexResolver,
8983
enrichPolicyResolver,
9084
preAnalyzer,
91-
new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldContext, services.inferenceService())),
9285
functionRegistry,
93-
new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)),
9486
mapper,
9587
verifier,
9688
planTelemetry,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlParser.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
2323
import org.elasticsearch.xpack.esql.plan.EsqlStatement;
2424
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
25-
import org.elasticsearch.xpack.esql.session.Configuration;
2625
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
2726

2827
import java.util.BitSet;
@@ -100,41 +99,40 @@ public void setEsqlConfig(EsqlConfig config) {
10099
}
101100

102101
// testing utility
103-
public LogicalPlan createStatement(String query, Configuration configuration) {
104-
return createStatement(query, new QueryParams(), configuration);
102+
public LogicalPlan createStatement(String query) {
103+
return createStatement(query, new QueryParams());
105104
}
106105

107106
// testing utility
108-
public LogicalPlan createStatement(String query, QueryParams params, Configuration configuration) {
109-
return createStatement(query, params, new PlanTelemetry(new EsqlFunctionRegistry()), configuration);
107+
public LogicalPlan createStatement(String query, QueryParams params) {
108+
return createStatement(query, params, new PlanTelemetry(new EsqlFunctionRegistry()));
110109
}
111110

112-
public LogicalPlan createStatement(String query, QueryParams params, PlanTelemetry metrics, Configuration configuration) {
111+
public LogicalPlan createStatement(String query, QueryParams params, PlanTelemetry metrics) {
113112
if (log.isDebugEnabled()) {
114113
log.debug("Parsing as statement: {}", query);
115114
}
116-
return invokeParser(query, params, metrics, EsqlBaseParser::singleStatement, AstBuilder::plan, configuration);
115+
return invokeParser(query, params, metrics, EsqlBaseParser::singleStatement, AstBuilder::plan);
117116
}
118117

119118
// testing utility
120-
public EsqlStatement createQuery(String query, QueryParams params, Configuration configuration) {
121-
return createQuery(query, params, new PlanTelemetry(new EsqlFunctionRegistry()), configuration);
119+
public EsqlStatement createQuery(String query, QueryParams params) {
120+
return createQuery(query, params, new PlanTelemetry(new EsqlFunctionRegistry()));
122121
}
123122

124-
public EsqlStatement createQuery(String query, QueryParams params, PlanTelemetry metrics, Configuration configuration) {
123+
public EsqlStatement createQuery(String query, QueryParams params, PlanTelemetry metrics) {
125124
if (log.isDebugEnabled()) {
126125
log.debug("Parsing as statement: {}", query);
127126
}
128-
return invokeParser(query, params, metrics, EsqlBaseParser::statements, AstBuilder::statement, configuration);
127+
return invokeParser(query, params, metrics, EsqlBaseParser::statements, AstBuilder::statement);
129128
}
130129

131130
private <T> T invokeParser(
132131
String query,
133132
QueryParams params,
134133
PlanTelemetry metrics,
135134
Function<EsqlBaseParser, ParserRuleContext> parseFunction,
136-
BiFunction<AstBuilder, ParserRuleContext, T> result,
137-
Configuration configuration
135+
BiFunction<AstBuilder, ParserRuleContext, T> result
138136
) {
139137
if (query.length() > MAX_LENGTH) {
140138
throw new ParsingException("ESQL statement is too large [{} characters > {}]", query.length(), MAX_LENGTH);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QuerySettings.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
package org.elasticsearch.xpack.esql.plan;
99

10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.io.stream.Writeable;
1014
import org.elasticsearch.core.Nullable;
1115
import org.elasticsearch.transport.RemoteClusterService;
1216
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
@@ -15,8 +19,11 @@
1519
import org.elasticsearch.xpack.esql.expression.Foldables;
1620
import org.elasticsearch.xpack.esql.parser.ParsingException;
1721

22+
import java.io.IOException;
1823
import java.time.ZoneId;
1924
import java.time.ZoneOffset;
25+
import java.util.HashMap;
26+
import java.util.Map;
2027
import java.util.function.Function;
2128

2229
public class QuerySettings {
@@ -78,6 +85,49 @@ public static void validate(EsqlStatement statement, RemoteClusterService cluste
7885
}
7986
}
8087

88+
public static QuerySettingsMap toMap(EsqlStatement statement) {
89+
var settings = new HashMap<String, Expression>();
90+
for (QuerySetting setting : statement.settings()) {
91+
settings.put(setting.name(), setting.value());
92+
}
93+
return new QuerySettingsMap(settings);
94+
}
95+
96+
public static class QuerySettingsMap implements Writeable {
97+
private static final TransportVersion ESQL_CONFIGURATION_QUERY_SETTINGS = TransportVersion.fromName(
98+
"esql_configuration_query_settings"
99+
);
100+
101+
private final Map<String, Expression> settings;
102+
103+
public QuerySettingsMap(Map<String, Expression> settings) {
104+
this.settings = settings;
105+
}
106+
107+
public QuerySettingsMap(StreamInput in) throws IOException {
108+
if (in.getTransportVersion().supports(ESQL_CONFIGURATION_QUERY_SETTINGS)) {
109+
this.settings = in.readMap(StreamInput::readString, r -> r.readNamedWriteable(Expression.class));
110+
} else {
111+
this.settings = Map.of();
112+
}
113+
}
114+
115+
@Override
116+
public void writeTo(StreamOutput out) throws IOException {
117+
if (out.getTransportVersion().supports(ESQL_CONFIGURATION_QUERY_SETTINGS)) {
118+
out.writeMap(settings, StreamOutput::writeString, StreamOutput::writeNamedWriteable);
119+
}
120+
}
121+
122+
public <T> T get(QuerySettingDef<T> def, RemoteClusterService clusterService) {
123+
Expression value = settings.get(def.name());
124+
if (value == null) {
125+
return def.defaultValueSupplier.apply(clusterService);
126+
}
127+
return def.get(value, clusterService);
128+
}
129+
}
130+
81131
/**
82132
* Definition of a query setting.
83133
*
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
public record EsqlQueryClusterSettings(
11+
int resultTruncationMaxSize,
12+
int resultTruncationDefaultSize,
13+
int timeseriesResultTruncationMaxSize,
14+
int timeseriesResultTruncationDefaultSize
15+
) {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
4545
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
4646
import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService;
47-
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4847
import org.elasticsearch.xpack.esql.enrich.AbstractLookupService;
4948
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
5049
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
@@ -58,11 +57,9 @@
5857
import org.elasticsearch.xpack.esql.session.Result;
5958

6059
import java.io.IOException;
61-
import java.time.ZoneOffset;
6260
import java.util.ArrayList;
6361
import java.util.Collections;
6462
import java.util.List;
65-
import java.util.Locale;
6663
import java.util.Map;
6764
import java.util.concurrent.Executor;
6865
import java.util.concurrent.atomic.AtomicInteger;
@@ -240,29 +237,11 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
240237
request.allowPartialResults(defaultAllowPartialResults);
241238
}
242239
EsqlFlags flags = computeService.createFlags();
243-
Configuration configuration = new Configuration(
244-
ZoneOffset.UTC,
245-
request.locale() != null ? request.locale() : Locale.US,
246-
// TODO: plug-in security
247-
null,
248-
clusterService.getClusterName().value(),
249-
request.pragmas(),
250-
resultTruncationMaxSize,
251-
resultTruncationDefaultSize,
252-
request.query(),
253-
request.profile(),
254-
request.tables(),
255-
System.nanoTime(),
256-
request.allowPartialResults(),
257-
timeseriesResultTruncationMaxSize,
258-
timeseriesResultTruncationDefaultSize
259-
);
260240
String sessionId = sessionID(task);
261241
// async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
262242
// sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one
263243
EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request);
264-
FoldContext foldCtx = configuration.newFoldContext();
265-
PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
244+
PlanRunner planRunner = (plan, configuration, foldCtx, resultListener) -> computeService.execute(
266245
sessionId,
267246
(CancellableTask) task,
268247
flags,
@@ -275,8 +254,12 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
275254
planExecutor.esql(
276255
request,
277256
sessionId,
278-
configuration,
279-
foldCtx,
257+
new EsqlQueryClusterSettings(
258+
resultTruncationMaxSize,
259+
resultTruncationDefaultSize,
260+
timeseriesResultTruncationMaxSize,
261+
timeseriesResultTruncationDefaultSize
262+
),
280263
enrichPolicyResolver,
281264
executionInfo,
282265
remoteClusterService,
@@ -285,7 +268,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
285268
ActionListener.wrap(result -> {
286269
recordCCSTelemetry(task, executionInfo, request, null);
287270
planExecutor.metrics().recordTook(executionInfo.overallTook().millis());
288-
var response = toResponse(task, request, configuration, result);
271+
var response = toResponse(task, request, request.profile(), result);
289272
assert response.isAsync() == request.async() : "The response must be async if the request was async";
290273

291274
if (response.isAsync()) {
@@ -385,7 +368,7 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
385368
);
386369
}
387370

388-
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
371+
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolean profileEnabled, Result result) {
389372
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> {
390373
List<String> originalTypes;
391374
if (c instanceof UnsupportedAttribute ua) {
@@ -397,7 +380,7 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config
397380
}
398381
return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
399382
}).toList();
400-
EsqlQueryResponse.Profile profile = configuration.profile()
383+
EsqlQueryResponse.Profile profile = profileEnabled
401384
? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
402385
: null;
403386
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.compute.data.BlockStreamInput;
1717
import org.elasticsearch.xpack.esql.Column;
1818
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
19+
import org.elasticsearch.xpack.esql.plan.QuerySettings.QuerySettingsMap;
1920
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
2021

2122
import java.io.IOException;
@@ -47,6 +48,7 @@ public class Configuration implements Writeable {
4748
private final ZoneId zoneId;
4849

4950
private final QueryPragmas pragmas;
51+
private final QuerySettingsMap settings;
5052

5153
private final int resultTruncationMaxSizeRegular;
5254
private final int resultTruncationDefaultSizeRegular;
@@ -69,6 +71,7 @@ public Configuration(
6971
String username,
7072
String clusterName,
7173
QueryPragmas pragmas,
74+
QuerySettingsMap settings,
7275
int resultTruncationMaxSizeRegular,
7376
int resultTruncationDefaultSizeRegular,
7477
String query,
@@ -85,6 +88,7 @@ public Configuration(
8588
this.clusterName = clusterName;
8689
this.locale = locale;
8790
this.pragmas = pragmas;
91+
this.settings = settings;
8892
this.resultTruncationMaxSizeRegular = resultTruncationMaxSizeRegular;
8993
this.resultTruncationDefaultSizeRegular = resultTruncationDefaultSizeRegular;
9094
this.resultTruncationMaxSizeTimeseries = resultTruncationMaxSizeTimeseries;
@@ -122,6 +126,7 @@ public Configuration(BlockStreamInput in) throws IOException {
122126
this.resultTruncationMaxSizeTimeseries = this.resultTruncationMaxSizeRegular;
123127
this.resultTruncationDefaultSizeTimeseries = this.resultTruncationDefaultSizeRegular;
124128
}
129+
this.settings = new QuerySettingsMap(in);
125130
}
126131

127132
@Override
@@ -147,6 +152,7 @@ public void writeTo(StreamOutput out) throws IOException {
147152
out.writeVInt(resultTruncationMaxSizeTimeseries);
148153
out.writeVInt(resultTruncationDefaultSizeTimeseries);
149154
}
155+
settings.writeTo(out);
150156
}
151157

152158
public ZoneId zoneId() {
@@ -227,6 +233,7 @@ public Configuration withoutTables() {
227233
username,
228234
clusterName,
229235
pragmas,
236+
settings,
230237
resultTruncationMaxSizeRegular,
231238
resultTruncationDefaultSizeRegular,
232239
query,

0 commit comments

Comments
 (0)