-
Notifications
You must be signed in to change notification settings - Fork 25.6k
[ES|QL] Add a TS variation of GenerativeIT #133804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
82296eb
f150261
820f4b7
3a8c89b
5235b80
d0e24d1
044e127
44bdb2b
d9afeb1
41f884b
f775ea8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.esql.qa.single_node; | ||
|
|
||
| import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | ||
|
|
||
| import org.elasticsearch.test.TestClustersThreadFilter; | ||
| import org.elasticsearch.test.cluster.ElasticsearchCluster; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeRestTest; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator; | ||
| import org.junit.ClassRule; | ||
|
|
||
| /** | ||
|  * Variant of {@link GenerativeRestTest} that generates its queries from a time series source command. | ||
|  */ | ||
| @ThreadLeakFilters(filters = TestClustersThreadFilter.class) | ||
| public class GenerativeMetricsIT extends GenerativeRestTest { | ||
|
|
||
|     /** Cluster the tests run against, registered as a JUnit class rule. */ | ||
|     @ClassRule | ||
|     public static ElasticsearchCluster cluster = Clusters.testCluster(); | ||
|
|
||
|     /** Marks this suite as one that requires time series. */ | ||
|     @Override | ||
|     protected boolean requiresTimeSeries() { | ||
|         return true; | ||
|     } | ||
|
|
||
|     /** Starts every generated query with a randomly chosen time series source command. */ | ||
|     @Override | ||
|     protected CommandGenerator sourceCommand() { | ||
|         final CommandGenerator source = EsqlQueryGenerator.timeSeriesSourceCommand(); | ||
|         return source; | ||
|     } | ||
|
|
||
|     /** Returns the HTTP addresses of the cluster under test. */ | ||
|     @Override | ||
|     protected String getTestRestCluster() { | ||
|         final String addresses = cluster.getHttpAddresses(); | ||
|         return addresses; | ||
|     } | ||
|
|
||
|     /** Reports source field mapping support only when the cluster has exactly one node. */ | ||
|     @Override | ||
|     protected boolean supportsSourceFieldMapping() { | ||
|         final var nodeCount = cluster.getNumNodes(); | ||
|         return nodeCount == 1; | ||
|     } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,12 +23,15 @@ | |
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.RenameGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.SortGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.StatsGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.TimeSeriesStatsGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.WhereGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.FromGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.TimeSeriesGenerator; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; | ||
| import static org.elasticsearch.test.ESTestCase.randomBoolean; | ||
|
|
@@ -51,6 +54,11 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema, | |
| */ | ||
| static List<CommandGenerator> SOURCE_COMMANDS = List.of(FromGenerator.INSTANCE); | ||
|
|
||
| /** | ||
| * Source commands that start queries on time series indices, e.g. TS | ||
| */ | ||
| static List<CommandGenerator> TIME_SERIES_SOURCE_COMMANDS = List.of(TimeSeriesGenerator.INSTANCE); | ||
|
|
||
| /** | ||
| * These are downstream commands, ie. that cannot appear as the first command in a query | ||
| */ | ||
|
|
@@ -72,14 +80,27 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema, | |
| WhereGenerator.INSTANCE | ||
| ); | ||
|
|
||
| /** | ||
| * Pipe commands available to time series queries: every entry of PIPE_COMMANDS followed by the time series stats generator | ||
| */ | ||
| static List<CommandGenerator> TIME_SERIES_PIPE_COMMANDS = Stream.concat( | ||
| PIPE_COMMANDS.stream(), | ||
| Stream.of(TimeSeriesStatsGenerator.INSTANCE) | ||
| ).toList(); | ||
|
|
||
| /** | ||
|  * Picks one of the {@code SOURCE_COMMANDS} generators at random. | ||
|  * | ||
|  * @return the randomly selected source command generator | ||
|  */ | ||
| public static CommandGenerator sourceCommand() { | ||
|     final List<CommandGenerator> candidates = SOURCE_COMMANDS; | ||
|     return randomFrom(candidates); | ||
| } | ||
|
|
||
| /** | ||
|  * Picks one of the {@code TIME_SERIES_SOURCE_COMMANDS} generators at random. | ||
|  * | ||
|  * @return the randomly selected time series source command generator | ||
|  */ | ||
| public static CommandGenerator timeSeriesSourceCommand() { | ||
|     final List<CommandGenerator> candidates = TIME_SERIES_SOURCE_COMMANDS; | ||
|     return randomFrom(candidates); | ||
| } | ||
|
|
||
| /** | ||
|  * Picks one of the {@code PIPE_COMMANDS} generators at random. | ||
|  * | ||
|  * @return the randomly selected pipe command generator | ||
|  */ | ||
| public static CommandGenerator randomPipeCommandGenerator() { | ||
|     final List<CommandGenerator> candidates = PIPE_COMMANDS; | ||
|     return randomFrom(candidates); | ||
| } | ||
|
|
||
| /** | ||
|  * Picks one of the {@code TIME_SERIES_PIPE_COMMANDS} generators at random. | ||
|  * | ||
|  * @return the randomly selected pipe command generator for a time series query | ||
|  */ | ||
| public static CommandGenerator randomMetricsPipeCommandGenerator() { | ||
|     final List<CommandGenerator> candidates = TIME_SERIES_PIPE_COMMANDS; | ||
|     return randomFrom(candidates); | ||
| } | ||
|
|
||
| public interface Executor { | ||
| void run(CommandGenerator generator, CommandGenerator.CommandDescription current); | ||
|
|
||
|
|
@@ -95,7 +116,8 @@ public static void generatePipeline( | |
| final int depth, | ||
| CommandGenerator commandGenerator, | ||
| final CommandGenerator.QuerySchema schema, | ||
| Executor executor | ||
| Executor executor, | ||
| boolean isTimeSeries | ||
| ) { | ||
| CommandGenerator.CommandDescription desc = commandGenerator.generate(List.of(), List.of(), schema); | ||
| executor.run(commandGenerator, desc); | ||
|
|
@@ -107,7 +129,7 @@ public static void generatePipeline( | |
| if (executor.currentSchema().isEmpty()) { | ||
| break; | ||
| } | ||
| commandGenerator = EsqlQueryGenerator.randomPipeCommandGenerator(); | ||
| commandGenerator = isTimeSeries ? randomMetricsPipeCommandGenerator() : randomPipeCommandGenerator(); | ||
| desc = commandGenerator.generate(executor.previousCommands(), executor.currentSchema(), schema); | ||
| if (desc == CommandGenerator.EMPTY_DESCRIPTION) { | ||
| continue; | ||
|
|
@@ -217,6 +239,82 @@ public static boolean sortable(Column col) { | |
| || col.type.equals("version"); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a random time series aggregation expression of the form {@code outer(inner(field))}. | ||
| * The outer function is one of min, max, sum, count or avg. The inner function is a randomly chosen | ||
| * {@code *_over_time} function or {@code rate}, applied to a column of a matching type picked from {@code previousOutput}. | ||
| * When no inner function can be built, {@code count_over_time} on a random date field is used instead. | ||
| * | ||
| * @param previousOutput the columns the aggregation can be applied to | ||
| * @return the generated aggregation expression | ||
| */ | ||
| public static String metricsAgg(List<Column> previousOutput) { | ||
| String outerCommand = randomFrom("min", "max", "sum", "count", "avg"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there an easy way to include a wider range of aggregation functions that apply on numerics? E.g. std_dev and top should be applicable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this list is probably the closest thing we'd have to that? Could exclude the commands that don't apply? (*_over_time functions, spatial, i/rate, and those 3 internal functions?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, let's give it a shot. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reviewing the list a little bit more closely, it looks like some of the names (as |
||
| // every branch yields null when it cannot build a valid inner function; the fallback after the switch handles that | ||
| String innerCommand = switch (randomIntBetween(0, 3)) { | ||
| case 0 -> { | ||
| // input can be numerics + aggregate_metric_double | ||
| String numericPlusAggMetricFieldName = randomMetricsNumericField(previousOutput); | ||
| if (numericPlusAggMetricFieldName == null) { | ||
| yield null; | ||
| } | ||
| yield switch ((randomIntBetween(0, 6))) { | ||
| case 0 -> "max_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
| case 1 -> "min_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
| case 2 -> "sum_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
| case 3 -> { | ||
| if (outerCommand.equals("sum") || outerCommand.equals("avg")) { | ||
| yield null; | ||
| } | ||
| yield "present_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
limotova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| case 4 -> { | ||
| if (outerCommand.equals("sum") || outerCommand.equals("avg")) { | ||
| yield null; | ||
| } | ||
| yield "absent_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
| } | ||
| case 5 -> "count_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
| default -> "avg_over_time(" + numericPlusAggMetricFieldName + ")"; | ||
limotova marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }; | ||
| } | ||
| case 1 -> { | ||
| // input can be a counter | ||
| String counterField = randomCounterField(previousOutput); | ||
| if (counterField == null) { | ||
| yield null; | ||
| } | ||
| yield "rate(" + counterField + ")"; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or last|first_over_time, once supported. |
||
| } | ||
| case 2 -> { | ||
| // numerics except aggregate_metric_double | ||
| // TODO: add to case 0 when support for aggregate_metric_double is added to these functions | ||
| // TODO: add to case 1 when support for counters is added | ||
| String numericFieldName = randomNumericField(previousOutput); | ||
| if (numericFieldName == null) { | ||
| yield null; | ||
| } | ||
| if (previousOutput.stream() | ||
| .noneMatch( | ||
| column -> column.name.equals("@timestamp") && (column.type.equals("date_nanos") || column.type.equals("datetime")) | ||
| )) { | ||
| // first_over_time and last_over_time require @timestamp to be available and be either datetime or date_nanos | ||
| yield null; | ||
| } | ||
| yield (randomBoolean() ? "first_over_time(" : "last_over_time(") + numericFieldName + ")"; | ||
| } | ||
| default -> { | ||
| // TODO: add other types that count_over_time supports | ||
| String otherFieldName = randomBoolean() ? randomStringField(previousOutput) : randomNumericOrDateField(previousOutput); | ||
| if (otherFieldName == null) { | ||
| yield null; | ||
| } | ||
| if (randomBoolean()) { | ||
| yield "count_over_time(" + otherFieldName + ")"; | ||
| } else { | ||
| yield "count_distinct_over_time(" + otherFieldName + ")"; | ||
| // TODO: replace with the below | ||
| // yield "count_distinct_over_time(" + otherFieldName + (randomBoolean() ? ", " + randomNonNegativeInt() : "") + ")"; | ||
| } | ||
| } | ||
| }; | ||
| if (innerCommand == null) { | ||
| // TODO: figure out a default that maybe makes more sense than using a timestamp field | ||
| // NOTE(review): presumably randomDateField returns null when previousOutput has no date column - confirm callers guarantee one | ||
| innerCommand = "count_over_time(" + randomDateField(previousOutput) + ")"; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah we'd need to think of the semantics.. Let's file an issue for the bug btw, good catch :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I will double-check the state of these bugs first before opening an issue; running a few queries I'm getting some completely new bugs so it's possible that some of them might be fixed. |
||
| } | ||
| return outerCommand + "(" + innerCommand + ")"; | ||
| } | ||
|
|
||
| public static String agg(List<Column> previousOutput) { | ||
| String name = randomNumericOrDateField(previousOutput); | ||
| if (name != null && randomBoolean()) { | ||
|
|
@@ -251,6 +349,30 @@ public static String randomNumericField(List<Column> previousOutput) { | |
| return randomName(previousOutput, Set.of("long", "integer", "double")); | ||
| } | ||
|
|
||
| /** | ||
|  * Picks a random column whose type is double, long, unsigned_long, integer or aggregate_metric_double, or an | ||
|  * "unsupported" column whose original types can all be cast to aggregate_metric_double. | ||
|  * | ||
|  * @param previousOutput the columns to choose from | ||
|  * @return the name of the chosen column, or {@code null} when no column qualifies | ||
|  */ | ||
| public static String randomMetricsNumericField(List<Column> previousOutput) { | ||
|     final Set<String> numericTypes = Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double"); | ||
|     final List<String> candidates = previousOutput.stream().filter(column -> { | ||
|         if (numericTypes.contains(column.type())) { | ||
|             return true; | ||
|         } | ||
|         // mixed-type columns surface as "unsupported"; keep them only when every original type is castable | ||
|         return column.type().equals("unsupported") && canBeCastedToAggregateMetricDouble(column.originalTypes()); | ||
|     }).map(Column::name).toList(); | ||
|     return candidates.isEmpty() ? null : candidates.get(randomIntBetween(0, candidates.size() - 1)); | ||
| } | ||
|
|
||
| /** | ||
|  * Picks a random column of type counter_long, counter_double or counter_integer. | ||
|  * | ||
|  * @param previousOutput the columns to choose from | ||
|  * @return whatever {@code randomName} selects among the counter-typed columns | ||
|  */ | ||
| public static String randomCounterField(List<Column> previousOutput) { | ||
|     final Set<String> counterTypes = Set.of("counter_long", "counter_double", "counter_integer"); | ||
|     return randomName(previousOutput, counterTypes); | ||
| } | ||
|
|
||
| private static boolean canBeCastedToAggregateMetricDouble(List<String> types) { | ||
| return types.contains("aggregate_metric_double") | ||
| && Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double").containsAll(types); | ||
| } | ||
|
|
||
| /** | ||
|  * Picks a random column of type text or keyword. | ||
|  * | ||
|  * @param previousOutput the columns to choose from | ||
|  * @return whatever {@code randomName} selects among the string-typed columns | ||
|  */ | ||
| public static String randomStringField(List<Column> previousOutput) { | ||
|     final Set<String> stringTypes = Set.of("text", "keyword"); | ||
|     return randomName(previousOutput, stringTypes); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe; | ||
|
|
||
| import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator; | ||
| import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.test.ESTestCase.randomBoolean; | ||
| import static org.elasticsearch.test.ESTestCase.randomIntBetween; | ||
| import static org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator.randomDateField; | ||
|
|
||
| /** | ||
|  * Generates a STATS command for time series queries, in the form of | ||
|  * {@code STATS some_aggregation(some_field) by optional_grouping_field, non_optional = bucket(time_field, 1hour)}. | ||
|  * Each aggregation is either a time series aggregation such as {@code agg1(agg2_over_time(some_field))} or a regular one. | ||
|  */ | ||
| public class TimeSeriesStatsGenerator implements CommandGenerator { | ||
|
|
||
|     public static final String STATS = "stats"; | ||
|     public static final CommandGenerator INSTANCE = new TimeSeriesStatsGenerator(); | ||
|
|
||
|     /** | ||
|      * Builds the STATS command from the columns produced by the previous command. | ||
|      * | ||
|      * @return the command description, or {@code EMPTY_DESCRIPTION} when there is no usable column or no date field to bucket on | ||
|      */ | ||
|     @Override | ||
|     public CommandDescription generate( | ||
|         List<CommandDescription> previousCommands, | ||
|         List<EsqlQueryGenerator.Column> previousOutput, | ||
|         QuerySchema schema | ||
|     ) { | ||
|         // aggregations and groupings only draw from usable columns whose type is not "null" | ||
|         final List<EsqlQueryGenerator.Column> usable = previousOutput.stream() | ||
|             .filter(column -> EsqlQueryGenerator.fieldCanBeUsed(column) && column.type().equals("null") == false) | ||
|             .collect(Collectors.toList()); | ||
|         if (usable.isEmpty()) { | ||
|             return EMPTY_DESCRIPTION; | ||
|         } | ||
|         final String bucketField = randomDateField(usable); | ||
|         if (bucketField == null) { | ||
|             // no timestamp field left, so there is nothing to bucket on | ||
|             return EMPTY_DESCRIPTION; | ||
|         } | ||
|
|
||
|         final StringBuilder command = new StringBuilder(" | stats "); | ||
|
|
||
|         // There is a variable number of aggregations per pipe | ||
|         // TODO: increase range max to 5 | ||
|         final int aggregationCount = randomIntBetween(1, 2); | ||
|         String separator = " "; | ||
|         for (int remaining = aggregationCount; remaining > 0; remaining--) { | ||
|             // the alias is drawn before the expression, keeping the sequence of random choices stable | ||
|             final String alias = aggregationAlias(previousOutput); | ||
|             final String aggregation = randomBoolean() ? EsqlQueryGenerator.metricsAgg(usable) : EsqlQueryGenerator.agg(usable); | ||
|             command.append(separator).append(alias).append(" = ").append(aggregation); | ||
|             separator = ", "; | ||
|         } | ||
|
|
||
|         command.append(" by "); | ||
|         if (randomBoolean()) { | ||
|             var extraGrouping = EsqlQueryGenerator.randomGroupableName(usable); | ||
|             if (extraGrouping != null) { | ||
|                 command.append(extraGrouping).append(", "); | ||
|             } | ||
|         } | ||
|         // TODO: add alternative time buckets | ||
|         // TODO: give the bucket name a half chance of being EsqlQueryGenerator.randomName(previousOutput) | ||
|         // once https://github.com/elastic/elasticsearch/issues/134796 is fixed | ||
|         command.append(EsqlQueryGenerator.randomIdentifier()).append(" = bucket(").append(bucketField).append(",1hour)"); | ||
|         return new CommandDescription(STATS, this, command.toString(), Map.of()); | ||
|     } | ||
|
|
||
|     /** | ||
|      * Chooses the output name of one aggregation: a fresh random identifier, or the name of an existing column | ||
|      * (falling back to a fresh identifier when no existing name is available). | ||
|      */ | ||
|     private static String aggregationAlias(List<EsqlQueryGenerator.Column> previousOutput) { | ||
|         if (randomBoolean()) { | ||
|             return EsqlQueryGenerator.randomIdentifier(); | ||
|         } | ||
|         final String existing = EsqlQueryGenerator.randomName(previousOutput); | ||
|         return existing != null ? existing : EsqlQueryGenerator.randomIdentifier(); | ||
|     } | ||
|
|
||
|     /** Accepts any output; column validation is not implemented yet. */ | ||
|     @Override | ||
|     public ValidationResult validateOutput( | ||
|         List<CommandDescription> previousCommands, | ||
|         CommandDescription commandDescription, | ||
|         List<EsqlQueryGenerator.Column> previousColumns, | ||
|         List<List<Object>> previousOutput, | ||
|         List<EsqlQueryGenerator.Column> columns, | ||
|         List<List<Object>> output | ||
|     ) { | ||
|         // TODO validate columns | ||
|         return VALIDATION_OK; | ||
|     } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.