Skip to content

Commit e48139f

Browse files
authored
ESQL: Add extension point for extra plan checks (#128840)
1 parent 94c63ca commit e48139f

File tree

9 files changed

+210
-5
lines changed

9 files changed

+210
-5
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
apply plugin: 'elasticsearch.internal-es-plugin'
2+
apply plugin: 'elasticsearch.internal-java-rest-test'
3+
apply plugin: org.elasticsearch.gradle.internal.precommit.CheckstylePrecommitPlugin
4+
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenApisPrecommitPlugin
5+
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenPatternsPrecommitPlugin
6+
apply plugin: org.elasticsearch.gradle.internal.precommit.FilePermissionsPrecommitPlugin
7+
apply plugin: org.elasticsearch.gradle.internal.precommit.LoggerUsagePrecommitPlugin
8+
apply plugin: org.elasticsearch.gradle.internal.precommit.TestingConventionsPrecommitPlugin
9+
10+
11+
esplugin {
12+
name = 'extra-checkers'
13+
description = 'An example plugin disallowing CATEGORIZE'
14+
classname ='org.elasticsearch.xpack.esql.qa.extra.ExtraCheckersPlugin'
15+
extendedPlugins = ['x-pack-esql']
16+
}
17+
18+
dependencies {
19+
compileOnly project(':x-pack:plugin:esql')
20+
compileOnly project(':x-pack:plugin:esql-core')
21+
clusterPlugins project(':x-pack:plugin:esql:qa:server:extra-checkers')
22+
}
23+
24+
tasks.named('javaRestTest') {
25+
usesDefaultDistribution("to be triaged")
26+
maxParallelForks = 1
27+
jvmArgs('--add-opens=java.base/java.nio=ALL-UNNAMED')
28+
}
29+
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.extra;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.apache.http.util.EntityUtils;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.ResponseException;
16+
import org.elasticsearch.test.TestClustersThreadFilter;
17+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
18+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
19+
import org.elasticsearch.test.rest.ESRestTestCase;
20+
import org.junit.ClassRule;
21+
22+
import java.io.IOException;
23+
24+
import static org.hamcrest.Matchers.containsString;
25+
import static org.hamcrest.Matchers.equalTo;
26+
27+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
28+
public class ExtraCheckersIT extends ESRestTestCase {
29+
@ClassRule
30+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
31+
.distribution(DistributionType.DEFAULT)
32+
.setting("xpack.security.enabled", "false")
33+
.setting("xpack.license.self_generated.type", "trial")
34+
.shared(true)
35+
.plugin("extra-checkers")
36+
.build();
37+
38+
public void testWithCategorize() {
39+
ResponseException e = expectThrows(ResponseException.class, () -> runEsql("""
40+
{
41+
"query": "ROW message=\\"foo bar\\" | STATS COUNT(*) BY CATEGORIZE(message) | LIMIT 1"
42+
}"""));
43+
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
44+
assertThat(e.getMessage(), containsString("line 1:43: CATEGORIZE is unsupported"));
45+
}
46+
47+
public void testWithoutCategorize() throws IOException {
48+
String result = runEsql("""
49+
{
50+
"query": "ROW message=\\"foo bar\\" | STATS COUNT(*) | LIMIT 1"
51+
}""");
52+
assertThat(result, containsString("""
53+
"columns" : [
54+
{
55+
"name" : "COUNT(*)",
56+
"type" : "long"
57+
}
58+
],
59+
"values" : [
60+
[
61+
1
62+
]
63+
]
64+
"""));
65+
}
66+
67+
private String runEsql(String json) throws IOException {
68+
Request request = new Request("POST", "/_query");
69+
request.setJsonEntity(json);
70+
request.addParameter("error_trace", "");
71+
request.addParameter("pretty", "");
72+
Response response = client().performRequest(request);
73+
return EntityUtils.toString(response.getEntity());
74+
}
75+
76+
@Override
77+
protected String getTestRestCluster() {
78+
return cluster.getHttpAddresses();
79+
}
80+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.extra;
9+
10+
import org.elasticsearch.xpack.esql.analysis.Verifier;
11+
import org.elasticsearch.xpack.esql.common.Failure;
12+
import org.elasticsearch.xpack.esql.common.Failures;
13+
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
14+
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
15+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
16+
17+
import java.util.List;
18+
import java.util.function.BiConsumer;
19+
20+
public class DisallowCategorize implements Verifier.ExtraCheckers {
21+
@Override
22+
public List<BiConsumer<LogicalPlan, Failures>> extra() {
23+
return List.of(DisallowCategorize::disallowCategorize);
24+
}
25+
26+
private static void disallowCategorize(LogicalPlan plan, Failures failures) {
27+
if (plan instanceof Aggregate) {
28+
plan.forEachExpression(Categorize.class, cat -> failures.add(new Failure(cat, "CATEGORIZE is unsupported")));
29+
}
30+
}
31+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.extra;
9+
10+
import org.elasticsearch.plugins.Plugin;
11+
import org.elasticsearch.xpack.esql.analysis.Verifier;
12+
13+
/**
14+
* Marker plugin to enable {@link Verifier.ExtraCheckers}.
15+
*/
16+
public class ExtraCheckersPlugin extends Plugin {}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the Elastic License
4+
# 2.0; you may not use this file except in compliance with the Elastic License
5+
# 2.0.
6+
#
7+
8+
org.elasticsearch.xpack.esql.qa.extra.DisallowCategorize

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.ArrayList;
4040
import java.util.BitSet;
4141
import java.util.Collection;
42+
import java.util.Collections;
4243
import java.util.HashSet;
4344
import java.util.List;
4445
import java.util.Set;
@@ -54,13 +55,29 @@
5455
* step does type resolution and fails queries based on invalid type expressions.
5556
*/
5657
public class Verifier {
58+
public interface ExtraCheckers {
59+
/**
60+
* Build a list of checks to perform on the plan. Each one is called once per
61+
* {@link LogicalPlan} node in the plan.
62+
*/
63+
List<BiConsumer<LogicalPlan, Failures>> extra();
64+
}
5765

66+
/**
67+
* Extra plan verification checks defined in plugins.
68+
*/
69+
private final List<ExtraCheckers> extraCheckers;
5870
private final Metrics metrics;
5971
private final XPackLicenseState licenseState;
6072

6173
public Verifier(Metrics metrics, XPackLicenseState licenseState) {
74+
this(metrics, licenseState, Collections.emptyList());
75+
}
76+
77+
public Verifier(Metrics metrics, XPackLicenseState licenseState, List<ExtraCheckers> extraCheckers) {
6278
this.metrics = metrics;
6379
this.licenseState = licenseState;
80+
this.extraCheckers = extraCheckers;
6481
}
6582

6683
/**
@@ -84,6 +101,9 @@ Collection<Failure> verify(LogicalPlan plan, BitSet partialMetrics) {
84101

85102
// collect plan checkers
86103
var planCheckers = planCheckers(plan);
104+
for (ExtraCheckers e : extraCheckers) {
105+
planCheckers.addAll(e.extra());
106+
}
87107

88108
// Concrete verifications
89109
plan.forEachDown(p -> {
@@ -186,6 +206,9 @@ else if (p instanceof Lookup lookup) {
186206
});
187207
}
188208

209+
/**
210+
* Build a list of checkers based on the components in the plan.
211+
*/
189212
private static List<BiConsumer<LogicalPlan, Failures>> planCheckers(LogicalPlan plan) {
190213
List<BiConsumer<LogicalPlan, Failures>> planCheckers = new ArrayList<>();
191214
Consumer<? super Node<?>> collectPlanCheckers = p -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager;
3333
import org.elasticsearch.xpack.esql.telemetry.QueryMetric;
3434

35+
import java.util.List;
36+
3537
import static org.elasticsearch.action.ActionListener.wrap;
3638

3739
public class PlanExecutor {
@@ -45,13 +47,19 @@ public class PlanExecutor {
4547
private final PlanTelemetryManager planTelemetryManager;
4648
private final EsqlQueryLog queryLog;
4749

48-
public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState, EsqlQueryLog queryLog) {
50+
public PlanExecutor(
51+
IndexResolver indexResolver,
52+
MeterRegistry meterRegistry,
53+
XPackLicenseState licenseState,
54+
EsqlQueryLog queryLog,
55+
List<Verifier.ExtraCheckers> extraCheckers
56+
) {
4957
this.indexResolver = indexResolver;
5058
this.preAnalyzer = new PreAnalyzer();
5159
this.functionRegistry = new EsqlFunctionRegistry();
5260
this.mapper = new Mapper();
5361
this.metrics = new Metrics(functionRegistry);
54-
this.verifier = new Verifier(metrics, licenseState);
62+
this.verifier = new Verifier(metrics, licenseState, extraCheckers);
5563
this.planTelemetryManager = new PlanTelemetryManager(meterRegistry);
5664
this.queryLog = queryLog;
5765
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.features.NodeFeature;
4242
import org.elasticsearch.license.XPackLicenseState;
4343
import org.elasticsearch.plugins.ActionPlugin;
44+
import org.elasticsearch.plugins.ExtensiblePlugin;
4445
import org.elasticsearch.plugins.Plugin;
4546
import org.elasticsearch.rest.RestController;
4647
import org.elasticsearch.rest.RestHandler;
@@ -66,6 +67,7 @@
6667
import org.elasticsearch.xpack.esql.action.RestEsqlListQueriesAction;
6768
import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
6869
import org.elasticsearch.xpack.esql.action.RestEsqlStopAsyncAction;
70+
import org.elasticsearch.xpack.esql.analysis.Verifier;
6971
import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
7072
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
7173
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
@@ -83,7 +85,7 @@
8385
import java.util.function.Predicate;
8486
import java.util.function.Supplier;
8587

86-
public class EsqlPlugin extends Plugin implements ActionPlugin {
88+
public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
8789
public static final boolean INLINESTATS_FEATURE_FLAG = new FeatureFlag("esql_inlinestats").isEnabled();
8890

8991
public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker";
@@ -187,6 +189,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
187189
Setting.Property.Dynamic
188190
);
189191

192+
private final List<Verifier.ExtraCheckers> extraCheckers = new ArrayList<>();
193+
190194
@Override
191195
public Collection<?> createComponents(PluginServices services) {
192196
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
@@ -204,7 +208,8 @@ public Collection<?> createComponents(PluginServices services) {
204208
new IndexResolver(services.client()),
205209
services.telemetryProvider().getMeterRegistry(),
206210
getLicenseState(),
207-
new EsqlQueryLog(services.clusterService().getClusterSettings(), services.slowLogFieldProvider())
211+
new EsqlQueryLog(services.clusterService().getClusterSettings(), services.slowLogFieldProvider()),
212+
extraCheckers
208213
),
209214
new ExchangeService(
210215
services.clusterService().getSettings(),
@@ -335,4 +340,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
335340
)
336341
);
337342
}
343+
344+
@Override
345+
public void loadExtensions(ExtensionLoader loader) {
346+
extraCheckers.addAll(loader.loadExtensions(Verifier.ExtraCheckers.class));
347+
}
338348
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void testFailedMetric() {
150150
return null;
151151
}).when(esqlClient).execute(eq(EsqlResolveFieldsAction.TYPE), any(), any());
152152

153-
var planExecutor = new PlanExecutor(indexResolver, MeterRegistry.NOOP, new XPackLicenseState(() -> 0L), mockQueryLog());
153+
var planExecutor = new PlanExecutor(indexResolver, MeterRegistry.NOOP, new XPackLicenseState(() -> 0L), mockQueryLog(), List.of());
154154
var enrichResolver = mockEnrichResolver();
155155

156156
var request = new EsqlQueryRequest();

0 commit comments

Comments
 (0)