Skip to content

Commit 8d6f446

Browse files
authored
[ES|QL] Add a TS variation of GenerativeIT (#133804)
This commit adds a variation of the GenerativeRestTest in ES|QL that queries indices marked as time series indices and runs time series aggregations on them (amongst all the other commands already supported in the Generative tests), with a lot of limitations.
1 parent e1bbaf1 commit 8d6f446

File tree

7 files changed

+398
-10
lines changed

7 files changed

+398
-10
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.single_node;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.test.TestClustersThreadFilter;
13+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
14+
import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator;
15+
import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeRestTest;
16+
import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;
17+
import org.junit.ClassRule;
18+
19+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
20+
public class GenerativeMetricsIT extends GenerativeRestTest {
21+
@ClassRule
22+
public static ElasticsearchCluster cluster = Clusters.testCluster();
23+
24+
@Override
25+
protected String getTestRestCluster() {
26+
return cluster.getHttpAddresses();
27+
}
28+
29+
@Override
30+
protected boolean supportsSourceFieldMapping() {
31+
return cluster.getNumNodes() == 1;
32+
}
33+
34+
@Override
35+
protected CommandGenerator sourceCommand() {
36+
return EsqlQueryGenerator.timeSeriesSourceCommand();
37+
}
38+
39+
@Override
40+
protected boolean requiresTimeSeries() {
41+
return true;
42+
}
43+
}

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/EsqlQueryGenerator.java

Lines changed: 146 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,12 +23,15 @@
2323
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.RenameGenerator;
2424
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.SortGenerator;
2525
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.StatsGenerator;
26+
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.TimeSeriesStatsGenerator;
2627
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.WhereGenerator;
2728
import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.FromGenerator;
29+
import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.TimeSeriesGenerator;
2830

2931
import java.util.List;
3032
import java.util.Set;
3133
import java.util.stream.Collectors;
34+
import java.util.stream.Stream;
3235

3336
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
3437
import static org.elasticsearch.test.ESTestCase.randomBoolean;
@@ -51,6 +54,11 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
5154
*/
5255
static List<CommandGenerator> SOURCE_COMMANDS = List.of(FromGenerator.INSTANCE);
5356

57+
/**
58+
* Source commands that start queries on time series indices, e.g. TS
59+
*/
60+
static List<CommandGenerator> TIME_SERIES_SOURCE_COMMANDS = List.of(TimeSeriesGenerator.INSTANCE);
61+
5462
/**
5563
* These are downstream commands, i.e. commands that cannot appear as the first command in a query
5664
*/
@@ -72,14 +80,27 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
7280
WhereGenerator.INSTANCE
7381
);
7482

83+
static List<CommandGenerator> TIME_SERIES_PIPE_COMMANDS = Stream.concat(
84+
PIPE_COMMANDS.stream(),
85+
Stream.of(TimeSeriesStatsGenerator.INSTANCE)
86+
).toList();
87+
7588
public static CommandGenerator sourceCommand() {
7689
return randomFrom(SOURCE_COMMANDS);
7790
}
7891

92+
public static CommandGenerator timeSeriesSourceCommand() {
93+
return randomFrom(TIME_SERIES_SOURCE_COMMANDS);
94+
}
95+
7996
public static CommandGenerator randomPipeCommandGenerator() {
8097
return randomFrom(PIPE_COMMANDS);
8198
}
8299

100+
public static CommandGenerator randomMetricsPipeCommandGenerator() {
101+
return randomFrom(TIME_SERIES_PIPE_COMMANDS);
102+
}
103+
83104
public interface Executor {
84105
void run(CommandGenerator generator, CommandGenerator.CommandDescription current);
85106

@@ -95,8 +116,10 @@ public static void generatePipeline(
95116
final int depth,
96117
CommandGenerator commandGenerator,
97118
final CommandGenerator.QuerySchema schema,
98-
Executor executor
119+
Executor executor,
120+
boolean isTimeSeries
99121
) {
122+
boolean canGenerateTimeSeries = isTimeSeries;
100123
CommandGenerator.CommandDescription desc = commandGenerator.generate(List.of(), List.of(), schema);
101124
executor.run(commandGenerator, desc);
102125
if (executor.continueExecuting() == false) {
@@ -107,7 +130,28 @@ public static void generatePipeline(
107130
if (executor.currentSchema().isEmpty()) {
108131
break;
109132
}
110-
commandGenerator = EsqlQueryGenerator.randomPipeCommandGenerator();
133+
boolean commandAllowed = false;
134+
while (commandAllowed == false) {
135+
commandGenerator = isTimeSeries && canGenerateTimeSeries
136+
? randomMetricsPipeCommandGenerator()
137+
: randomPipeCommandGenerator();
138+
if (isTimeSeries == false) {
139+
commandAllowed = true;
140+
} else {
141+
if (commandGenerator.equals(TimeSeriesStatsGenerator.INSTANCE) || commandGenerator.equals(StatsGenerator.INSTANCE)) {
142+
if (canGenerateTimeSeries) {
143+
canGenerateTimeSeries = false;
144+
commandAllowed = true;
145+
}
146+
} else if (commandGenerator.equals(RenameGenerator.INSTANCE)) {
147+
// https://github.com/elastic/elasticsearch/issues/134994
148+
canGenerateTimeSeries = false;
149+
commandAllowed = true;
150+
} else {
151+
commandAllowed = true;
152+
}
153+
}
154+
}
111155
desc = commandGenerator.generate(executor.previousCommands(), executor.currentSchema(), schema);
112156
if (desc == CommandGenerator.EMPTY_DESCRIPTION) {
113157
continue;
@@ -217,6 +261,82 @@ public static boolean sortable(Column col) {
217261
|| col.type.equals("version");
218262
}
219263

264+
public static String metricsAgg(List<Column> previousOutput) {
265+
String outerCommand = randomFrom("min", "max", "sum", "count", "avg");
266+
String innerCommand = switch (randomIntBetween(0, 3)) {
267+
case 0 -> {
268+
// input can be numerics + aggregate_metric_double
269+
String numericPlusAggMetricFieldName = randomMetricsNumericField(previousOutput);
270+
if (numericPlusAggMetricFieldName == null) {
271+
yield null;
272+
}
273+
yield switch ((randomIntBetween(0, 6))) {
274+
case 0 -> "max_over_time(" + numericPlusAggMetricFieldName + ")";
275+
case 1 -> "min_over_time(" + numericPlusAggMetricFieldName + ")";
276+
case 2 -> "sum_over_time(" + numericPlusAggMetricFieldName + ")";
277+
case 3 -> {
278+
if (outerCommand.equals("sum") || outerCommand.equals("avg")) {
279+
yield null;
280+
}
281+
yield "present_over_time(" + numericPlusAggMetricFieldName + ")";
282+
}
283+
case 4 -> {
284+
if (outerCommand.equals("sum") || outerCommand.equals("avg")) {
285+
yield null;
286+
}
287+
yield "absent_over_time(" + numericPlusAggMetricFieldName + ")";
288+
}
289+
case 5 -> "count_over_time(" + numericPlusAggMetricFieldName + ")";
290+
default -> "avg_over_time(" + numericPlusAggMetricFieldName + ")";
291+
};
292+
}
293+
case 1 -> {
294+
// input can be a counter
295+
String counterField = randomCounterField(previousOutput);
296+
if (counterField == null) {
297+
yield null;
298+
}
299+
yield "rate(" + counterField + ")";
300+
}
301+
case 2 -> {
302+
// numerics except aggregate_metric_double
303+
// TODO: add to case 0 when support for aggregate_metric_double is added to these functions
304+
// TODO: add to case 1 when support for counters is added
305+
String numericFieldName = randomNumericField(previousOutput);
306+
if (numericFieldName == null) {
307+
yield null;
308+
}
309+
if (previousOutput.stream()
310+
.noneMatch(
311+
column -> column.name.equals("@timestamp") && (column.type.equals("date_nanos") || column.type.equals("datetime"))
312+
)) {
313+
// first_over_time and last_over_time require @timestamp to be available and be either datetime or date_nanos
314+
yield null;
315+
}
316+
yield (randomBoolean() ? "first_over_time(" : "last_over_time(") + numericFieldName + ")";
317+
}
318+
default -> {
319+
// TODO: add other types that count_over_time supports
320+
String otherFieldName = randomBoolean() ? randomStringField(previousOutput) : randomNumericOrDateField(previousOutput);
321+
if (otherFieldName == null) {
322+
yield null;
323+
}
324+
if (randomBoolean()) {
325+
yield "count_over_time(" + otherFieldName + ")";
326+
} else {
327+
yield "count_distinct_over_time(" + otherFieldName + ")";
328+
// TODO: replace with the below
329+
// yield "count_distinct_over_time(" + otherFieldName + (randomBoolean() ? ", " + randomNonNegativeInt() : "") + ")";
330+
}
331+
}
332+
};
333+
if (innerCommand == null) {
334+
// TODO: figure out a default that maybe makes more sense than using a timestamp field
335+
innerCommand = "count_over_time(" + randomDateField(previousOutput) + ")";
336+
}
337+
return outerCommand + "(" + innerCommand + ")";
338+
}
339+
220340
public static String agg(List<Column> previousOutput) {
221341
String name = randomNumericOrDateField(previousOutput);
222342
if (name != null && randomBoolean()) {
@@ -251,6 +371,30 @@ public static String randomNumericField(List<Column> previousOutput) {
251371
return randomName(previousOutput, Set.of("long", "integer", "double"));
252372
}
253373

374+
public static String randomMetricsNumericField(List<Column> previousOutput) {
375+
Set<String> allowedTypes = Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double");
376+
List<String> items = previousOutput.stream()
377+
.filter(
378+
x -> allowedTypes.contains(x.type())
379+
|| (x.type().equals("unsupported") && canBeCastedToAggregateMetricDouble(x.originalTypes()))
380+
)
381+
.map(Column::name)
382+
.toList();
383+
if (items.isEmpty()) {
384+
return null;
385+
}
386+
return items.get(randomIntBetween(0, items.size() - 1));
387+
}
388+
389+
public static String randomCounterField(List<Column> previousOutput) {
390+
return randomName(previousOutput, Set.of("counter_long", "counter_double", "counter_integer"));
391+
}
392+
393+
private static boolean canBeCastedToAggregateMetricDouble(List<String> types) {
394+
return types.contains("aggregate_metric_double")
395+
&& Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double").containsAll(types);
396+
}
397+
254398
public static String randomStringField(List<Column> previousOutput) {
255399
return randomName(previousOutput, Set.of("text", "keyword"));
256400
}

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java

Lines changed: 19 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,15 @@ public abstract class GenerativeRestTest extends ESRestTestCase {
6262
"optimized incorrectly due to missing references", // https://github.com/elastic/elasticsearch/issues/131509
6363

6464
// Awaiting fixes for correctness
65-
"Expecting at most \\[.*\\] columns, got \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/129561
65+
"Expecting at most \\[.*\\] columns, got \\[.*\\]", // https://github.com/elastic/elasticsearch/issues/129561
66+
67+
// TS-command tests
68+
"time-series .* the first aggregation .* is not allowed",
69+
"count_star .* can't be used with TS command",
70+
"time_series aggregate.* can only be used with the TS command",
71+
"Invalid call to dataType on an unresolved object \\?LASTOVERTIME", // https://github.com/elastic/elasticsearch/issues/134791
72+
"class org.elasticsearch.compute.data..*Block cannot be cast to class org.elasticsearch.compute.data..*Block", // https://github.com/elastic/elasticsearch/issues/134793
73+
"Output has changed from \\[.*\\] to \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/134794
6674
);
6775

6876
public static final Set<Pattern> ALLOWED_ERROR_PATTERNS = ALLOWED_ERRORS.stream()
@@ -79,6 +87,10 @@ public void setup() throws IOException {
7987

8088
protected abstract boolean supportsSourceFieldMapping();
8189

90+
protected boolean requiresTimeSeries() {
91+
return false;
92+
}
93+
8294
@AfterClass
8395
public static void wipeTestData() throws IOException {
8496
try {
@@ -142,10 +154,14 @@ public List<EsqlQueryGenerator.Column> currentSchema() {
142154
final List<CommandGenerator.CommandDescription> previousCommands = new ArrayList<>();
143155
EsqlQueryGenerator.QueryExecuted previousResult;
144156
};
145-
EsqlQueryGenerator.generatePipeline(MAX_DEPTH, EsqlQueryGenerator.sourceCommand(), mappingInfo, exec);
157+
EsqlQueryGenerator.generatePipeline(MAX_DEPTH, sourceCommand(), mappingInfo, exec, requiresTimeSeries());
146158
}
147159
}
148160

161+
protected CommandGenerator sourceCommand() {
162+
return EsqlQueryGenerator.sourceCommand();
163+
}
164+
149165
private static CommandGenerator.ValidationResult checkResults(
150166
List<CommandGenerator.CommandDescription> previousCommands,
151167
CommandGenerator commandGenerator,
@@ -234,7 +250,7 @@ private static List<String> originalTypes(Map<String, ?> x) {
234250
}
235251

236252
private List<String> availableIndices() throws IOException {
237-
return availableDatasetsForEs(true, supportsSourceFieldMapping(), false).stream()
253+
return availableDatasetsForEs(true, supportsSourceFieldMapping(), false, requiresTimeSeries()).stream()
238254
.filter(x -> x.requiresInferenceEndpoint() == false)
239255
.map(x -> x.indexName())
240256
.toList();

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/command/pipe/ForkGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ public ValidationResult validateOutput(
114114
}
115115
};
116116

117-
EsqlQueryGenerator.generatePipeline(3, gen, schema, exec);
117+
EsqlQueryGenerator.generatePipeline(3, gen, schema, exec, false);
118118
if (exec.previousCommands().size() > 1) {
119119
String previousCmd = exec.previousCommands()
120120
.stream()

0 commit comments

Comments
 (0)