diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
index c27d0a6fbb865..791f674a8d4bb 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
@@ -533,7 +533,7 @@ public Comparator<?> comparator() {
         }
     }
 
-    record ActualResults(
+    public record ActualResults(
         List<String> columnNames,
         List<Type> columnTypes,
         List<String> dataTypes,
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
index 44466cebb7dac..5f6ea6c1457a7 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
@@ -11,32 +11,15 @@
 import org.elasticsearch.Build;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
-import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.Driver;
-import org.elasticsearch.compute.operator.DriverRunner;
-import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
-import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -46,55 +29,25 @@
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.esql.CsvTestUtils.ActualResults;
-import org.elasticsearch.xpack.esql.CsvTestUtils.Type;
 import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
-import org.elasticsearch.xpack.esql.analysis.Analyzer;
-import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
 import org.elasticsearch.xpack.esql.core.CsvSpecReader;
 import org.elasticsearch.xpack.esql.core.SpecReader;
-import org.elasticsearch.xpack.esql.core.expression.Expressions;
-import org.elasticsearch.xpack.esql.core.expression.function.FunctionRegistry;
 import org.elasticsearch.xpack.esql.core.index.EsIndex;
 import org.elasticsearch.xpack.esql.core.index.IndexResolution;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
-import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
-import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.TestPhysicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
-import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
-import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
-import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan;
-import org.elasticsearch.xpack.esql.planner.Mapper;
-import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.plugin.EsqlFeatures;
-import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
-import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
-import org.elasticsearch.xpack.esql.stats.DisabledSearchStats;
-import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.junit.After;
 import org.junit.Before;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -107,7 +60,6 @@
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
-import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.cap;
@@ -140,16 +92,11 @@
  *
  * When we add support for more field types, CsvTests should change to support the new Block types. Same goes for employees.csv file
  * (the schema needs adjustment) and the mapping-default.json file (to add or change an existing field).
- * When we add more operators, optimization rules to the logical or physical plan optimizers, there may be the need to change the operators
- * in TestPhysicalOperationProviders or adjust TestPhysicalPlanOptimizer. For example, the TestPhysicalPlanOptimizer is skipping any
- * rules that push operations to ES itself (a Limit for example). The TestPhysicalOperationProviders is a bit more complicated than that:
- * it’s creating its own Source physical operator, aggregation operator (just a tiny bit of it) and field extract operator.
 *
 * To log the results, logResults() should return "true".
 */
 // @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug")
 public class CsvTests extends ESTestCase {
-    private static final Logger LOGGER = LogManager.getLogger(CsvTests.class);
 
     private final String fileName;
@@ -157,16 +104,9 @@ public class CsvTests extends ESTestCase {
     private final String testName;
     private final Integer lineNumber;
     private final CsvSpecReader.CsvTestCase testCase;
-
-    private final EsqlConfiguration configuration = EsqlTestUtils.configuration(
-        new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build())
-    );
-    private final FunctionRegistry functionRegistry = new EsqlFunctionRegistry();
     private final EsqlParser parser = new EsqlParser();
-    private final Mapper mapper = new Mapper(functionRegistry);
-    private final PhysicalPlanOptimizer physicalPlanOptimizer = new TestPhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
     private ThreadPool threadPool;
-    private Executor executor;
+    private TestQueryRunner queryRunner;
 
     @ParametersFactory(argumentFormatting = "%2$s.%3$s")
     public static List<Object[]> readScriptSpec() throws Exception {
@@ -178,6 +118,7 @@ public static List<Object[]> readScriptSpec() throws Exception {
     @Before
     public void setUp() throws Exception {
         super.setUp();
+        Executor executor;
         if (randomBoolean()) {
             int numThreads = randomBoolean() ? 1 : between(2, 16);
             threadPool = new TestThreadPool(
@@ -190,6 +131,7 @@ public void setUp() throws Exception {
             executor = threadPool.executor(ThreadPool.Names.SEARCH);
         }
         HeaderWarning.setThreadContext(threadPool.getThreadContext());
+        queryRunner = new TestQueryRunner(threadPool, executor);
     }
 
     @After
@@ -203,14 +145,6 @@ public void tearDown() throws Exception {
         super.tearDown();
     }
 
-    private int randomPageSize() {
-        if (randomBoolean()) {
-            return between(1, 16);
-        } else {
-            return between(1, 16 * 1024);
-        }
-    }
-
     public CsvTests(String fileName, String groupName, String testName, Integer lineNumber, CsvSpecReader.CsvTestCase testCase) {
         this.fileName = fileName;
         this.groupName = groupName;
@@ -255,8 +189,12 @@ public boolean logResults() {
     }
 
     private void doTest() throws Exception {
-        BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
-        var actualResults = executePlan(bigArrays);
+        var parsed = parser.createStatement(testCase.query);
+        var testDataset = testsDataset(parsed);
+        var indexResolution = loadIndexResolution(testDataset.mappingFileName(), testDataset.indexName());
+        var enrichPolicies = loadEnrichPolicies();
+
+        var actualResults = queryRunner.executePlan(parsed, testOperationProviders(testDataset), indexResolution, enrichPolicies);
 
         try {
             var expected = loadCsvSpecValues(testCase.expectedResults);
@@ -265,7 +203,7 @@ private void doTest() throws Exception {
             assertWarnings(actualResults.responseHeaders().getOrDefault("Warning", List.of()));
         } finally {
             Releasables.close(() -> Iterators.map(actualResults.pages().iterator(), p -> p::releaseBlocks));
-            assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
+            assertThat(queryRunner.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
         }
     }
 
@@ -319,18 +257,6 @@ private static EnrichPolicy loadEnrichPolicyMapping(String policyFileName) {
         }
     }
 
-    private PhysicalPlan physicalPlan(LogicalPlan parsed, CsvTestsDataLoader.TestsDataset dataset) {
-        var indexResolution = loadIndexResolution(dataset.mappingFileName(), dataset.indexName());
-        var enrichPolicies = loadEnrichPolicies();
-        var analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies), TEST_VERIFIER);
-        var analyzed = analyzer.analyze(parsed);
-        var logicalOptimized = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration)).optimize(analyzed);
-        var physicalPlan = mapper.map(logicalOptimized);
-        var optimizedPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(physicalPlan));
-        opportunisticallyAssertPlanSerialization(physicalPlan, optimizedPlan); // comment out to disable serialization
-        return optimizedPlan;
-    }
-
     private static CsvTestsDataLoader.TestsDataset testsDataset(LogicalPlan parsed) {
         var preAnalysis = new PreAnalyzer().preAnalyze(parsed);
         var indices = preAnalysis.indices;
@@ -357,102 +283,6 @@ private static TestPhysicalOperationProviders testOperationProviders(CsvTestsDat
         return new TestPhysicalOperationProviders(testData.v1(), testData.v2());
     }
 
-    private ActualResults executePlan(BigArrays bigArrays) throws Exception {
-        var parsed = parser.createStatement(testCase.query);
-        var testDataset = testsDataset(parsed);
-
-        String sessionId = "csv-test";
-        BlockFactory blockFactory = new BlockFactory(
-            bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST),
-            bigArrays,
-            ByteSizeValue.ofBytes(randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2))
-        );
-        ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), executor);
-        ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(blockFactory, between(1, 64), threadPool::relativeTimeInMillis);
-        LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
-            sessionId,
-            "",
-            new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
-            bigArrays,
-            blockFactory,
-            randomNodeSettings(),
-            configuration,
-            exchangeSource,
-            exchangeSink,
-            Mockito.mock(EnrichLookupService.class),
-            testOperationProviders(testDataset)
-        );
-        //
-        // Keep in sync with ComputeService#execute
-        //
-        PhysicalPlan physicalPlan = physicalPlan(parsed, testDataset);
-        Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
-            physicalPlan,
-            configuration
-        );
-        PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();
-        PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
-
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("Coordinator plan\n" + coordinatorPlan);
-            LOGGER.trace("DataNode plan\n" + dataNodePlan);
-        }
-
-        List<String> columnNames = Expressions.names(coordinatorPlan.output());
-        List<String> dataTypes = new ArrayList<>(columnNames.size());
-        List<Type> columnTypes = coordinatorPlan.output()
-            .stream()
-            .peek(o -> dataTypes.add(EsqlDataTypes.outputType(o.dataType())))
-            .map(o -> Type.asType(o.dataType().nameUpper()))
-            .toList();
-
-        List<Driver> drivers = new ArrayList<>();
-        List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
-
-        // replace fragment inside the coordinator plan
-        try {
-            LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(new OutputExec(coordinatorPlan, collectedPages::add));
-            drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(sessionId));
-            if (dataNodePlan != null) {
-                var searchStats = new DisabledSearchStats();
-                var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, searchStats));
-                var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(
-                    new LocalPhysicalOptimizerContext(configuration, searchStats)
-                );
-
-                var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer);
-                exchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, randomIntBetween(1, 3));
-                LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(csvDataNodePhysicalPlan);
-                drivers.addAll(dataNodeExecutionPlan.createDrivers(sessionId));
-                Randomness.shuffle(drivers);
-            }
-            // Execute the driver
-            DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
-                @Override
-                protected void start(Driver driver, ActionListener<Void> driverListener) {
-                    Driver.start(threadPool.getThreadContext(), executor, driver, between(1, 1000), driverListener);
-                }
-            };
-            PlainActionFuture<ActualResults> future = new PlainActionFuture<>();
-            runner.runToCompletion(drivers, ActionListener.releaseAfter(future, () -> Releasables.close(drivers)).map(ignore -> {
-                var responseHeaders = threadPool.getThreadContext().getResponseHeaders();
-                return new ActualResults(columnNames, columnTypes, dataTypes, collectedPages, responseHeaders);
-            }));
-            return future.actionGet(TimeValue.timeValueSeconds(30));
-        } finally {
-            Releasables.close(() -> Releasables.close(drivers));
-        }
-    }
-
-    private Settings randomNodeSettings() {
-        Settings.Builder builder = Settings.builder();
-        if (randomBoolean()) {
-            builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(randomIntBetween(0, 4096)));
-            builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofBytes(randomIntBetween(0, 16 * 1024)));
-        }
-        return builder.build();
-    }
-
     private Throwable reworkException(Throwable th) {
         StackTraceElement[] stackTrace = th.getStackTrace();
         StackTraceElement[] redone = new StackTraceElement[stackTrace.length + 1];
@@ -463,20 +293,6 @@ private Throwable reworkException(Throwable th) {
         return th;
     }
 
-    // Asserts that the serialization and deserialization of the plan creates an equivalent plan.
-    private void opportunisticallyAssertPlanSerialization(PhysicalPlan... plans) {
-        for (var plan : plans) {
-            var tmp = plan;
-            do {
-                if (tmp instanceof LocalSourceExec) {
-                    return; // skip plans with localSourceExec
-                }
-            } while (tmp.children().isEmpty() == false && (tmp = tmp.children().get(0)) != null);
-
-            SerializationTestUtils.assertSerialization(plan, configuration);
-        }
-    }
-
     private void assertWarnings(List<String> warnings) {
         List<String> normalized = new ArrayList<>(warnings.size());
         for (String w : warnings) {
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/TestQueryRunner.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/TestQueryRunner.java
new file mode 100644
index 0000000000000..a6924557991df
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/TestQueryRunner.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.PageCacheRecycler;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.Driver;
+import org.elasticsearch.compute.operator.DriverRunner;
+import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
+import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.esql.CsvTestUtils.ActualResults;
+import org.elasticsearch.xpack.esql.CsvTestUtils.Type;
+import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
+import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
+import org.elasticsearch.xpack.esql.core.expression.Expressions;
+import org.elasticsearch.xpack.esql.core.expression.function.FunctionRegistry;
+import org.elasticsearch.xpack.esql.core.index.IndexResolution;
+import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
+import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
+import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.optimizer.TestPhysicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
+import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
+import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
+import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan;
+import org.elasticsearch.xpack.esql.planner.Mapper;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
+import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
+import org.elasticsearch.xpack.esql.stats.DisabledSearchStats;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
+
+/**
+ * Executes a query without creating a full ES instance. To account for this, it uses the specialized
+ * {@link TestPhysicalOperationProviders} and {@link TestPhysicalPlanOptimizer}.
+ *
+ * When we add more operators or optimization rules to the logical or physical plan optimizers, the operators in
+ * TestPhysicalOperationProviders may need to change, or TestPhysicalPlanOptimizer may need adjusting. For example,
+ * TestPhysicalPlanOptimizer skips any rules that push operations to ES itself (a Limit, for example).
+ * TestPhysicalOperationProviders is a bit more involved: it creates its own Source physical operator, aggregation operator
+ * (just a tiny bit of it) and field extract operator.
+ */
+public class TestQueryRunner {
+    private static final Logger LOGGER = LogManager.getLogger(TestQueryRunner.class);
+    private final EsqlConfiguration configuration = EsqlTestUtils.configuration(
+        new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build())
+    );
+    private final FunctionRegistry functionRegistry = new EsqlFunctionRegistry();
+    private final Mapper mapper = new Mapper(functionRegistry);
+    private final PhysicalPlanOptimizer physicalPlanOptimizer = new TestPhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
+    private final ThreadPool threadPool;
+    private final Executor executor;
+    // Shared with executePlan so that breakerService() exposes the same breakers the query execution uses.
+    private final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1))
+        .withCircuitBreaking();
+
+    public TestQueryRunner(ThreadPool threadPool, Executor executor) {
+        this.threadPool = threadPool;
+        this.executor = executor;
+    }
+
+    private int randomPageSize() {
+        if (ESTestCase.randomBoolean()) {
+            return ESTestCase.between(1, 16);
+        } else {
+            return ESTestCase.between(1, 16 * 1024);
+        }
+    }
+
+    private PhysicalPlan physicalPlan(LogicalPlan parsed, IndexResolution indexResolution, EnrichResolution enrichPolicies) {
+        var analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies), TEST_VERIFIER);
+        var analyzed = analyzer.analyze(parsed);
+        var logicalOptimized = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration)).optimize(analyzed);
+        var physicalPlan = mapper.map(logicalOptimized);
+        var optimizedPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(physicalPlan));
+        opportunisticallyAssertPlanSerialization(physicalPlan, optimizedPlan); // comment out to disable serialization
+        return optimizedPlan;
+    }
+
+    public ActualResults executePlan(
+        LogicalPlan parsedQuery,
+        AbstractPhysicalOperationProviders testOperationProviders,
+        IndexResolution indexResolution,
+        EnrichResolution enrichPolicies
+    ) {
+        String sessionId = "csv-test";
+        BlockFactory blockFactory = new BlockFactory(
+            bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST),
+            bigArrays,
+            ByteSizeValue.ofBytes(ESTestCase.randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2))
+        );
+        ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(ESTestCase.between(1, 64), executor);
+        ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(
+            blockFactory,
+            ESTestCase.between(1, 64),
+            threadPool::relativeTimeInMillis
+        );
+        LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
+            sessionId,
+            "",
+            new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
+            bigArrays,
+            blockFactory,
+            randomNodeSettings(),
+            configuration,
+            exchangeSource,
+            exchangeSink,
+            Mockito.mock(EnrichLookupService.class),
+            testOperationProviders
+        );
+        //
+        // Keep in sync with ComputeService#execute
+        //
+        PhysicalPlan physicalPlan = physicalPlan(parsedQuery, indexResolution, enrichPolicies);
+        Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
+            physicalPlan,
+            configuration
+        );
+        PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();
+        PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
+
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Coordinator plan\n" + coordinatorPlan);
+            LOGGER.trace("DataNode plan\n" + dataNodePlan);
+        }
+
+        List<String> columnNames = Expressions.names(coordinatorPlan.output());
+        List<String> dataTypes = new ArrayList<>(columnNames.size());
+        List<Type> columnTypes = coordinatorPlan.output()
+            .stream()
+            .peek(o -> dataTypes.add(EsqlDataTypes.outputType(o.dataType())))
+            .map(o -> Type.asType(o.dataType().nameUpper()))
+            .toList();
+
+        List<Driver> drivers = new ArrayList<>();
+        List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
+
+        // replace fragment inside the coordinator plan
+        try {
+            LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(new OutputExec(coordinatorPlan, collectedPages::add));
+            drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(sessionId));
+            if (dataNodePlan != null) {
+                var searchStats = new DisabledSearchStats();
+                var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, searchStats));
+                var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(
+                    new LocalPhysicalOptimizerContext(configuration, searchStats)
+                );
+
+                var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer);
+                exchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, ESTestCase.randomIntBetween(1, 3));
+                LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(csvDataNodePhysicalPlan);
+                drivers.addAll(dataNodeExecutionPlan.createDrivers(sessionId));
+                Randomness.shuffle(drivers);
+            }
+            // Execute the driver
+            DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
+                @Override
+                protected void start(Driver driver, ActionListener<Void> driverListener) {
+                    Driver.start(threadPool.getThreadContext(), executor, driver, ESTestCase.between(1, 1000), driverListener);
+                }
+            };
+            PlainActionFuture<ActualResults> future = new PlainActionFuture<>();
+            runner.runToCompletion(drivers, ActionListener.releaseAfter(future, () -> Releasables.close(drivers)).map(ignore -> {
+                var responseHeaders = threadPool.getThreadContext().getResponseHeaders();
+                return new ActualResults(columnNames, columnTypes, dataTypes, collectedPages, responseHeaders);
+            }));
+            return future.actionGet(TimeValue.timeValueSeconds(30));
+        } finally {
+            Releasables.close(() -> Releasables.close(drivers));
+        }
+    }
+
+    private Settings randomNodeSettings() {
+        Settings.Builder builder = Settings.builder();
+        if (ESTestCase.randomBoolean()) {
+            builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(ESTestCase.randomIntBetween(0, 4096)));
+            builder.put(
+                BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING,
+                ByteSizeValue.ofBytes(ESTestCase.randomIntBetween(0, 16 * 1024))
+            );
+        }
+        return builder.build();
+    }
+
+    public CircuitBreakerService breakerService() {
+        return bigArrays.breakerService();
+    }
+
+    // Asserts that the serialization and deserialization of the plan creates an equivalent plan.
+    private void opportunisticallyAssertPlanSerialization(PhysicalPlan... plans) {
+        for (var plan : plans) {
+            var tmp = plan;
+            do {
+                if (tmp instanceof LocalSourceExec) {
+                    return; // skip plans with localSourceExec
+                }
+            } while (tmp.children().isEmpty() == false && (tmp = tmp.children().get(0)) != null);
+
+            SerializationTestUtils.assertSerialization(plan, configuration);
+        }
+    }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerExecutabilityTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerExecutabilityTests.java
new file mode 100644
index 0000000000000..75a726aacff94
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerExecutabilityTests.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.optimizer;
+
+import org.apache.lucene.search.IndexSearcher;
+import org.elasticsearch.common.logging.HeaderWarning;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MapperServiceTestCase;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.esql.TestQueryRunner;
+import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
+import org.elasticsearch.xpack.esql.core.index.EsIndex;
+import org.elasticsearch.xpack.esql.core.index.IndexResolution;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.core.type.EsField;
+import org.elasticsearch.xpack.esql.parser.EsqlParser;
+import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that run against actual indices, to confirm that optimized plans remain executable.
+ */
+public class OptimizerExecutabilityTests extends MapperServiceTestCase {
+    private final EsqlParser parser = new EsqlParser();
+    // TODO: add constant_keyword, scaled_float, unsigned_long, version, wildcard
+    // Currently leads to error: Failed to parse mapping: No handler for type [version] declared on field [version]
+    private static final String MAPPING_ALL_TYPES = """
+        {
+            "mappings": {
+                "properties" : {
+                    "alias-integer": {
+                        "type": "alias",
+                        "path": "integer"
+                    },
+                    "boolean": {
+                        "type": "boolean"
+                    },
+                    "byte" : {
+                        "type" : "byte"
+                    },
+                    "date": {
+                        "type": "date"
+                    },
+                    "double": {
+                        "type": "double"
+                    },
+                    "float": {
+                        "type": "float"
+                    },
+                    "half_float": {
+                        "type": "half_float"
+                    },
+                    "integer" : {
+                        "type" : "integer"
+                    },
+                    "ip": {
+                        "type": "ip"
+                    },
+                    "keyword" : {
+                        "type" : "keyword"
+                    },
+                    "long": {
+                        "type": "long"
+                    },
+                    "short": {
+                        "type": "short"
+                    },
+                    "text" : {
+                        "type" : "text"
+                    }
+                }
+            }
+        }""";
+
+    public void testOutOfRangePushdown() throws IOException {
+        // This comparison is out of range and should fail. (It fails only when using a real Lucene index.)
+        runQuery("from testidx | where integer < 1E300");
+    }
+
+    private void runQuery(String query) throws IOException {
+        ThreadPool threadPool;
+        Executor executor;
+
+        if (randomBoolean()) {
+            int numThreads = randomBoolean() ? 1 : between(2, 16);
+            threadPool = new TestThreadPool(
+                "CsvTests",
+                new FixedExecutorBuilder(Settings.EMPTY, "esql_test", numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT)
+            );
+            executor = threadPool.executor("esql_test");
+        } else {
+            threadPool = new TestThreadPool(getTestName());
+            executor = threadPool.executor(ThreadPool.Names.SEARCH);
+        }
+
+        HeaderWarning.setThreadContext(threadPool.getThreadContext());
+        var queryRunner = new TestQueryRunner(threadPool, executor);
+
+        var parsed = parser.createStatement(query);
+        MapperService mapperService = createMapperService(MAPPING_ALL_TYPES);
+
+        ParsedDocument doc = mapperService.documentMapper().parse(source("""
+            { "integer" : 1 }
+            """));
+
+        withLuceneIndex(mapperService, iw -> iw.addDocument(doc.rootDoc()), ir -> {
+            IndexSearcher searcher = newSearcher(ir);
+            SearchExecutionContext ctx = createSearchExecutionContext(mapperService, searcher);
+
+            var actualResults = queryRunner.executePlan(
+                parsed,
+                new EsPhysicalOperationProviders(List.of(new EsPhysicalOperationProviders.DefaultShardContext(0, ctx, AliasFilter.EMPTY))),
+                IndexResolution.valid(new EsIndex("testidx", Map.of("integer", new EsField("integer", DataType.INTEGER, Map.of(), true)))),
+                new EnrichResolution()
+            );
+
+            assertWarnings("No limit defined, adding default limit of [1000]");
+
+            HeaderWarning.removeThreadContext(threadPool.getThreadContext());
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        });
+
+    }
+}
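
Usage note (not part of the patch): a minimal sketch of how the extracted TestQueryRunner is driven, mirroring the two call sites above (CsvTests#doTest and OptimizerExecutabilityTests#runQuery). The thread pool name, the "testidx" index resolution, the query string, and the free variable operationProviders are illustrative placeholders.

    // Sketch only: assumes the same test scaffolding the patch uses (TestThreadPool, EsqlParser, etc.).
    ThreadPool threadPool = new TestThreadPool("test-query-runner"); // placeholder pool name
    Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
    TestQueryRunner queryRunner = new TestQueryRunner(threadPool, executor);
    try {
        // Parse the query and describe the index it runs against.
        LogicalPlan parsed = new EsqlParser().createStatement("from testidx | where integer < 10");
        IndexResolution indexResolution = IndexResolution.valid(
            new EsIndex("testidx", Map.of("integer", new EsField("integer", DataType.INTEGER, Map.of(), true)))
        );
        // operationProviders selects how Source/FieldExtract operators are built:
        // TestPhysicalOperationProviders for CSV-backed data, or EsPhysicalOperationProviders for a real shard context.
        ActualResults results = queryRunner.executePlan(parsed, operationProviders, indexResolution, new EnrichResolution());
        // Once the result pages have been consumed and released, the shared breaker should hold no memory.
        assertThat(queryRunner.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
    } finally {
        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
    }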