diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java index c1634bf9885fb..18358df4dbd68 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java @@ -161,7 +161,15 @@ public List currentSchema() { final List previousCommands = new ArrayList<>(); QueryExecuted previousResult; }; - EsqlQueryGenerator.generatePipeline(MAX_DEPTH, sourceCommand(), mappingInfo, exec, requiresTimeSeries(), this); + EsqlQueryGenerator.generatePipeline( + MAX_DEPTH, + sourceCommand(), + EsqlQueryGenerator.PIPE_COMMANDS, + mappingInfo, + exec, + requiresTimeSeries(), + this + ); } } @@ -182,7 +190,8 @@ private static CommandGenerator.ValidationResult checkResults( previousResult == null ? null : previousResult.outputSchema(), previousResult == null ? 
null : previousResult.result(), result.outputSchema(), - result.result() + result.result(), + false ); if (outputValidation.success() == false) { for (Pattern allowedError : ALLOWED_ERROR_PATTERNS) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/EsqlQueryGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/EsqlQueryGenerator.java index c6b6d7ee9b1b3..b3be78ed9a6cc 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/EsqlQueryGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/EsqlQueryGenerator.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.esql.generator.command.pipe.TimeSeriesStatsGenerator; import org.elasticsearch.xpack.esql.generator.command.pipe.WhereGenerator; import org.elasticsearch.xpack.esql.generator.command.source.FromGenerator; +import org.elasticsearch.xpack.esql.generator.command.source.SimpleFromGenerator; import org.elasticsearch.xpack.esql.generator.command.source.TimeSeriesGenerator; import java.util.List; @@ -49,7 +50,9 @@ public class EsqlQueryGenerator { /** * These are commands that are at the beginning of the query, eg. FROM */ - static List SOURCE_COMMANDS = List.of(FromGenerator.INSTANCE); + public static List SOURCE_COMMANDS = List.of(FromGenerator.INSTANCE); + + public static List SIMPLIFIED_SOURCE_COMMANDS = List.of(SimpleFromGenerator.INSTANCE); /** * Commands at the beginning of queries that begin queries on time series indices, eg. TS @@ -59,7 +62,7 @@ public class EsqlQueryGenerator { /** * These are downstream commands, ie. 
that cannot appear as the first command in a query */ - static List PIPE_COMMANDS = List.of( + public static List PIPE_COMMANDS = List.of( ChangePointGenerator.INSTANCE, DissectGenerator.INSTANCE, DropGenerator.INSTANCE, @@ -78,6 +81,25 @@ public class EsqlQueryGenerator { WhereGenerator.INSTANCE ); + /** + * Same as PIPE_COMMANDS but without the more complex commands (Fork, Enrich, Join). + * This is needed in CSV tests, that don't support the full ES capabilities + */ + public static List SIMPLIFIED_PIPE_COMMANDS = List.of( + ChangePointGenerator.INSTANCE, + DissectGenerator.INSTANCE, + DropGenerator.INSTANCE, + EvalGenerator.INSTANCE, + GrokGenerator.INSTANCE, + KeepGenerator.INSTANCE, + LimitGenerator.INSTANCE, + MvExpandGenerator.INSTANCE, + RenameGenerator.INSTANCE, + SortGenerator.INSTANCE, + StatsGenerator.INSTANCE, + WhereGenerator.INSTANCE + ); + static List TIME_SERIES_PIPE_COMMANDS = Stream.concat( PIPE_COMMANDS.stream(), Stream.of(TimeSeriesStatsGenerator.INSTANCE) @@ -87,6 +109,10 @@ public static CommandGenerator sourceCommand() { return randomFrom(SOURCE_COMMANDS); } + public static CommandGenerator simplifiedSourceCommand() { + return randomFrom(SIMPLIFIED_SOURCE_COMMANDS); + } + public static CommandGenerator timeSeriesSourceCommand() { return randomFrom(TIME_SERIES_SOURCE_COMMANDS); } @@ -113,6 +139,7 @@ public interface Executor { public static void generatePipeline( final int depth, CommandGenerator commandGenerator, + List pipelineGenerators, final CommandGenerator.QuerySchema schema, Executor executor, boolean isTimeSeries, @@ -133,7 +160,7 @@ public static void generatePipeline( while (commandAllowed == false) { commandGenerator = isTimeSeries && canGenerateTimeSeries ? 
randomMetricsPipeCommandGenerator() - : randomPipeCommandGenerator(); + : randomFrom(pipelineGenerators); if (isTimeSeries == false) { commandAllowed = true; } else { @@ -151,6 +178,7 @@ public static void generatePipeline( } } } + desc = commandGenerator.generate(executor.previousCommands(), executor.currentSchema(), schema, queryExecutor); if (desc == CommandGenerator.EMPTY_DESCRIPTION) { continue; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/README.asciidoc b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/README.asciidoc index d9613f01e0b6b..6ba8e4fe0188f 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/README.asciidoc +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/README.asciidoc @@ -17,6 +17,10 @@ All you have to do is: *** Have a look at `EsqlQueryGenerator`, it contains many utility methods that will help you generate random expressions. ** Implement `CommandGenerator.validateOutput()` to validate the output of the query. * Add your class to `EsqlQueryGenerator.SOURCE_COMMANDS` (if it's a source command) or `EsqlQueryGenerator.PIPE_COMMANDS` (if it's a pipe command). + These will be used by `GenerativeIT` to pick a random command to append to the query. +** Also consider adding your generators (or a simplified version of them) + to `EsqlQueryGenerator.SIMPLIFIED_SOURCE_COMMANDS` or `EsqlQueryGenerator.SIMPLIFIED_PIPE_COMMANDS`. + These are used to generate queries in contexts where the full complexity of ES|QL is not supported (eg. in `GenerativeCsvIT`). * Run `GenerativeIT` at least a couple of times: these tests can be pretty noisy. * If you get unexpected errors (real bugs in ES|QL), please open an issue and add the error to `GenerativeRestTest.ALLOWED_ERRORS`. Run tests again until everything works fine. 
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/CommandGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/CommandGenerator.java index e18748be154f8..4a67db5901c6a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/CommandGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/CommandGenerator.java @@ -54,7 +54,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return VALIDATION_OK; } @@ -71,7 +72,8 @@ public ValidationResult validateOutput( * @param schema The columns returned by the query so far. It contains name and type information for each column. * @param executor * @return All the details about the generated command. See {@link CommandDescription}. - * If something goes wrong and for some reason you can't generate a command, you should return {@link CommandGenerator#EMPTY_DESCRIPTION} + * If something goes wrong and for some reason you can't generate a command, + * you should return {@link CommandGenerator#EMPTY_DESCRIPTION} */ CommandDescription generate( List previousCommands, @@ -89,11 +91,16 @@ CommandDescription generate( * It also contains the context information you stored during command generation. * @param previousColumns The output schema of the original query (without last generated command). 
* It contains name and type information for each column, see {@link Column} - * @param previousOutput The output of the original query (without last generated command), as a list (rows) of lists (columns) of values + * @param previousOutput The output of the original query (without last generated command), + * as a list (rows) of lists (columns) of values * @param columns The output schema of the full query (WITH last generated command). * @param output The output of the full query (WITH last generated command), as a list (rows) of lists (columns) of values + * @param deterministic True if the query is executed in deterministic mode (eg. in CsvTests), ie. that the + * results (also their order) are stable between multiple executions. + * False if the query is executed in non-deterministic mode (eg. in GenerativeIT, against an ES cluster) * @return The result of the output validation. If the validation succeeds, you should return {@link CommandGenerator#VALIDATION_OK}. - * Also, if for some reason you can't validate the output, just return {@link CommandGenerator#VALIDATION_OK}; for a command, having a generator without + * Also, if for some reason you can't validate the output, just return {@link CommandGenerator#VALIDATION_OK}; + * for a command, having a generator without * validation is much better than having no generator at all. 
*/ ValidationResult validateOutput( @@ -102,20 +109,39 @@ ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ); static ValidationResult expectSameRowCount( List previousCommands, List> previousOutput, - List> output + List> output, + boolean deterministic ) { + if (deterministic && previousOutput.size() != output.size()) { + return new ValidationResult(false, "Expecting [" + previousOutput.size() + "] rows, got [" + output.size() + "]"); + } + + return VALIDATION_OK; + } - // ES|QL is quite non-deterministic in this sense, we can't guarantee it for now - // if (output.size() != previousOutput.size()) { - // return new ValidationResult(false, "Expecting [" + previousOutput.size() + "] rows, but got [" + output.size() + "]"); - // } + static ValidationResult expectSameData(List> before, int beforeCol, List> after, int afterCol) { + if (before.size() != after.size()) { + return new ValidationResult(false, "Expecting same number of rows, got [" + before.size() + "] and [" + after.size() + "]"); + } + for (int i = 0; i < before.size(); i++) { + Object v1 = before.get(i).get(beforeCol); + Object v2 = after.get(i).get(afterCol); + if (v1 == null) { + if (v2 != null) { + return new ValidationResult(false, "Expecting null at row [" + i + "], got [" + v2 + "]"); + } + } else if (v1.equals(v2) == false) { + return new ValidationResult(false, "Expecting [" + v1 + "] at row [" + i + "], got [" + v2 + "]"); + } + } return VALIDATION_OK; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ChangePointGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ChangePointGenerator.java index 2b04cc9aced80..f905ece9c6e50 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ChangePointGenerator.java +++ 
b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ChangePointGenerator.java @@ -49,7 +49,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return CommandGenerator.expectAtLeastSameNumberOfColumns(previousColumns, columns); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DissectGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DissectGenerator.java index 6e0f95c7e7846..db065e6922768 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DissectGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DissectGenerator.java @@ -66,7 +66,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; @@ -76,6 +77,6 @@ public ValidationResult validateOutput( return new ValidationResult(false, "Expecting at least [" + previousColumns.size() + "] columns, got [" + columns.size() + "]"); } - return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output); + return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output, deterministic); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DropGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DropGenerator.java index 97b754692f392..a5e871e0865d9 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DropGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/DropGenerator.java @@ -73,7 +73,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; @@ -86,8 +87,17 @@ public ValidationResult validateOutput( return new ValidationResult(false, "Column [" + droppedColumn + "] was not dropped"); } } - // TODO awaits fix https://github.com/elastic/elasticsearch/issues/120272 - // return CommandGenerator.expectSameRowCount(previousOutput, output); + + if (deterministic) { + for (int columnIdx = 0; columnIdx < columns.size(); columnIdx++) { + Column c = columns.get(columnIdx); + int previousColumnIdx = previousColumns.indexOf(c); + if (previousColumnIdx == -1) { + return new ValidationResult(false, "Column [" + c + "] not in previous output"); + } + CommandGenerator.expectSameData(previousOutput, previousColumnIdx, output, columnIdx); + } + } return VALIDATION_OK; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EnrichGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EnrichGenerator.java index be3d547cf535c..d9cd8cbb99292 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EnrichGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EnrichGenerator.java @@ -49,7 +49,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { 
return VALIDATION_OK; @@ -59,6 +60,6 @@ public ValidationResult validateOutput( return new ValidationResult(false, "Expecting at least [" + previousColumns.size() + "] columns, got [" + columns.size() + "]"); } - return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output); + return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output, deterministic); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EvalGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EvalGenerator.java index 8c2841dda3f04..239b470d8697b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EvalGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/EvalGenerator.java @@ -69,7 +69,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { List expectedColumns = (List) commandDescription.context().get(NEW_COLUMNS); List resultColNames = columns.stream().map(Column::name).toList(); @@ -87,7 +88,7 @@ public ValidationResult validateOutput( ); } - return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output); + return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output, deterministic); } private static String unquote(String colName) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ForkGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ForkGenerator.java index 64f4230d7f17d..0e5f7c50e0c1e 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ForkGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/ForkGenerator.java @@ -112,13 +112,14 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return VALIDATION_OK; } }; - EsqlQueryGenerator.generatePipeline(3, gen, schema, exec, false, executor); + EsqlQueryGenerator.generatePipeline(3, gen, EsqlQueryGenerator.PIPE_COMMANDS, schema, exec, false, executor); if (exec.previousCommands().size() > 1) { String previousCmd = exec.previousCommands() .stream() @@ -143,8 +144,9 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { - return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output); + return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output, deterministic); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/GrokGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/GrokGenerator.java index 162ccbfc21e15..ff81a1cc1a7e5 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/GrokGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/GrokGenerator.java @@ -65,7 +65,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; @@ -73,6 +74,6 @@ public ValidationResult validateOutput( if (previousColumns.size() > 
columns.size()) { return new ValidationResult(false, "Expecting at least [" + previousColumns.size() + "] columns, got [" + columns.size() + "]"); } - return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output); + return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output, deterministic); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/KeepGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/KeepGenerator.java index e4cceabb21c9f..31bdcd06ecce1 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/KeepGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/KeepGenerator.java @@ -69,7 +69,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; @@ -79,6 +80,16 @@ public ValidationResult validateOutput( return new ValidationResult(false, "Expecting at most [" + previousColumns.size() + "] columns, got [" + columns.size() + "]"); } + if (deterministic) { + for (int columnIdx = 0; columnIdx < columns.size(); columnIdx++) { + Column c = columns.get(columnIdx); + int previousColumnIdx = previousColumns.indexOf(columns.get(columnIdx)); + if (previousColumnIdx == -1) { + return new ValidationResult(false, "Column [" + c + "] not in previous output"); + } + CommandGenerator.expectSameData(previousOutput, previousColumnIdx, output, columnIdx); + } + } return VALIDATION_OK; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LimitGenerator.java 
b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LimitGenerator.java index bd2f49e4b079b..604b6c94d733e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LimitGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LimitGenerator.java @@ -41,13 +41,24 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { int limit = (int) commandDescription.context().get(LIMIT); if (output.size() > limit) { return new ValidationResult(false, "Expecting at most [" + limit + "] records, got [" + output.size() + "]"); } - return CommandGenerator.expectSameColumns(previousColumns, columns); + + ValidationResult result = CommandGenerator.expectSameColumns(previousColumns, columns); + if (result.success() == false) { + return result; + } + if (deterministic) { + for (int i = 0; i < columns.size(); i++) { + CommandGenerator.expectSameData(previousOutput.subList(0, Math.min(previousOutput.size(), limit)), i, output, i); + } + } + return VALIDATION_OK; } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LookupJoinGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LookupJoinGenerator.java index 011db8e2f0e9f..b4027e67b4812 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LookupJoinGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/LookupJoinGenerator.java @@ -88,7 +88,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if 
(commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/MvExpandGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/MvExpandGenerator.java index 7be8d0bdff62a..aac31d955d2b4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/MvExpandGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/MvExpandGenerator.java @@ -43,7 +43,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/RenameGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/RenameGenerator.java index 132c0dc9840a7..6ee53ea50f60b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/RenameGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/RenameGenerator.java @@ -89,7 +89,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { if (commandDescription == EMPTY_DESCRIPTION) { return VALIDATION_OK; @@ -97,7 +98,7 @@ public ValidationResult validateOutput( if (previousColumns.size() < columns.size()) { return new ValidationResult(false, "Expecting at most [" + previousColumns.size() + "] columns, got [" + columns.size() + "]"); } - return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output); 
+ return CommandGenerator.expectSameRowCount(previousCommands, previousOutput, output, deterministic); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SampleGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SampleGenerator.java index f8ed78a09bd79..3474f89195ac2 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SampleGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SampleGenerator.java @@ -40,7 +40,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return CommandGenerator.expectSameColumns(previousColumns, columns); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SortGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SortGenerator.java index 4d61c00d1391c..45815d13a0221 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SortGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/SortGenerator.java @@ -56,7 +56,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return CommandGenerator.expectSameColumns(previousColumns, columns); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/StatsGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/StatsGenerator.java index 30464c28a8464..1bb1420793337 
100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/StatsGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/StatsGenerator.java @@ -75,7 +75,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { // TODO validate columns return VALIDATION_OK; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/TimeSeriesStatsGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/TimeSeriesStatsGenerator.java index 35564a01844b8..0b46d83ba1d80 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/TimeSeriesStatsGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/TimeSeriesStatsGenerator.java @@ -106,7 +106,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { // TODO validate columns return VALIDATION_OK; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/WhereGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/WhereGenerator.java index 00a7f6cfe76d2..60e554ba69ef9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/WhereGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/pipe/WhereGenerator.java @@ -66,7 +66,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, 
+ boolean deterministic ) { return CommandGenerator.expectSameColumns(previousColumns, columns); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java index 13bfe92692f1b..49a15d1858a0a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/FromGenerator.java @@ -49,7 +49,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return VALIDATION_OK; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/SimpleFromGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/SimpleFromGenerator.java new file mode 100644 index 0000000000000..b86047204e396 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/SimpleFromGenerator.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.generator.command.source; + +import org.elasticsearch.xpack.esql.generator.Column; +import org.elasticsearch.xpack.esql.generator.QueryExecutor; +import org.elasticsearch.xpack.esql.generator.command.CommandGenerator; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.ESTestCase.randomIntBetween; + +public class SimpleFromGenerator implements CommandGenerator { + + public static final SimpleFromGenerator INSTANCE = new SimpleFromGenerator(); + + @Override + public CommandDescription generate( + List previousCommands, + List previousOutput, + QuerySchema schema, + QueryExecutor executor + ) { + List availableIndices = schema.baseIndices(); + String idx = availableIndices.get(randomIntBetween(0, availableIndices.size() - 1)); + + String query = "from " + idx; + return new CommandDescription("from", this, query, Map.of()); + } + + @Override + public ValidationResult validateOutput( + List previousCommands, + CommandDescription commandDescription, + List previousColumns, + List> previousOutput, + List columns, + List> output, + boolean deterministic + ) { + return VALIDATION_OK; + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/TimeSeriesGenerator.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/TimeSeriesGenerator.java index 0813860b6ab19..a9e64dcf3c204 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/TimeSeriesGenerator.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/generator/command/source/TimeSeriesGenerator.java @@ -49,7 +49,8 @@ public ValidationResult validateOutput( List previousColumns, List> previousOutput, List columns, - List> output + List> output, + boolean deterministic ) { return VALIDATION_OK; } diff --git 
a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/GenerativeCsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/GenerativeCsvTests.java new file mode 100644 index 0000000000000..d8e91ac1e7ae8 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/GenerativeCsvTests.java @@ -0,0 +1,785 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.logging.HeaderWarning; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.ConstantNullBlock; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.Driver; +import org.elasticsearch.compute.operator.DriverCompletionInfo; +import 
org.elasticsearch.compute.operator.DriverRunner; +import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; +import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; +import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.analysis.PreAnalyzer; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.core.type.InvalidMappedField; +import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; +import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService; +import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.generator.Column; +import 
org.elasticsearch.xpack.esql.generator.EsqlQueryGenerator; +import org.elasticsearch.xpack.esql.generator.LookupIdx; +import org.elasticsearch.xpack.esql.generator.LookupIdxColumn; +import org.elasticsearch.xpack.esql.generator.QueryExecuted; +import org.elasticsearch.xpack.esql.generator.QueryExecutor; +import org.elasticsearch.xpack.esql.generator.command.CommandGenerator; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.inference.InferenceService; +import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer; +import org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec; +import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; +import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.MergeExec; +import org.elasticsearch.xpack.esql.plan.physical.OutputExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; +import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import 
org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders; +import org.elasticsearch.xpack.esql.planner.mapper.Mapper; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.session.EsqlSession; +import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner; +import org.elasticsearch.xpack.esql.session.Result; +import org.elasticsearch.xpack.esql.stats.DisabledSearchStats; +import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.esql.CsvTestUtils.ActualResults; +import static org.elasticsearch.xpack.esql.CsvTestUtils.Type; +import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv; +import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP; +import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_POLICIES; +import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.availableDatasetsForEs; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; + +//@TestLogging(value = 
"org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug") +public class GenerativeCsvTests extends ESTestCase implements QueryExecutor { + + private static final Logger LOGGER = LogManager.getLogger(GenerativeCsvTests.class); + + public static final int ITERATIONS = 100; + public static final int MAX_DEPTH = 20; + + public static final Set ALLOWED_ERRORS = Set.of( + "Reference \\[.*\\] is ambiguous", + "Cannot use field \\[.*\\] due to ambiguities", + "cannot sort on .*", + "argument of \\[count.*\\] must", + "Cannot use field \\[.*\\] with unsupported type \\[.*\\]", + "Unbounded SORT not supported yet", + "The field names are too complex to process", // field_caps problem + "must be \\[any type except counter types\\]", // TODO refine the generation of count() + + // Awaiting fixes for query failure + "Unknown column \\[\\]", // https://github.com/elastic/elasticsearch/issues/121741, + // https://github.com/elastic/elasticsearch/issues/125866 + "Plan \\[ProjectExec\\[\\[.* optimized incorrectly due to missing references", + "The incoming YAML document exceeds the limit:", // still to investigate, but it seems to be specific to the test framework + "Data too large", // Circuit breaker exceptions eg. 
https://github.com/elastic/elasticsearch/issues/130072 + "optimized incorrectly due to missing references", // https://github.com/elastic/elasticsearch/issues/131509 + + // Awaiting fixes for correctness + "Expecting at most \\[.*\\] columns, got \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/129561 + ); + + public static final Set ALLOWED_ERROR_PATTERNS = ALLOWED_ERRORS.stream() + .map(x -> ".*" + x + ".*") + .map(x -> Pattern.compile(x, Pattern.DOTALL)) + .collect(Collectors.toSet()); + + private final Configuration configuration = EsqlTestUtils.configuration( + new QueryPragmas(Settings.builder().put("page_size", randomPageSize()).build()) + ); + private final EsqlFunctionRegistry functionRegistry = new EsqlFunctionRegistry(); + private final EsqlParser parser = new EsqlParser(); + private final Mapper mapper = new Mapper(); + private ThreadPool threadPool; + private Executor executor; + + @Before + public void setUp() throws Exception { + super.setUp(); + if (randomBoolean()) { + int numThreads = randomBoolean() ? 
1 : between(2, 16); + threadPool = new TestThreadPool( + "CsvTests", + new FixedExecutorBuilder(Settings.EMPTY, "esql_test", numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT) + ); + executor = threadPool.executor("esql_test"); + } else { + threadPool = new TestThreadPool(getTestName()); + executor = threadPool.executor(ThreadPool.Names.SEARCH); + } + HeaderWarning.setThreadContext(threadPool.getThreadContext()); + } + + @After + public void teardown() { + HeaderWarning.removeThreadContext(threadPool.getThreadContext()); + } + + @After + public void tearDown() throws Exception { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + super.tearDown(); + } + + private int randomPageSize() { + if (randomBoolean()) { + return between(1, 16); + } else { + return between(1, 16 * 1024); + } + } + + public GenerativeCsvTests() {} + + @Override + protected final boolean enableWarningsCheck() { + return false; // We use our own warnings check + } + + public Set excludedDatasets() { + // these have subfields, ranges and other things that are not properly supported in CSV + return Set.of( + "system_metrics", + "mv_text", + "multi_column_joinable", + "heights", + "firewall_logs", + "app_logs", + "employees_incompatible", + "employees_incompatible_types", + "decades", + "k8s", + "k8s-downsampled", + "ages", + "books" + ); + } + + public void test() throws Exception { + List indices = availableIndices().stream().filter(x -> excludedDatasets().contains(x) == false).toList(); + List lookupIndices = lookupIndices(); + List policies = availableEnrichPolicies(); + CommandGenerator.QuerySchema mappingInfo = new CommandGenerator.QuerySchema(indices, lookupIndices, policies); + + for (int i = 0; i < ITERATIONS; i++) { + var exec = new EsqlQueryGenerator.Executor() { + @Override + public void run(CommandGenerator generator, CommandGenerator.CommandDescription current) { + previousCommands.add(current); + final String command = current.commandString(); + + final 
QueryExecuted result = previousResult == null + ? execute(command, 0) + : execute(previousResult.query() + command, previousResult.depth()); + previousResult = result; + + logger.trace(() -> "query:" + result.query()); + + final boolean hasException = result.exception() != null; + if (hasException || checkResults(List.of(), generator, current, previousResult, result).success() == false) { + if (hasException) { + checkException(result); + } + continueExecuting = false; + currentSchema = List.of(); + } else { + continueExecuting = true; + currentSchema = result.outputSchema(); + } + } + + @Override + public List previousCommands() { + return previousCommands; + } + + @Override + public boolean continueExecuting() { + return continueExecuting; + } + + @Override + public List currentSchema() { + return currentSchema; + } + + boolean continueExecuting; + List currentSchema; + final List previousCommands = new ArrayList<>(); + QueryExecuted previousResult; + }; + EsqlQueryGenerator.generatePipeline( + MAX_DEPTH, + EsqlQueryGenerator.simplifiedSourceCommand(), + EsqlQueryGenerator.SIMPLIFIED_PIPE_COMMANDS, + mappingInfo, + exec, + false, + this + ); + } + } + + private static CommandGenerator.ValidationResult checkResults( + List previousCommands, + CommandGenerator commandGenerator, + CommandGenerator.CommandDescription commandDescription, + QueryExecuted previousResult, + QueryExecuted result + ) { + CommandGenerator.ValidationResult outputValidation = commandGenerator.validateOutput( + previousCommands, + commandDescription, + previousResult == null ? null : previousResult.outputSchema(), + previousResult == null ? 
null : previousResult.result(), + result.outputSchema(), + result.result(), + true + ); + if (outputValidation.success() == false) { + for (Pattern allowedError : ALLOWED_ERROR_PATTERNS) { + if (isAllowedError(outputValidation.errorMessage(), allowedError)) { + return outputValidation; + } + } + fail("query: " + result.query() + "\nerror: " + outputValidation.errorMessage()); + } + return outputValidation; + } + + private void checkException(QueryExecuted query) { + for (Pattern allowedError : ALLOWED_ERROR_PATTERNS) { + if (isAllowedError(query.exception().getMessage(), allowedError)) { + return; + } + } + fail("query: " + query.query() + "\nexception: " + query.exception().getMessage()); + } + + /** + * Long lines in exceptions can be split across several lines. When a newline is inserted, the end of the current line and the beginning + * of the new line are marked with a backslash {@code \}; the new line will also have whitespace before the backslash for aligning. + */ + private static final Pattern ERROR_MESSAGE_LINE_BREAK = Pattern.compile("\\\\\n\\s*\\\\"); + + private static boolean isAllowedError(String errorMessage, Pattern allowedPattern) { + String errorWithoutLineBreaks = ERROR_MESSAGE_LINE_BREAK.matcher(errorMessage).replaceAll(""); + return allowedPattern.matcher(errorWithoutLineBreaks).matches(); + } + + private List availableIndices() throws IOException { + return availableDatasetsForEs(false, false, false, false).stream() + .filter(x -> x.requiresInferenceEndpoint() == false) + .map(x -> x.indexName()) + .toList(); + } + + private List lookupIndices() { + List result = new ArrayList<>(); + // we don't have key info from the dataset loader, let's hardcode it for now + result.add(new LookupIdx("languages_lookup", List.of(new LookupIdxColumn("language_code", "integer")))); + result.add(new LookupIdx("message_types_lookup", List.of(new LookupIdxColumn("message", "keyword")))); + List multiColumnJoinableLookupKeys = List.of( + new 
LookupIdxColumn("id_int", "integer"), + new LookupIdxColumn("name_str", "keyword"), + new LookupIdxColumn("is_active_bool", "boolean"), + new LookupIdxColumn("ip_addr", "ip"), + new LookupIdxColumn("other1", "keyword"), + new LookupIdxColumn("other2", "integer") + ); + result.add(new LookupIdx("multi_column_joinable_lookup", multiColumnJoinableLookupKeys)); + return result; + } + + List availableEnrichPolicies() { + return ENRICH_POLICIES; + } + + private static IndexResolution loadIndexResolution(CsvTestsDataLoader.MultiIndexTestDataset datasets) { + var indexNames = datasets.datasets().stream().map(CsvTestsDataLoader.TestDataset::indexName); + Map indexModes = indexNames.collect(Collectors.toMap(x -> x, x -> IndexMode.STANDARD)); + List mappings = datasets.datasets() + .stream() + .map(ds -> new MappingPerIndex(ds.indexName(), createMappingForIndex(ds))) + .toList(); + var mergedMappings = mergeMappings(mappings); + return IndexResolution.valid( + new EsIndex(datasets.indexPattern(), mergedMappings.mapping, indexModes, mergedMappings.partiallyUnmappedFields) + ); + } + + private static Map createMappingForIndex(CsvTestsDataLoader.TestDataset dataset) { + var mapping = new TreeMap<>(loadMapping(dataset.mappingFileName())); + if (dataset.typeMapping() == null) { + return mapping; + } + for (var entry : dataset.typeMapping().entrySet()) { + if (mapping.containsKey(entry.getKey())) { + DataType dataType = DataType.fromTypeName(entry.getValue()); + EsField field = mapping.get(entry.getKey()); + EsField editedField = new EsField( + field.getName(), + dataType, + field.getProperties(), + field.isAggregatable(), + field.getTimeSeriesFieldType() + ); + mapping.put(entry.getKey(), editedField); + } + } + return mapping; + } + + record MappingPerIndex(String index, Map mapping) {} + + record MergedResult(Map mapping, Set partiallyUnmappedFields) {} + + private static MergedResult mergeMappings(List mappingsPerIndex) { + int numberOfIndices = mappingsPerIndex.size(); + Map> 
columnNamesToFieldByIndices = new HashMap<>(); + for (var mappingPerIndex : mappingsPerIndex) { + for (var entry : mappingPerIndex.mapping().entrySet()) { + String columnName = entry.getKey(); + EsField field = entry.getValue(); + columnNamesToFieldByIndices.computeIfAbsent(columnName, k -> new HashMap<>()).put(mappingPerIndex.index(), field); + } + } + + var partiallyUnmappedFields = columnNamesToFieldByIndices.entrySet() + .stream() + .filter(e -> e.getValue().size() < numberOfIndices) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + var mappings = columnNamesToFieldByIndices.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> mergeFields(e.getKey(), e.getValue()))); + return new MergedResult(mappings, partiallyUnmappedFields); + } + + private static EsField mergeFields(String index, Map columnNameToField) { + var indexFields = columnNameToField.values(); + if (indexFields.stream().distinct().count() > 1) { + var typesToIndices = new HashMap>(); + for (var typeToIndex : columnNameToField.entrySet()) { + typesToIndices.computeIfAbsent(typeToIndex.getValue().getDataType().typeName(), k -> new HashSet<>()) + .add(typeToIndex.getKey()); + } + return new InvalidMappedField(index, typesToIndices); + } else { + return indexFields.iterator().next(); + } + } + + private static EnrichResolution loadEnrichPolicies() { + EnrichResolution enrichResolution = new EnrichResolution(); + for (CsvTestsDataLoader.EnrichConfig policyConfig : CsvTestsDataLoader.ENRICH_POLICIES) { + EnrichPolicy policy = loadEnrichPolicyMapping(policyConfig.policyFileName()); + CsvTestsDataLoader.TestDataset sourceIndex = CSV_DATASET_MAP.get(policy.getIndices().get(0)); + // this could practically work, but it's wrong: + // EnrichPolicyResolution should contain the policy (system) index, not the source index + EsIndex esIndex = loadIndexResolution(CsvTestsDataLoader.MultiIndexTestDataset.of(sourceIndex.withTypeMapping(Map.of()))).get(); + var concreteIndices = 
Map.of(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, Iterables.get(esIndex.concreteIndices(), 0)); + enrichResolution.addResolvedPolicy( + policyConfig.policyName(), + Enrich.Mode.ANY, + new ResolvedEnrichPolicy( + policy.getMatchField(), + policy.getType(), + policy.getEnrichFields(), + concreteIndices, + esIndex.mapping() + ) + ); + } + return enrichResolution; + } + + private static EnrichPolicy loadEnrichPolicyMapping(String policyFileName) { + URL policyMapping = CsvTestsDataLoader.class.getResource("/" + policyFileName); + assertThat(policyMapping, is(notNullValue())); + try { + String fileContent = CsvTestsDataLoader.readTextFile(policyMapping); + return EnrichPolicy.fromXContent(JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, fileContent)); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot read resource " + policyFileName); + } + } + + private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiIndexTestDataset datasets) { + var indexResolution = loadIndexResolution(datasets); + var enrichPolicies = loadEnrichPolicies(); + var analyzer = new Analyzer( + new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies, emptyInferenceResolution()), + TEST_VERIFIER + ); + LogicalPlan plan = analyzer.analyze(parsed); + plan.setAnalyzed(); + LOGGER.debug("Analyzed plan:\n{}", plan); + return plan; + } + + private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) { + var preAnalysis = new PreAnalyzer().preAnalyze(parsed); + if (preAnalysis.indexPattern() == null) { + // If the data set doesn't matter we'll just grab one we know works. Employees is fine. 
+ return CsvTestsDataLoader.MultiIndexTestDataset.of(CSV_DATASET_MAP.get("employees")); + } + + String indexName = preAnalysis.indexPattern().indexPattern(); + List datasets = new ArrayList<>(); + if (indexName.endsWith("*")) { + String indexPrefix = indexName.substring(0, indexName.length() - 1); + for (var entry : CSV_DATASET_MAP.entrySet()) { + if (entry.getKey().startsWith(indexPrefix)) { + datasets.add(entry.getValue()); + } + } + } else { + for (String index : indexName.split(",")) { + var dataset = CSV_DATASET_MAP.get(index); + if (dataset == null) { + throw new IllegalArgumentException("unknown CSV dataset for table [" + index + "]"); + } + datasets.add(dataset); + } + } + if (datasets.isEmpty()) { + throw new IllegalArgumentException("unknown CSV dataset for table [" + indexName + "]"); + } + return new CsvTestsDataLoader.MultiIndexTestDataset(indexName, datasets); + } + + private static TestPhysicalOperationProviders testOperationProviders( + FoldContext foldCtx, + CsvTestsDataLoader.MultiIndexTestDataset datasets + ) throws Exception { + var indexPages = new ArrayList(); + for (CsvTestsDataLoader.TestDataset dataset : datasets.datasets()) { + var testData = loadPageFromCsv(GenerativeCsvTests.class.getResource("/data/" + dataset.dataFileName()), dataset.typeMapping()); + Set mappedFields = loadMapping(dataset.mappingFileName()).keySet(); + indexPages.add(new TestPhysicalOperationProviders.IndexPage(dataset.indexName(), testData.v1(), testData.v2(), mappedFields)); + } + return TestPhysicalOperationProviders.create(foldCtx, indexPages); + } + + @Override + public QueryExecuted execute(String command, int depth) { + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + try { + ActualResults result = executePlan(command, bigArrays); + + return new QueryExecuted(command, depth, toColumns(result.columnNames(), result.columnTypes()), data(result), null); + } catch (Exception e) { + 
return new QueryExecuted(command, depth, null, null, e); + } + } + + private List toColumns(List names, List types) { + List columns = new ArrayList<>(names.size()); + for (int i = 0; i < names.size(); i++) { + columns.add(new Column(names.get(i), types.get(i).name(), List.of())); + } + return columns; + } + + private List> data(ActualResults queryResult) { + List> result = new ArrayList<>(); + if (queryResult.pages().isEmpty()) { + return result; + } + int cols = queryResult.columnNames().size(); + for (Page page : queryResult.pages()) { + for (int p = 0; p < page.getPositionCount(); p++) { + List row = new ArrayList<>(cols); + for (int c = 0; c < cols; c++) { + row.add(value(page.getBlock(c), p)); + } + result.add(row); + } + } + return result; + } + + private Object value(Block block, int position) { + BytesRef spare = new BytesRef(); + int valueCount = block.getValueCount(position); + int idx = block.getFirstValueIndex(position); + if (valueCount == 0) { + return null; + } else if (valueCount == 1) { + return extract(block, idx, spare); + } else { + List values = new ArrayList<>(valueCount); + for (int i = 0; i < valueCount; i++) { + values.add(extract(block, idx + i, spare)); + } + return values; + } + } + + private static Object extract(Block block, int idx, BytesRef spare) { + return switch (block) { + case ConstantNullBlock n -> null; + case BooleanBlock b -> b.getBoolean(idx); + case BytesRefBlock b -> b.getBytesRef(idx, spare); + case IntBlock i -> i.getInt(idx); + case LongBlock l -> l.getLong(idx); + case FloatBlock f -> f.getFloat(idx); + case DoubleBlock d -> d.getDouble(idx); + default -> throw new IllegalArgumentException("Unsupported block type: " + block.getClass()); + }; + } + + private ActualResults executePlan(String query, BigArrays bigArrays) throws Exception { + LogicalPlan parsed = parser.createStatement(query, EsqlTestUtils.TEST_CFG); + var testDatasets = testDatasets(parsed); + LogicalPlan analyzed = analyzedPlan(parsed, testDatasets); + 
+ FoldContext foldCtx = FoldContext.small(); + EsqlSession session = new EsqlSession( + getTestName(), + configuration, + null, + null, + null, + new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldCtx)), + functionRegistry, + new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx)), + mapper, + TEST_VERIFIER, + new PlanTelemetry(functionRegistry), + null, + EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES + ); + TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets); + + PlainActionFuture listener = new PlainActionFuture<>(); + + session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> { + session.executeOptimizedPlan( + new EsqlQueryRequest(), + new EsqlExecutionInfo(randomBoolean()), + planRunner(bigArrays, foldCtx, physicalOperationProviders), + session.optimizedPlan(preOptimized), + listener.delegateFailureAndWrap( + // Wrap so we can capture the warnings in the calling thread + (next, result) -> next.onResponse( + new ActualResults( + result.schema().stream().map(Attribute::name).toList(), + result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), + result.schema().stream().map(Attribute::dataType).toList(), + result.pages(), + threadPool.getThreadContext().getResponseHeaders() + ) + ) + ) + ); + })); + + return listener.get(); + } + + private Settings randomNodeSettings() { + Settings.Builder builder = Settings.builder(); + if (randomBoolean()) { + builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofBytes(randomIntBetween(0, 4096))); + builder.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofBytes(randomIntBetween(0, 16 * 1024))); + } + return builder.build(); + } + + // Asserts that the serialization and deserialization of the plan creates an equivalent plan. 
+ private void opportunisticallyAssertPlanSerialization(PhysicalPlan plan) { + if (plan.anyMatch( + p -> p instanceof LocalSourceExec || p instanceof HashJoinExec || p instanceof ChangePointExec || p instanceof MergeExec + )) { + return; + } + SerializationTestUtils.assertSerialization(plan, configuration); + } + + PlanRunner planRunner(BigArrays bigArrays, FoldContext foldCtx, TestPhysicalOperationProviders physicalOperationProviders) { + return (physicalPlan, listener) -> executeSubPlan(bigArrays, foldCtx, physicalOperationProviders, physicalPlan, listener); + } + + void executeSubPlan( + BigArrays bigArrays, + FoldContext foldCtx, + TestPhysicalOperationProviders physicalOperationProviders, + PhysicalPlan physicalPlan, + ActionListener listener + ) { + // Keep in sync with ComputeService#execute + opportunisticallyAssertPlanSerialization(physicalPlan); + Tuple coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode( + physicalPlan, + configuration + ); + PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1(); + PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Coordinator plan\n" + coordinatorPlan); + LOGGER.trace("DataNode plan\n" + dataNodePlan); + } + + BlockFactory blockFactory = new BlockFactory( + bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), + bigArrays, + ByteSizeValue.ofBytes(randomLongBetween(1, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes() * 2)) + ); + ExchangeSourceHandler exchangeSource = new ExchangeSourceHandler(between(1, 64), executor); + ExchangeSinkHandler exchangeSink = new ExchangeSinkHandler(blockFactory, between(1, 64), threadPool::relativeTimeInMillis); + + LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner( + getTestName(), + "", + new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()), + bigArrays, + blockFactory, + randomNodeSettings(), + configuration, + 
exchangeSource::createExchangeSource, + () -> exchangeSink.createExchangeSink(() -> {}), + mock(EnrichLookupService.class), + mock(LookupFromIndexService.class), + mock(InferenceService.class), + physicalOperationProviders, + List.of() + ); + + List collectedPages = Collections.synchronizedList(new ArrayList<>()); + + // replace fragment inside the coordinator plan + List drivers = new ArrayList<>(); + LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan( + "final", + foldCtx, + new OutputExec(coordinatorPlan, collectedPages::add) + ); + drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName())); + if (dataNodePlan != null) { + var searchStats = new DisabledSearchStats(); + var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); + var flags = new EsqlFlags(true); + var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( + new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, flags, configuration, foldCtx, searchStats) + ); + + var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer); + exchangeSource.addRemoteSink( + exchangeSink::fetchPageAsync, + Randomness.get().nextBoolean(), + () -> {}, + randomIntBetween(1, 3), + ActionListener.noop().delegateResponse((l, e) -> { + throw new AssertionError("expected no failure", e); + }) + ); + LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan); + + drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName())); + Randomness.shuffle(drivers); + } + // Execute the drivers + DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) { + @Override + protected void start(Driver driver, ActionListener driverListener) { + Driver.start(threadPool.getThreadContext(), executor, driver, between(1, 1000), driverListener); + } + }; + listener = ActionListener.releaseAfter(listener, () -> 
Releasables.close(drivers)); + runner.runToCompletion( + drivers, + listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, DriverCompletionInfo.EMPTY, null)) + ); + } +}