Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.single_node;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeRestTest;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;
import org.junit.ClassRule;

@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class GenerativeMetricsIT extends GenerativeRestTest {
@ClassRule
public static ElasticsearchCluster cluster = Clusters.testCluster();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
protected boolean supportsSourceFieldMapping() {
return cluster.getNumNodes() == 1;
}

@Override
protected CommandGenerator sourceCommand() {
return EsqlQueryGenerator.timeSeriesSourceCommand();
}

@Override
protected boolean requiresTimeSeries() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.RenameGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.SortGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.StatsGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.TimeSeriesStatsGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.WhereGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.FromGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.TimeSeriesGenerator;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
Expand All @@ -51,6 +54,11 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
*/
static List<CommandGenerator> SOURCE_COMMANDS = List.of(FromGenerator.INSTANCE);

/**
* Commands at the beginning of queries that begin queries on time series indices, eg. TS
*/
static List<CommandGenerator> TIME_SERIES_SOURCE_COMMANDS = List.of(TimeSeriesGenerator.INSTANCE);

/**
* These are downstream commands, ie. that cannot appear as the first command in a query
*/
Expand All @@ -72,14 +80,27 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
WhereGenerator.INSTANCE
);

static List<CommandGenerator> TIME_SERIES_PIPE_COMMANDS = Stream.concat(
PIPE_COMMANDS.stream(),
Stream.of(TimeSeriesStatsGenerator.INSTANCE)
).toList();

public static CommandGenerator sourceCommand() {
return randomFrom(SOURCE_COMMANDS);
}

public static CommandGenerator timeSeriesSourceCommand() {
return randomFrom(TIME_SERIES_SOURCE_COMMANDS);
}

public static CommandGenerator randomPipeCommandGenerator() {
return randomFrom(PIPE_COMMANDS);
}

public static CommandGenerator randomMetricsPipeCommandGenerator() {
return randomFrom(TIME_SERIES_PIPE_COMMANDS);
}

public interface Executor {
void run(CommandGenerator generator, CommandGenerator.CommandDescription current);

Expand All @@ -95,8 +116,10 @@ public static void generatePipeline(
final int depth,
CommandGenerator commandGenerator,
final CommandGenerator.QuerySchema schema,
Executor executor
Executor executor,
boolean isTimeSeries
) {
boolean canGenerateTimeSeries = isTimeSeries;
CommandGenerator.CommandDescription desc = commandGenerator.generate(List.of(), List.of(), schema);
executor.run(commandGenerator, desc);
if (executor.continueExecuting() == false) {
Expand All @@ -107,7 +130,28 @@ public static void generatePipeline(
if (executor.currentSchema().isEmpty()) {
break;
}
commandGenerator = EsqlQueryGenerator.randomPipeCommandGenerator();
boolean commandAllowed = false;
while (commandAllowed == false) {
commandGenerator = isTimeSeries && canGenerateTimeSeries
? randomMetricsPipeCommandGenerator()
: randomPipeCommandGenerator();
if (isTimeSeries == false) {
commandAllowed = true;
} else {
if (commandGenerator.equals(TimeSeriesStatsGenerator.INSTANCE) || commandGenerator.equals(StatsGenerator.INSTANCE)) {
if (canGenerateTimeSeries) {
canGenerateTimeSeries = false;
commandAllowed = true;
}
} else if (commandGenerator.equals(RenameGenerator.INSTANCE)) {
// https://github.com/elastic/elasticsearch/issues/134994
canGenerateTimeSeries = false;
commandAllowed = true;
} else {
commandAllowed = true;
}
}
}
desc = commandGenerator.generate(executor.previousCommands(), executor.currentSchema(), schema);
if (desc == CommandGenerator.EMPTY_DESCRIPTION) {
continue;
Expand Down Expand Up @@ -217,6 +261,82 @@ public static boolean sortable(Column col) {
|| col.type.equals("version");
}

public static String metricsAgg(List<Column> previousOutput) {
String outerCommand = randomFrom("min", "max", "sum", "count", "avg");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an easy way to include a wider range of aggregation functions that apply on numerics? E.g. std_dev and top should be applicable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this list is probably the closest thing we'd have to that? Could exclude the commands that don't apply? (*_over_time functions, spatial, i/rate, and those 3 internal functions?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, let's give it a shot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviewing the list a little bit more closely, it looks like some of the names (as NamedWriteableRegistry.Entry) might be a little bit different vs the ES|QL names (CountDistinct vs count_distinct) so this might not work actually....

String innerCommand = switch (randomIntBetween(0, 3)) {
case 0 -> {
// input can be numerics + aggregate_metric_double
String numericPlusAggMetricFieldName = randomMetricsNumericField(previousOutput);
if (numericPlusAggMetricFieldName == null) {
yield null;
}
yield switch ((randomIntBetween(0, 6))) {
case 0 -> "max_over_time(" + numericPlusAggMetricFieldName + ")";
case 1 -> "min_over_time(" + numericPlusAggMetricFieldName + ")";
case 2 -> "sum_over_time(" + numericPlusAggMetricFieldName + ")";
case 3 -> {
if (outerCommand.equals("sum") || outerCommand.equals("avg")) {
yield null;
}
yield "present_over_time(" + numericPlusAggMetricFieldName + ")";
}
case 4 -> {
if (outerCommand.equals("sum") || outerCommand.equals("avg")) {
yield null;
}
yield "absent_over_time(" + numericPlusAggMetricFieldName + ")";
}
case 5 -> "count_over_time(" + numericPlusAggMetricFieldName + ")";
default -> "avg_over_time(" + numericPlusAggMetricFieldName + ")";
};
}
case 1 -> {
// input can be a counter
String counterField = randomCounterField(previousOutput);
if (counterField == null) {
yield null;
}
yield "rate(" + counterField + ")";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or last|first_over_time, once supported.

}
case 2 -> {
// numerics except aggregate_metric_double
// TODO: add to case 0 when support for aggregate_metric_double is added to these functions
// TODO: add to case 1 when support for counters is added
String numericFieldName = randomNumericField(previousOutput);
if (numericFieldName == null) {
yield null;
}
if (previousOutput.stream()
.noneMatch(
column -> column.name.equals("@timestamp") && (column.type.equals("date_nanos") || column.type.equals("datetime"))
)) {
// first_over_time and last_over_time require @timestamp to be available and be either datetime or date_nanos
yield null;
}
yield (randomBoolean() ? "first_over_time(" : "last_over_time(") + numericFieldName + ")";
}
default -> {
// TODO: add other types that count_over_time supports
String otherFieldName = randomBoolean() ? randomStringField(previousOutput) : randomNumericOrDateField(previousOutput);
if (otherFieldName == null) {
yield null;
}
if (randomBoolean()) {
yield "count_over_time(" + otherFieldName + ")";
} else {
yield "count_distinct_over_time(" + otherFieldName + ")";
// TODO: replace with the below
// yield "count_distinct_over_time(" + otherFieldName + (randomBoolean() ? ", " + randomNonNegativeInt() : "") + ")";
}
}
};
if (innerCommand == null) {
// TODO: figure out a default that maybe makes more sense than using a timestamp field
innerCommand = "count_over_time(" + randomDateField(previousOutput) + ")";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe * works, though not great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count(*) is the default in the normal Stats generator but it breaks when combined with time_series aggregations (bug 1), so until we fix that I don't think we can/should set that as the default here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we'd need to think of the semantics.. Let's file an issue for the bug btw, good catch :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will double-check the state of these bugs first before opening an issue; running a few queries I'm getting some completely new bugs so it's possible that some of them might be fixed.
Somewhat related, I think I saw that @not-napoleon was working on something similar to bug 3 (Output has changed)?

}
return outerCommand + "(" + innerCommand + ")";
}

public static String agg(List<Column> previousOutput) {
String name = randomNumericOrDateField(previousOutput);
if (name != null && randomBoolean()) {
Expand Down Expand Up @@ -251,6 +371,30 @@ public static String randomNumericField(List<Column> previousOutput) {
return randomName(previousOutput, Set.of("long", "integer", "double"));
}

public static String randomMetricsNumericField(List<Column> previousOutput) {
Set<String> allowedTypes = Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double");
List<String> items = previousOutput.stream()
.filter(
x -> allowedTypes.contains(x.type())
|| (x.type().equals("unsupported") && canBeCastedToAggregateMetricDouble(x.originalTypes()))
)
.map(Column::name)
.toList();
if (items.isEmpty()) {
return null;
}
return items.get(randomIntBetween(0, items.size() - 1));
}

public static String randomCounterField(List<Column> previousOutput) {
return randomName(previousOutput, Set.of("counter_long", "counter_double", "counter_integer"));
}

private static boolean canBeCastedToAggregateMetricDouble(List<String> types) {
return types.contains("aggregate_metric_double")
&& Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double").containsAll(types);
}

public static String randomStringField(List<Column> previousOutput) {
return randomName(previousOutput, Set.of("text", "keyword"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ public abstract class GenerativeRestTest extends ESRestTestCase {
"optimized incorrectly due to missing references", // https://github.com/elastic/elasticsearch/issues/131509

// Awaiting fixes for correctness
"Expecting at most \\[.*\\] columns, got \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/129561
"Expecting at most \\[.*\\] columns, got \\[.*\\]", // https://github.com/elastic/elasticsearch/issues/129561

// TS-command tests
"time-series .* the first aggregation .* is not allowed",
"count_star .* can't be used with TS command",
"time_series aggregate.* can only be used with the TS command",
"Invalid call to dataType on an unresolved object \\?LASTOVERTIME", // https://github.com/elastic/elasticsearch/issues/134791
"class org.elasticsearch.compute.data..*Block cannot be cast to class org.elasticsearch.compute.data..*Block", // https://github.com/elastic/elasticsearch/issues/134793
"Output has changed from \\[.*\\] to \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/134794
);

public static final Set<Pattern> ALLOWED_ERROR_PATTERNS = ALLOWED_ERRORS.stream()
Expand All @@ -79,6 +87,10 @@ public void setup() throws IOException {

protected abstract boolean supportsSourceFieldMapping();

protected boolean requiresTimeSeries() {
return false;
}

@AfterClass
public static void wipeTestData() throws IOException {
try {
Expand Down Expand Up @@ -142,10 +154,14 @@ public List<EsqlQueryGenerator.Column> currentSchema() {
final List<CommandGenerator.CommandDescription> previousCommands = new ArrayList<>();
EsqlQueryGenerator.QueryExecuted previousResult;
};
EsqlQueryGenerator.generatePipeline(MAX_DEPTH, EsqlQueryGenerator.sourceCommand(), mappingInfo, exec);
EsqlQueryGenerator.generatePipeline(MAX_DEPTH, sourceCommand(), mappingInfo, exec, requiresTimeSeries());
}
}

protected CommandGenerator sourceCommand() {
return EsqlQueryGenerator.sourceCommand();
}

private static CommandGenerator.ValidationResult checkResults(
List<CommandGenerator.CommandDescription> previousCommands,
CommandGenerator commandGenerator,
Expand Down Expand Up @@ -234,7 +250,7 @@ private static List<String> originalTypes(Map<String, ?> x) {
}

private List<String> availableIndices() throws IOException {
return availableDatasetsForEs(true, supportsSourceFieldMapping(), false).stream()
return availableDatasetsForEs(true, supportsSourceFieldMapping(), false, requiresTimeSeries()).stream()
.filter(x -> x.requiresInferenceEndpoint() == false)
.map(x -> x.indexName())
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public ValidationResult validateOutput(
}
};

EsqlQueryGenerator.generatePipeline(3, gen, schema, exec);
EsqlQueryGenerator.generatePipeline(3, gen, schema, exec, false);
if (exec.previousCommands().size() > 1) {
String previousCmd = exec.previousCommands()
.stream()
Expand Down
Loading