CsvTestUtils.java
@@ -533,7 +533,7 @@ public Comparator<Object> comparator() {
         }
     }
 
-    record ActualResults(
+    public record ActualResults(
         List<String> columnNames,
         List<Type> columnTypes,
         List<String> dataTypes,
CsvTests.java
@@ -11,32 +11,15 @@
 
 import org.elasticsearch.Build;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
-import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.operator.Driver;
-import org.elasticsearch.compute.operator.DriverRunner;
-import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
-import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -46,55 +29,25 @@
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.esql.CsvTestUtils.ActualResults;
-import org.elasticsearch.xpack.esql.CsvTestUtils.Type;
 import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
-import org.elasticsearch.xpack.esql.analysis.Analyzer;
-import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
 import org.elasticsearch.xpack.esql.core.CsvSpecReader;
 import org.elasticsearch.xpack.esql.core.SpecReader;
-import org.elasticsearch.xpack.esql.core.expression.Expressions;
-import org.elasticsearch.xpack.esql.core.expression.function.FunctionRegistry;
 import org.elasticsearch.xpack.esql.core.index.EsIndex;
 import org.elasticsearch.xpack.esql.core.index.IndexResolution;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
-import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
-import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
-import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer;
-import org.elasticsearch.xpack.esql.optimizer.TestPhysicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
-import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
-import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
-import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan;
-import org.elasticsearch.xpack.esql.planner.Mapper;
-import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.plugin.EsqlFeatures;
-import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
-import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
-import org.elasticsearch.xpack.esql.stats.DisabledSearchStats;
-import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.junit.After;
 import org.junit.Before;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -107,7 +60,6 @@
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
-import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.cap;
@@ -140,33 +92,21 @@
  *
  * When we add support for more field types, CsvTests should change to support the new Block types. Same goes for employees.csv file
  * (the schema needs adjustment) and the mapping-default.json file (to add or change an existing field).
- * When we add more operators, optimization rules to the logical or physical plan optimizers, there may be the need to change the operators
- * in TestPhysicalOperationProviders or adjust TestPhysicalPlanOptimizer. For example, the TestPhysicalPlanOptimizer is skipping any
- * rules that push operations to ES itself (a Limit for example). The TestPhysicalOperationProviders is a bit more complicated than that:
- * it’s creating its own Source physical operator, aggregation operator (just a tiny bit of it) and field extract operator.
- *
  * To log the results logResults() should return "true".
 */
 // @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug")
 public class CsvTests extends ESTestCase {
 
     private static final Logger LOGGER = LogManager.getLogger(CsvTests.class);
 
     private final String fileName;
     private final String groupName;
     private final String testName;
     private final Integer lineNumber;
     private final CsvSpecReader.CsvTestCase testCase;
 
-    private final EsqlConfiguration configuration = EsqlTestUtils.configuration(
-        new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build())
-    );
-    private final FunctionRegistry functionRegistry = new EsqlFunctionRegistry();
     private final EsqlParser parser = new EsqlParser();
-    private final Mapper mapper = new Mapper(functionRegistry);
-    private final PhysicalPlanOptimizer physicalPlanOptimizer = new TestPhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
     private ThreadPool threadPool;
-    private Executor executor;
+    private TestQueryRunner queryRunner;
 
     @ParametersFactory(argumentFormatting = "%2$s.%3$s")
     public static List<Object[]> readScriptSpec() throws Exception {
@@ -178,6 +118,7 @@ public static List<Object[]> readScriptSpec() throws Exception {
     @Before
     public void setUp() throws Exception {
         super.setUp();
+        Executor executor;
         if (randomBoolean()) {
             int numThreads = randomBoolean() ? 1 : between(2, 16);
             threadPool = new TestThreadPool(
@@ -190,6 +131,7 @@ public void setUp() throws Exception {
             executor = threadPool.executor(ThreadPool.Names.SEARCH);
         }
         HeaderWarning.setThreadContext(threadPool.getThreadContext());
+        queryRunner = new TestQueryRunner(threadPool, executor);
     }
 
     @After
@@ -203,14 +145,6 @@ public void tearDown() throws Exception {
         super.tearDown();
     }
 
-    private int randomPageSize() {
-        if (randomBoolean()) {
-            return between(1, 16);
-        } else {
-            return between(1, 16 * 1024);
-        }
-    }
 
     public CsvTests(String fileName, String groupName, String testName, Integer lineNumber, CsvSpecReader.CsvTestCase testCase) {
         this.fileName = fileName;
         this.groupName = groupName;
@@ -255,8 +189,12 @@ public boolean logResults() {
     }
 
     private void doTest() throws Exception {
-        BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
-        var actualResults = executePlan(bigArrays);
+        var parsed = parser.createStatement(testCase.query);
+        var testDataset = testsDataset(parsed);
+        var indexResolution = loadIndexResolution(testDataset.mappingFileName(), testDataset.indexName());
+        var enrichPolicies = loadEnrichPolicies();
+
+        var actualResults = queryRunner.executePlan(parsed, testOperationProviders(testDataset), indexResolution, enrichPolicies);
         try {
             var expected = loadCsvSpecValues(testCase.expectedResults);
 
@@ -265,7 +203,7 @@ private void doTest() throws Exception {
             assertWarnings(actualResults.responseHeaders().getOrDefault("Warning", List.of()));
         } finally {
             Releasables.close(() -> Iterators.map(actualResults.pages().iterator(), p -> p::releaseBlocks));
-            assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
+            assertThat(queryRunner.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
         }
     }

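Note: the TestQueryRunner that setUp() and doTest() now delegate to is added elsewhere in this PR and does not appear in the hunks shown here. Below is a minimal, hypothetical sketch of the surface these call sites assume — the constructor, executePlan(), and breakerService() signatures are inferred from the usages above, while the body of executePlan() (the planning and driver logic removed from this file further down) is elided:

package org.elasticsearch.xpack.esql;

import java.util.concurrent.Executor;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.CsvTestUtils.ActualResults;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.core.index.IndexResolution;
import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders;

// Hypothetical outline, not the PR's actual source.
public class TestQueryRunner {
    private final ThreadPool threadPool;
    private final Executor executor;
    // Assumption: the runner now owns the breaker-backed BigArrays that doTest() previously built inline.
    private final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1))
        .withCircuitBreaking();

    public TestQueryRunner(ThreadPool threadPool, Executor executor) {
        this.threadPool = threadPool;
        this.executor = executor;
    }

    // Analysis, optimization, planning and driver execution — the logic removed from CsvTests further down.
    public ActualResults executePlan(
        LogicalPlan parsed,
        TestPhysicalOperationProviders operationProviders,
        IndexResolution indexResolution,
        EnrichResolution enrichPolicies
    ) throws Exception {
        throw new UnsupportedOperationException("elided; see the PR's TestQueryRunner");
    }

    // Exposed so doTest() can assert that the REQUEST breaker drained back to zero.
    public CircuitBreakerService breakerService() {
        return bigArrays.breakerService();
    }
}

Returning ActualResults across class boundaries is presumably also why the first hunk widens the record from package-private to public.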
@@ -319,18 +257,6 @@ private static EnrichPolicy loadEnrichPolicyMapping(String policyFileName) {
         }
     }
 
-    private PhysicalPlan physicalPlan(LogicalPlan parsed, CsvTestsDataLoader.TestsDataset dataset) {
-        var indexResolution = loadIndexResolution(dataset.mappingFileName(), dataset.indexName());
-        var enrichPolicies = loadEnrichPolicies();
-        var analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies), TEST_VERIFIER);
-        var analyzed = analyzer.analyze(parsed);
-        var logicalOptimized = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration)).optimize(analyzed);
-        var physicalPlan = mapper.map(logicalOptimized);
-        var optimizedPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(physicalPlan));
-        opportunisticallyAssertPlanSerialization(physicalPlan, optimizedPlan); // comment out to disable serialization
-        return optimizedPlan;
-    }
-
     private static CsvTestsDataLoader.TestsDataset testsDataset(LogicalPlan parsed) {
         var preAnalysis = new PreAnalyzer().preAnalyze(parsed);
         var indices = preAnalysis.indices;
@@ -357,102 +283,6 @@ private static TestPhysicalOperationProviders testOperationProviders(CsvTestsDat
         return new TestPhysicalOperationProviders(testData.v1(), testData.v2());
     }
 
-    private ActualResults executePlan(BigArrays bigArrays) throws Exception {
-        var parsed = parser.createStatement(testCase.query);
-        var testDataset = testsDataset(parsed);
-
-        String sessionId = "csv-test";
-        BlockFactory blockFactory = new BlockFactory(
-            bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST),
-            bigArrays,
-            ByteSizeValue.ofBytes(randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2))
-        );
-        ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), executor);
-        ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(blockFactory, between(1, 64), threadPool::relativeTimeInMillis);
-        LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
-            sessionId,
-            "",
-            new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
-            bigArrays,
-            blockFactory,
-            randomNodeSettings(),
-            configuration,
-            exchangeSource,
-            exchangeSink,
-            Mockito.mock(EnrichLookupService.class),
-            testOperationProviders(testDataset)
-        );
-        //
-        // Keep in sync with ComputeService#execute
-        //
-        PhysicalPlan physicalPlan = physicalPlan(parsed, testDataset);
-        Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
-            physicalPlan,
-            configuration
-        );
-        PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();
-        PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
-
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("Coordinator plan\n" + coordinatorPlan);
-            LOGGER.trace("DataNode plan\n" + dataNodePlan);
-        }
-
-        List<String> columnNames = Expressions.names(coordinatorPlan.output());
-        List<String> dataTypes = new ArrayList<>(columnNames.size());
-        List<Type> columnTypes = coordinatorPlan.output()
-            .stream()
-            .peek(o -> dataTypes.add(EsqlDataTypes.outputType(o.dataType())))
-            .map(o -> Type.asType(o.dataType().nameUpper()))
-            .toList();
-
-        List<Driver> drivers = new ArrayList<>();
-        List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
-
-        // replace fragment inside the coordinator plan
-        try {
-            LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(new OutputExec(coordinatorPlan, collectedPages::add));
-            drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(sessionId));
-            if (dataNodePlan != null) {
-                var searchStats = new DisabledSearchStats();
-                var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, searchStats));
-                var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(
-                    new LocalPhysicalOptimizerContext(configuration, searchStats)
-                );
-
-                var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer);
-                exchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, randomIntBetween(1, 3));
-                LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(csvDataNodePhysicalPlan);
-                drivers.addAll(dataNodeExecutionPlan.createDrivers(sessionId));
-                Randomness.shuffle(drivers);
-            }
-            // Execute the driver
-            DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
-                @Override
-                protected void start(Driver driver, ActionListener<Void> driverListener) {
-                    Driver.start(threadPool.getThreadContext(), executor, driver, between(1, 1000), driverListener);
-                }
-            };
-            PlainActionFuture<ActualResults> future = new PlainActionFuture<>();
-            runner.runToCompletion(drivers, ActionListener.releaseAfter(future, () -> Releasables.close(drivers)).map(ignore -> {
-                var responseHeaders = threadPool.getThreadContext().getResponseHeaders();
-                return new ActualResults(columnNames, columnTypes, dataTypes, collectedPages, responseHeaders);
-            }));
-            return future.actionGet(TimeValue.timeValueSeconds(30));
-        } finally {
-            Releasables.close(() -> Releasables.close(drivers));
-        }
-    }
-
-    private Settings randomNodeSettings() {
-        Settings.Builder builder = Settings.builder();
-        if (randomBoolean()) {
-            builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(randomIntBetween(0, 4096)));
-            builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofBytes(randomIntBetween(0, 16 * 1024)));
-        }
-        return builder.build();
-    }
-
     private Throwable reworkException(Throwable th) {
         StackTraceElement[] stackTrace = th.getStackTrace();
         StackTraceElement[] redone = new StackTraceElement[stackTrace.length + 1];
@@ -463,20 +293,6 @@ private Throwable reworkException(Throwable th) {
         return th;
     }
 
-    // Asserts that the serialization and deserialization of the plan creates an equivalent plan.
-    private void opportunisticallyAssertPlanSerialization(PhysicalPlan... plans) {
-        for (var plan : plans) {
-            var tmp = plan;
-            do {
-                if (tmp instanceof LocalSourceExec) {
-                    return; // skip plans with localSourceExec
-                }
-            } while (tmp.children().isEmpty() == false && (tmp = tmp.children().get(0)) != null);
-
-            SerializationTestUtils.assertSerialization(plan, configuration);
-        }
-    }
-
     private void assertWarnings(List<String> warnings) {
         List<String> normalized = new ArrayList<>(warnings.size());
         for (String w : warnings) {