Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.openjdk.jmh.infra.Blackhole;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void setup() {
new EnrichResolution(),
InferenceResolution.EMPTY
),
new Verifier(new Metrics(functionRegistry), new XPackLicenseState(() -> 0L))
new Verifier(List.of(), new Metrics(functionRegistry), new XPackLicenseState(() -> 0L))
);
defaultOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small()));
}
Expand Down
29 changes: 29 additions & 0 deletions x-pack/plugin/esql/qa/server/extra-checkers/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apply plugin: 'elasticsearch.internal-es-plugin'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should actually use elasticsearch.base-internal-es-plugin here, since this is a "test" plugin and not intended to be published or included in Elasticsearch.

apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: org.elasticsearch.gradle.internal.precommit.CheckstylePrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenApisPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenPatternsPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.FilePermissionsPrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.LoggerUsagePrecommitPlugin
apply plugin: org.elasticsearch.gradle.internal.precommit.TestingConventionsPrecommitPlugin


esplugin {
name = 'extra-checkers'
description = 'An example plugin disallowing CATEGORIZE'
classname ='org.elasticsearch.xpack.esql.qa.extra.ExtraCheckersPlugin'
extendedPlugins = ['x-pack-esql']
}

dependencies {
compileOnly project(':x-pack:plugin:esql')
compileOnly project(':x-pack:plugin:esql-core')
clusterPlugins project(':x-pack:plugin:esql:qa:server:extra-checkers')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be unnecessary. The internal-es-plugin plugin should automatically add this.

}

tasks.named('javaRestTest') {
usesDefaultDistribution("to be triaged")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make a best effort to not use the default distribution here. We should avoid adding more tests that use this.

maxParallelForks = 1
jvmArgs('--add-opens=java.base/java.nio=ALL-UNNAMED')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required?

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.extra;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import java.io.IOException;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class ExtraCheckersIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.shared(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary. There's only one test in this project.

.plugin("extra-checkers")
.build();

public void testWithCategorize() {
ResponseException e = expectThrows(ResponseException.class, () -> runEsql("""
{
"query": "ROW message=\\"foo bar\\" | STATS COUNT(*) BY CATEGORIZE(message) | LIMIT 1"
}"""));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("line 1:43: CATEGORIZE is unsupported"));
}

public void testWithoutCategorize() throws IOException {
String result = runEsql("""
{
"query": "ROW message=\\"foo bar\\" | STATS COUNT(*) | LIMIT 1"
}""");
assertThat(result, containsString("""
"columns" : [
{
"name" : "COUNT(*)",
"type" : "long"
}
],
"values" : [
[
1
]
]
"""));
}

private String runEsql(String json) throws IOException {
Request request = new Request("POST", "/_query");
request.setJsonEntity(json);
request.addParameter("error_trace", "");
request.addParameter("pretty", "");
Response response = client().performRequest(request);
return EntityUtils.toString(response.getEntity());
}

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.extra;

import org.elasticsearch.xpack.esql.analysis.Verifier;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.util.List;
import java.util.function.BiConsumer;

public class DisallowCategorize implements Verifier.ExtraCheckers {
@Override
public List<BiConsumer<LogicalPlan, Failures>> extra() {
return List.of(DisallowCategorize::disallowCategorize);
}

private static void disallowCategorize(LogicalPlan plan, Failures failures) {
if (plan instanceof Aggregate) {
plan.forEachExpression(Categorize.class, cat -> failures.add(new Failure(cat, "CATEGORIZE is unsupported")));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.extra;

import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.esql.analysis.Verifier;

/**
* Marker plugin to enable {@link Verifier.ExtraCheckers}.
*/
public class ExtraCheckersPlugin extends Plugin {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just because ES requires our plugin to include something that extends Plugin even if we're just providing stuff loaded by SPI?

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
#

org.elasticsearch.xpack.esql.qa.extra.DisallowCategorize
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
return new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small());
}

public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
public static final Verifier TEST_VERIFIER = new Verifier(
List.of(),
new Metrics(new EsqlFunctionRegistry()),
new XPackLicenseState(() -> 0L)
);

public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,23 @@
* step does type resolution and fails queries based on invalid type expressions.
*/
public class Verifier {
public interface ExtraCheckers {
/**
* Build a list of checks to perform on the plan. Each one is called once per
* {@link LogicalPlan} node in the plan.
*/
List<BiConsumer<LogicalPlan, Failures>> extra();
}

/**
* Extra plan verification checks defined in plugins.
*/
private final List<ExtraCheckers> extraCheckers;
private final Metrics metrics;
private final XPackLicenseState licenseState;

public Verifier(Metrics metrics, XPackLicenseState licenseState) {
public Verifier(List<ExtraCheckers> extraCheckers, Metrics metrics, XPackLicenseState licenseState) {
this.extraCheckers = extraCheckers;
this.metrics = metrics;
this.licenseState = licenseState;
}
Expand All @@ -84,6 +96,9 @@ Collection<Failure> verify(LogicalPlan plan, BitSet partialMetrics) {

// collect plan checkers
var planCheckers = planCheckers(plan);
for (ExtraCheckers e : extraCheckers) {
planCheckers.addAll(e.extra());
}

// Concrete verifications
plan.forEachDown(p -> {
Expand Down Expand Up @@ -186,6 +201,9 @@ else if (p instanceof Lookup lookup) {
});
}

/**
* Build a list of checkers based on the components in the plan.
*/
private static List<BiConsumer<LogicalPlan, Failures>> planCheckers(LogicalPlan plan) {
List<BiConsumer<LogicalPlan, Failures>> planCheckers = new ArrayList<>();
Consumer<? super Node<?>> collectPlanCheckers = p -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetryManager;
import org.elasticsearch.xpack.esql.telemetry.QueryMetric;

import java.util.List;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
Expand All @@ -45,13 +47,19 @@ public class PlanExecutor {
private final PlanTelemetryManager planTelemetryManager;
private final EsqlQueryLog queryLog;

public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry, XPackLicenseState licenseState, EsqlQueryLog queryLog) {
public PlanExecutor(
IndexResolver indexResolver,
MeterRegistry meterRegistry,
XPackLicenseState licenseState,
EsqlQueryLog queryLog,
List<Verifier.ExtraCheckers> extraCheckers
) {
this.indexResolver = indexResolver;
this.preAnalyzer = new PreAnalyzer();
this.functionRegistry = new EsqlFunctionRegistry();
this.mapper = new Mapper();
this.metrics = new Metrics(functionRegistry);
this.verifier = new Verifier(metrics, licenseState);
this.verifier = new Verifier(extraCheckers, metrics, licenseState);
this.planTelemetryManager = new PlanTelemetryManager(meterRegistry);
this.queryLog = queryLog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand All @@ -66,6 +67,7 @@
import org.elasticsearch.xpack.esql.action.RestEsqlListQueriesAction;
import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
import org.elasticsearch.xpack.esql.action.RestEsqlStopAsyncAction;
import org.elasticsearch.xpack.esql.analysis.Verifier;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
Expand All @@ -83,7 +85,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

public class EsqlPlugin extends Plugin implements ActionPlugin {
public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
public static final boolean INLINESTATS_FEATURE_FLAG = new FeatureFlag("esql_inlinestats").isEnabled();

public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker";
Expand Down Expand Up @@ -187,6 +189,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
Setting.Property.Dynamic
);

private final List<Verifier.ExtraCheckers> extraCheckers = new ArrayList<>();

@Override
public Collection<?> createComponents(PluginServices services) {
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
Expand All @@ -204,7 +208,8 @@ public Collection<?> createComponents(PluginServices services) {
new IndexResolver(services.client()),
services.telemetryProvider().getMeterRegistry(),
getLicenseState(),
new EsqlQueryLog(services.clusterService().getClusterSettings(), services.slowLogFieldProvider())
new EsqlQueryLog(services.clusterService().getClusterSettings(), services.slowLogFieldProvider()),
extraCheckers
),
new ExchangeService(
services.clusterService().getSettings(),
Expand Down Expand Up @@ -335,4 +340,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
)
);
}

@Override
public void loadExtensions(ExtensionLoader loader) {
extraCheckers.addAll(loader.loadExtensions(Verifier.ExtraCheckers.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static Analyzer analyzer(EsqlFunctionRegistry registry, License.Operatio
defaultEnrichResolution(),
emptyInferenceResolution()
),
new Verifier(new Metrics(new EsqlFunctionRegistry()), getLicenseState(operationMode))
new Verifier(List.of(), new Metrics(new EsqlFunctionRegistry()), getLicenseState(operationMode))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichRes
enrichResolution,
emptyInferenceResolution()
),
new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
new Verifier(List.of(), new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private static Analyzer makeAnalyzer(String mappingFileName) {
emptyPolicyResolution(),
emptyInferenceResolution()
),
new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
new Verifier(List.of(), new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testFailedMetric() {
return null;
}).when(esqlClient).execute(eq(EsqlResolveFieldsAction.TYPE), any(), any());

var planExecutor = new PlanExecutor(indexResolver, MeterRegistry.NOOP, new XPackLicenseState(() -> 0L), mockQueryLog());
var planExecutor = new PlanExecutor(indexResolver, MeterRegistry.NOOP, new XPackLicenseState(() -> 0L), mockQueryLog(), List.of());
var enrichResolver = mockEnrichResolver();

var request = new EsqlQueryRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testTwoWhereQuery() {

public void testTwoQueriesExecuted() {
Metrics metrics = new Metrics(new EsqlFunctionRegistry());
Verifier verifier = new Verifier(metrics, new XPackLicenseState(() -> 0L));
Verifier verifier = new Verifier(List.of(), metrics, new XPackLicenseState(() -> 0L));
esqlWithVerifier("""
from employees
| where languages > 2
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testTwoQueriesExecuted() {

public void testMultipleFunctions() {
Metrics metrics = new Metrics(new EsqlFunctionRegistry());
Verifier verifier = new Verifier(metrics, new XPackLicenseState(() -> 0L));
Verifier verifier = new Verifier(List.of(), metrics, new XPackLicenseState(() -> 0L));
esqlWithVerifier("""
from employees
| where languages > 2
Expand Down Expand Up @@ -552,7 +552,7 @@ private Counters esql(String esql, Verifier v) {
Metrics metrics = null;
if (v == null) {
metrics = new Metrics(new EsqlFunctionRegistry());
verifier = new Verifier(metrics, new XPackLicenseState(() -> 0L));
verifier = new Verifier(List.of(), metrics, new XPackLicenseState(() -> 0L));
}
analyzer(verifier).analyze(parser.createStatement(esql));

Expand Down