Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.single_node;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeRestTest;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;
import org.junit.ClassRule;

/**
 * Variant of {@link GenerativeRestTest} that runs in time series mode: it reports
 * {@link #requiresTimeSeries()} as {@code true} and starts every generated query with the
 * time series source command.
 */
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class GenerativeMetricsIT extends GenerativeRestTest {
    @ClassRule
    public static ElasticsearchCluster cluster = Clusters.testCluster();

    /**
     * @return always {@code true}; this suite works on time series data
     */
    @Override
    protected boolean requiresTimeSeries() {
        return true;
    }

    /**
     * @return a randomly chosen time series source command generator
     */
    @Override
    protected CommandGenerator sourceCommand() {
        return EsqlQueryGenerator.timeSeriesSourceCommand();
    }

    /**
     * @return {@code true} only when the test cluster reports exactly one node
     */
    @Override
    protected boolean supportsSourceFieldMapping() {
        return cluster.getNumNodes() == 1;
    }

    /**
     * @return the HTTP addresses of the test cluster
     */
    @Override
    protected String getTestRestCluster() {
        final String httpAddresses = cluster.getHttpAddresses();
        return httpAddresses;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.KeepGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.LimitGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.LookupJoinGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.MetricsStatsGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.MvExpandGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.RenameGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.SortGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.StatsGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.WhereGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.FromGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.MetricGenerator;

import java.util.List;
import java.util.Set;
Expand All @@ -51,6 +53,11 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
*/
static List<CommandGenerator> SOURCE_COMMANDS = List.of(FromGenerator.INSTANCE);

/**
 * Source commands that can start a query against time series indices, e.g. TS
 */
static List<CommandGenerator> TIME_SERIES_SOURCE_COMMANDS = List.of(MetricGenerator.INSTANCE);

/**
* These are downstream commands, ie. that cannot appear as the first command in a query
*/
Expand All @@ -72,14 +79,42 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
WhereGenerator.INSTANCE
);

/**
 * Downstream (pipe) commands used when generating time series queries; one of these is picked at random by
 * {@link #randomMetricsPipeCommandGenerator()}. The list includes {@link MetricsStatsGenerator}, which emits
 * STATS commands bucketed on a time field.
 */
static List<CommandGenerator> TIME_SERIES_PIPE_COMMANDS = List.of(
ChangePointGenerator.INSTANCE,
DissectGenerator.INSTANCE,
DropGenerator.INSTANCE,
EnrichGenerator.INSTANCE,
EvalGenerator.INSTANCE,
ForkGenerator.INSTANCE,
GrokGenerator.INSTANCE,
KeepGenerator.INSTANCE,
LimitGenerator.INSTANCE,
LookupJoinGenerator.INSTANCE,
MetricsStatsGenerator.INSTANCE,
MvExpandGenerator.INSTANCE,
RenameGenerator.INSTANCE,
SortGenerator.INSTANCE,
StatsGenerator.INSTANCE,
WhereGenerator.INSTANCE
);

/**
 * Picks one of the {@code SOURCE_COMMANDS} generators at random.
 *
 * @return the generator for the first command of a query
 */
public static CommandGenerator sourceCommand() {
    final CommandGenerator chosen = randomFrom(SOURCE_COMMANDS);
    return chosen;
}

/**
 * Picks one of the {@code TIME_SERIES_SOURCE_COMMANDS} generators at random.
 *
 * @return the generator for the first command of a time series query
 */
public static CommandGenerator timeSeriesSourceCommand() {
    final CommandGenerator chosen = randomFrom(TIME_SERIES_SOURCE_COMMANDS);
    return chosen;
}

/**
 * Picks one of the {@code PIPE_COMMANDS} generators at random.
 *
 * @return a generator for a downstream (pipe) command
 */
public static CommandGenerator randomPipeCommandGenerator() {
    final CommandGenerator chosen = randomFrom(PIPE_COMMANDS);
    return chosen;
}

/**
 * Picks one of the {@code TIME_SERIES_PIPE_COMMANDS} generators at random.
 *
 * @return a generator for a downstream (pipe) command of a time series query
 */
public static CommandGenerator randomMetricsPipeCommandGenerator() {
    // todo better way
    final CommandGenerator chosen = randomFrom(TIME_SERIES_PIPE_COMMANDS);
    return chosen;
}

public interface Executor {
void run(CommandGenerator generator, CommandGenerator.CommandDescription current);

Expand All @@ -95,7 +130,8 @@ public static void generatePipeline(
final int depth,
CommandGenerator commandGenerator,
final CommandGenerator.QuerySchema schema,
Executor executor
Executor executor,
boolean isTimeSeries
) {
CommandGenerator.CommandDescription desc = commandGenerator.generate(List.of(), List.of(), schema);
executor.run(commandGenerator, desc);
Expand All @@ -107,7 +143,7 @@ public static void generatePipeline(
if (executor.currentSchema().isEmpty()) {
break;
}
commandGenerator = EsqlQueryGenerator.randomPipeCommandGenerator();
commandGenerator = isTimeSeries ? randomMetricsPipeCommandGenerator() : EsqlQueryGenerator.randomPipeCommandGenerator();
desc = commandGenerator.generate(executor.previousCommands(), executor.currentSchema(), schema);
if (desc == CommandGenerator.EMPTY_DESCRIPTION) {
continue;
Expand Down Expand Up @@ -217,6 +253,61 @@ public static boolean sortable(Column col) {
|| col.type.equals("version");
}

public static String metricsAgg(List<Column> previousOutput) {
String outerCommand = randomFrom("min", "max", "sum", "count", "avg");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an easy way to include a wider range of aggregation functions that apply on numerics? E.g. std_dev and top should be applicable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this list is probably the closest thing we'd have to that? Could exclude the commands that don't apply? (*_over_time functions, spatial, i/rate, and those 3 internal functions?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, let's give it a shot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviewing the list a little bit more closely, it looks like some of the names (as NamedWriteableRegistry.Entry) might be a little bit different vs the ES|QL names (CountDistinct vs count_distinct) so this might not work actually....

String innerCommand = switch (randomIntBetween(0, 3)) {
case 0 -> {
// input can be numerics + aggregate_metric_double
String numericPlusAggMetricFieldName = randomMetricsNumericField(previousOutput);
if (numericPlusAggMetricFieldName == null) {
yield null;
}
yield switch ((randomIntBetween(0, 3))) {
case 0 -> "max_over_time(" + numericPlusAggMetricFieldName + ")";
case 1 -> "min_over_time(" + numericPlusAggMetricFieldName + ")";
case 2 -> "sum_over_time(" + numericPlusAggMetricFieldName + ")";
default -> "avg_over_time(" + numericPlusAggMetricFieldName + ")";
};
}
case 1 -> {
// input can be a counter
String counterField = randomCounterField(previousOutput);
if (counterField == null) {
yield null;
}
yield "rate(" + counterField + ")";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or last|first_over_time, once supported.

}
case 2 -> {
// numerics except aggregate_metric_double
// TODO: move to case 0 when support for aggregate_metric_double is added to these functions
String numericFieldName = randomNumericField(previousOutput);
if (numericFieldName == null) {
yield null;
}
yield (randomBoolean() ? "first_over_time(" : "last_over_time(") + numericFieldName + ")";
}
default -> {
// TODO: add other types that count_over_time supports
String otherFieldName = randomBoolean() ? randomStringField(previousOutput) : randomNumericOrDateField(previousOutput);
if (otherFieldName == null) {
yield null;
}
if (randomBoolean()) {
yield "count_over_time(" + otherFieldName + ")";
} else {
yield "count_distinct_over_time(" + otherFieldName + ")";
// TODO: replace with the below
// yield "count_distinct_over_time(" + otherFieldName + (randomBoolean() ? ", " + randomNonNegativeInt() : "") + ")";
}
}
};
if (innerCommand == null) {
// TODO: figure out a default that maybe makes more sense than using a timestamp field
innerCommand = "count_over_time(" + randomDateField(previousOutput) + ")";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe * works, though not great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count(*) is the default in the normal Stats generator but it breaks when combined with time_series aggregations (bug 1), so until we fix that I don't think we can/should set that as the default here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we'd need to think of the semantics.. Let's file an issue for the bug btw, good catch :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will double-check the state of these bugs first before opening an issue; running a few queries I'm getting some completely new bugs so it's possible that some of them might be fixed.
Somewhat related, I think I saw that @not-napoleon was working on something similar to bug 3 (Output has changed)?

}
return outerCommand + "(" + innerCommand + ")";
}

public static String agg(List<Column> previousOutput) {
String name = randomNumericOrDateField(previousOutput);
if (name != null && randomBoolean()) {
Expand Down Expand Up @@ -251,6 +342,30 @@ public static String randomNumericField(List<Column> previousOutput) {
return randomName(previousOutput, Set.of("long", "integer", "double"));
}

/**
 * Picks, at random, the name of a column usable as input to the numeric time series functions.
 * <p>
 * A column qualifies when its type is {@code double}, {@code long}, {@code unsigned_long}, {@code integer} or
 * {@code aggregate_metric_double}, or when it is {@code unsupported} but its original types pass
 * {@code canBeCastedToAggregateMetricDouble}.
 *
 * @param previousOutput the columns produced by the previous command
 * @return the name of a qualifying column, or {@code null} when there is none
 */
public static String randomMetricsNumericField(List<Column> previousOutput) {
    final Set<String> metricTypes = Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double");
    final List<String> candidates = previousOutput.stream().filter(column -> {
        if (metricTypes.contains(column.type())) {
            return true;
        }
        return column.type().equals("unsupported") && canBeCastedToAggregateMetricDouble(column.originalTypes());
    }).map(column -> column.name()).toList();
    return candidates.isEmpty() ? null : candidates.get(randomIntBetween(0, candidates.size() - 1));
}

/**
 * Picks a random column name among those typed {@code counter_long}, {@code counter_double} or
 * {@code counter_integer}, delegating the selection to {@code randomName}.
 *
 * @param previousOutput the columns produced by the previous command
 * @return whatever {@code randomName} returns for the counter types
 */
public static String randomCounterField(List<Column> previousOutput) {
    final Set<String> counterTypes = Set.of("counter_long", "counter_double", "counter_integer");
    return randomName(previousOutput, counterTypes);
}

/**
 * Tells whether a set of original types contains {@code aggregate_metric_double} and nothing outside
 * {@code double}, {@code long}, {@code unsigned_long}, {@code integer} and {@code aggregate_metric_double}.
 *
 * @param types the original types of a column
 * @return {@code true} when both conditions hold
 */
private static boolean canBeCastedToAggregateMetricDouble(List<String> types) {
    if (types.contains("aggregate_metric_double") == false) {
        return false;
    }
    final Set<String> castableTypes = Set.of("double", "long", "unsigned_long", "integer", "aggregate_metric_double");
    return castableTypes.containsAll(types);
}

/**
 * Picks a random column name among those typed {@code text} or {@code keyword}, delegating the selection to
 * {@code randomName}.
 *
 * @param previousOutput the columns produced by the previous command
 * @return whatever {@code randomName} returns for the string types
 */
public static String randomStringField(List<Column> previousOutput) {
    final Set<String> stringTypes = Set.of("text", "keyword");
    return randomName(previousOutput, stringTypes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public void setup() throws IOException {

/**
 * Implemented by each concrete suite; the result is forwarded to {@code availableDatasetsForEs} when the list
 * of usable indices is computed.
 */
protected abstract boolean supportsSourceFieldMapping();

/**
 * Whether this suite runs in time series mode. The value is passed to the pipeline generator and to the
 * dataset lookup. Defaults to {@code false}; subclasses override it to opt in.
 */
protected boolean requiresTimeSeries() {
return false;
}

@AfterClass
public static void wipeTestData() throws IOException {
try {
Expand Down Expand Up @@ -142,10 +146,14 @@ public List<EsqlQueryGenerator.Column> currentSchema() {
final List<CommandGenerator.CommandDescription> previousCommands = new ArrayList<>();
EsqlQueryGenerator.QueryExecuted previousResult;
};
EsqlQueryGenerator.generatePipeline(MAX_DEPTH, EsqlQueryGenerator.sourceCommand(), mappingInfo, exec);
EsqlQueryGenerator.generatePipeline(MAX_DEPTH, sourceCommand(), mappingInfo, exec, requiresTimeSeries());
}
}

/**
 * Supplies the generator for the first command of each generated query. The default delegates to
 * {@link EsqlQueryGenerator#sourceCommand()}; subclasses may override it.
 */
protected CommandGenerator sourceCommand() {
    final CommandGenerator firstCommand = EsqlQueryGenerator.sourceCommand();
    return firstCommand;
}

private static CommandGenerator.ValidationResult checkResults(
List<CommandGenerator.CommandDescription> previousCommands,
CommandGenerator commandGenerator,
Expand Down Expand Up @@ -234,7 +242,7 @@ private static List<String> originalTypes(Map<String, ?> x) {
}

private List<String> availableIndices() throws IOException {
return availableDatasetsForEs(true, supportsSourceFieldMapping(), false).stream()
return availableDatasetsForEs(true, supportsSourceFieldMapping(), false, requiresTimeSeries()).stream()
.filter(x -> x.requiresInferenceEndpoint() == false)
.map(x -> x.indexName())
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public ValidationResult validateOutput(
}
};

EsqlQueryGenerator.generatePipeline(3, gen, schema, exec);
EsqlQueryGenerator.generatePipeline(3, gen, schema, exec, false);
if (exec.previousCommands().size() > 1) {
String previousCmd = exec.previousCommands()
.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe;

import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator.randomDateField;

/**
 * Generates a {@code stats} command for time series queries, in the form:
 * <pre>
 * | stats alias = some_aggregation(some_field)[, ...] by [optional_grouping_field, ]alias = bucket(time_field,1hour)
 * </pre>
 * Each aggregation is either a time series aggregation ({@link EsqlQueryGenerator#metricsAgg}) or a regular one
 * ({@link EsqlQueryGenerator#agg}). The time bucket is always present; the extra grouping field is optional.
 */
public class MetricsStatsGenerator implements CommandGenerator {

    public static final String STATS = "stats";
    public static final CommandGenerator INSTANCE = new MetricsStatsGenerator();

    /**
     * Builds the command text from the columns of the previous command.
     *
     * @param previousCommands the commands generated so far (not used by this generator)
     * @param previousOutput   the columns produced by the previous command
     * @param schema           the query schema (not used by this generator)
     * @return the command description, or {@code EMPTY_DESCRIPTION} when there is no usable non-null column or no
     *         date column to bucket on
     */
    @Override
    public CommandDescription generate(
        List<CommandDescription> previousCommands,
        List<EsqlQueryGenerator.Column> previousOutput,
        QuerySchema schema
    ) {
        // only usable columns whose type is not "null" can feed aggregations, groupings and the time bucket
        List<EsqlQueryGenerator.Column> nonNull = previousOutput.stream()
            .filter(EsqlQueryGenerator::fieldCanBeUsed)
            .filter(x -> x.type().equals("null") == false)
            .collect(Collectors.toList());
        if (nonNull.isEmpty()) {
            return EMPTY_DESCRIPTION;
        }
        String timestamp = randomDateField(nonNull);
        // if there's no timestamp field left, there's nothing to bucket on
        if (timestamp == null) {
            return EMPTY_DESCRIPTION;
        }

        StringBuilder cmd = new StringBuilder(" | stats ");

        // TODO: increase range max to 5
        int nStats = randomIntBetween(1, 2);
        for (int i = 0; i < nStats; i++) {
            String name = randomAlias(previousOutput);
            // generate the aggregation
            String expression = randomBoolean() ? EsqlQueryGenerator.metricsAgg(nonNull) : EsqlQueryGenerator.agg(nonNull);
            if (i > 0) {
                cmd.append(",");
            }
            cmd.append(" ").append(name).append(" = ").append(expression);
        }

        cmd.append(" by ");
        if (randomBoolean()) {
            var col = EsqlQueryGenerator.randomGroupableName(nonNull);
            if (col != null) {
                cmd.append(col).append(", ");
            }
        }
        // TODO: add alternative time buckets
        // the bucket alias goes through the same null-safe helper as the aggregation aliases, so the command can
        // never contain a literal "null = bucket(...)"
        cmd.append(randomAlias(previousOutput)).append(" = bucket(").append(timestamp).append(",1hour)");
        return new CommandDescription(STATS, this, cmd.toString(), Map.of());
    }

    /**
     * Picks the name an expression is assigned to: either a fresh random identifier or the name of an existing
     * column, falling back to a fresh identifier when no existing name is available.
     */
    private static String randomAlias(List<EsqlQueryGenerator.Column> previousOutput) {
        if (randomBoolean()) {
            return EsqlQueryGenerator.randomIdentifier();
        }
        String existing = EsqlQueryGenerator.randomName(previousOutput);
        return existing != null ? existing : EsqlQueryGenerator.randomIdentifier();
    }

    /**
     * Currently accepts any output.
     *
     * @return always {@code VALIDATION_OK}
     */
    @Override
    public ValidationResult validateOutput(
        List<CommandDescription> previousCommands,
        CommandDescription commandDescription,
        List<EsqlQueryGenerator.Column> previousColumns,
        List<List<Object>> previousOutput,
        List<EsqlQueryGenerator.Column> columns,
        List<List<Object>> output
    ) {
        // TODO validate columns
        return VALIDATION_OK;
    }
}
Loading