Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions x-pack/plugin/esql/qa/server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ dependencies {
// Requirement for some ESQL-specific utilities
implementation project(':x-pack:plugin:esql')
api project(xpackModule('esql:qa:testFixtures'))
// Testcontainers is used by the generative timeseries REST tests
// (currently only the commented-out Prometheus container setup).
implementation("org.testcontainers:testcontainers:${versions.testcontainer}")
// docker-java transports/core are runtime-only dependencies required by Testcontainers.
runtimeOnly "com.github.docker-java:docker-java-transport-zerodep:${versions.dockerJava}"
runtimeOnly "com.github.docker-java:docker-java-transport:${versions.dockerJava}"
runtimeOnly "com.github.docker-java:docker-java-core:${versions.dockerJava}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.multi_node;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.qa.rest.generative.GenerativeTimeseriesRestTest;
import org.junit.ClassRule;

/**
 * Runs the generative timeseries REST tests ({@link GenerativeTimeseriesRestTest})
 * against a multi-node test cluster.
 */
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class BasicGenerativeTimeseriesRestIT extends GenerativeTimeseriesRestTest {
    // Cluster shared by all test methods in this class.
    // NOTE(review): the "inference-service-test" plugin looks unrelated to
    // timeseries testing — confirm it is actually needed here.
    @ClassRule
    public static ElasticsearchCluster cluster = Clusters.testCluster(spec -> spec.plugin("inference-service-test"));

    @Override
    protected String getTestRestCluster() {
        // Address(es) the base class uses to talk to the cluster over HTTP.
        return cluster.getHttpAddresses();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.single_node;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datageneration.DataGeneratorSpecification;
import org.elasticsearch.datageneration.DocumentGenerator;
import org.elasticsearch.datageneration.FieldType;
import org.elasticsearch.datageneration.MappingGenerator;
import org.elasticsearch.datageneration.TemplateGenerator;
import org.elasticsearch.datageneration.fields.PredefinedField;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.qa.rest.generative.EsqlQueryGenerator;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;
import org.junit.Test;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Randomized generative test: indexes randomly generated timeseries documents and then
 * builds random ES|QL query chains against them, failing on the first query error.
 */
public class TimeseriesRandomizedTestCaseIT extends GenerativeIT {

    /** Maximum number of pipe commands appended to the base query. */
    private static final int MAX_DEPTH = 15;

    /** Timestamp field name — must match both the generated documents and the base query's WHERE clause. */
    private static final String TIMESTAMP_FIELD = "@timestamp";

    /** One day in milliseconds, used to generate timestamps within the query's time window. */
    private static final long ONE_DAY_MILLIS = 86_400_000L;

    @Test
    public void testRandomTimeseriesQueries() throws Exception {
        String indexName = "timeseries_test";
        generateAndIndexData(indexName, 200);

        // Ten random queries
        for (int i = 0; i < 10; i++) {
            runRandomQuery(indexName);
        }
    }

    /**
     * Starts from a time-bounded FROM query and repeatedly appends randomly generated
     * pipe commands, re-running the query after each step and tracking the evolving schema.
     *
     * @param indexName index to query
     * @throws AssertionError wrapping the cause if any generated query fails
     */
    private void runRandomQuery(String indexName) {
        CommandGenerator.QuerySchema mappingInfo = new CommandGenerator.QuerySchema(List.of(indexName), List.of(), List.of());
        List<CommandGenerator.CommandDescription> previousCommands = new ArrayList<>();

        // Base query to select recent data.
        // ES|QL date math is "NOW() - 1 day"; there is no SQL-style INTERVAL syntax.
        String baseQuery = "FROM " + indexName + " | WHERE " + TIMESTAMP_FIELD + " > NOW() - 1 day";
        EsqlQueryResponse response = run(baseQuery);
        List<EsqlQueryGenerator.Column> currentSchema = toGeneratorSchema(response.columns());

        String currentQuery = baseQuery;

        for (int j = 0; j < MAX_DEPTH; j++) {
            if (currentSchema.isEmpty()) {
                // No columns left to operate on (e.g. everything was dropped).
                break;
            }

            CommandGenerator commandGenerator = EsqlQueryGenerator.randomPipeCommandGenerator();
            CommandGenerator.CommandDescription desc = commandGenerator.generate(previousCommands, currentSchema, mappingInfo);

            if (desc == CommandGenerator.EMPTY_DESCRIPTION) {
                // Generator could not produce a command for this schema; try another one.
                continue;
            }

            String nextCommand = desc.commandString();
            currentQuery += nextCommand;

            try {
                response = run(currentQuery);
                currentSchema = toGeneratorSchema(response.columns());
                previousCommands.add(desc);
            } catch (Exception e) {
                // For now, we fail on any exception. More sophisticated error handling could be added here.
                throw new AssertionError("Query failed: " + currentQuery, e);
            }
        }
    }

    /**
     * Builds a data spec whose predefined {@code @timestamp} date field always falls
     * within the last 24 hours, so the base query's time filter actually matches documents.
     */
    private DataGeneratorSpecification createDataSpec() {
        return DataGeneratorSpecification.builder()
            .withMaxFieldCountPerLevel(10)
            .withMaxObjectDepth(2)
            .withPredefinedFields(List.of(new PredefinedField.WithGenerator(
                TIMESTAMP_FIELD,
                FieldType.DATE,
                Map.of("type", "date"),
                // ISO-8601 instant within the last day. Note: randomPositiveTimeValue()
                // would produce a duration string (e.g. "10s"), which is not a valid date.
                (ignored) -> Instant.now().minusMillis(ESTestCase.randomLongBetween(0, ONE_DAY_MILLIS - 1)).toString()
            )))
            .build();
    }

    /**
     * Creates the index and bulk-indexes {@code numDocs} randomly generated documents,
     * failing the test if any document is rejected.
     */
    private void generateAndIndexData(String indexName, int numDocs) throws Exception {
        var spec = createDataSpec();
        DocumentGenerator dataGenerator = new DocumentGenerator(spec);
        var templateGen = new TemplateGenerator(spec).generate();
        var mappingGen = new MappingGenerator(spec).generate(templateGen);
        // NOTE(review): the generated mapping (mappingGen) is never applied to the index —
        // dynamic mapping is used instead. Confirm whether the mapping should be set explicitly.
        createIndex(indexName, Settings.EMPTY);

        BulkRequest bulkRequest = new BulkRequest(indexName);
        for (int i = 0; i < numDocs; i++) {
            Map<String, Object> document = dataGenerator.generate(templateGen, mappingGen);
            bulkRequest.add(new IndexRequest().source(document, XContentType.JSON));
        }
        // Fail fast if any document was rejected, rather than silently testing a partial index.
        var bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
        refresh(indexName);
    }

    /** Converts ES|QL response column metadata into the query generator's schema representation. */
    private List<EsqlQueryGenerator.Column> toGeneratorSchema(List<ColumnInfoImpl> columnInfos) {
        if (columnInfos == null) {
            return List.of();
        }
        return columnInfos.stream()
            .map(ci -> new EsqlQueryGenerator.Column(ci.name(), ci.outputType()))
            .collect(Collectors.toList());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.rest.generative;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.esql.AssertWarnings;
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
import org.elasticsearch.xpack.esql.qa.rest.generative.command.CommandGenerator;
import org.junit.AfterClass;
import org.junit.Before;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Generative (random-query) REST test against a "timeseries" index: repeatedly builds
 * random ES|QL query chains, executes them over REST, and validates the output of each
 * step, tolerating only errors matching {@link #ALLOWED_ERROR_PATTERNS}.
 */
public abstract class GenerativeTimeseriesRestTest extends ESRestTestCase {

    /** Number of random query chains generated per run. */
    public static final int ITERATIONS = 200; // More test cases
    /** Maximum number of pipe commands appended to each source command. */
    public static final int MAX_DEPTH = 8; // Fewer statements

    private static final String PROMETHEUS_IMAGE = "prom/prometheus:latest";
    // NOTE(review): never assigned — the start code in setup() is commented out, so the
    // stop() call in wipeTestData() is currently dead code. Remove or re-enable.
    private static GenericContainer<?> prometheusContainer;

    /**
     * Error-message regex fragments that are considered acceptable and do not fail the
     * test. Currently empty; the commented entries are candidates carried over from the
     * generic generative tests.
     */
    public static final Set<String> ALLOWED_ERRORS = Set.of(
        // "Reference \\[.*\\] is ambiguous",
        // "Cannot use field \\[.*\\] due to ambiguities",
        // "cannot sort on .*",
        // "argument of \\[count.*\\] must",
        // "Cannot use field \\[.*\\] with unsupported type \\[.*_range\\]",
        // "Unbounded sort not supported yet",
        // "The field names are too complex to process",
        // "must be \\[any type except counter types\\]",
        // "Unknown column \\[<all-fields-projected>\\]",
        // "Plan \\[ProjectExec\\[\\[<no-fields>.* optimized incorrectly due to missing references",
        // "optimized incorrectly due to missing references",
        // "The incoming YAML document exceeds the limit:",
        // "Data too large",
        // "Expecting the following columns \\[.*\\], got",
        // "Expecting at most \\[.*\\] columns, got \\[.*\\]"
    );

    /** Compiled, wildcard-wrapped forms of {@link #ALLOWED_ERRORS}. */
    public static final Set<Pattern> ALLOWED_ERROR_PATTERNS = ALLOWED_ERRORS.stream()
        .map(x -> ".*" + x + ".*")
        .map(x -> Pattern.compile(x, Pattern.DOTALL))
        .collect(Collectors.toSet());

    /**
     * Lazily creates and populates the "timeseries" index (once per suite) before each test.
     */
    @Before
    public void setup() throws IOException {
        if (prometheusContainer == null) {
            // prometheusContainer = new GenericContainer<>(DockerImageName.parse(PROMETHEUS_IMAGE)).withExposedPorts(9090);
            // prometheusContainer.start();
        }

        if (indexExists("timeseries") == false) {
            TimeseriesDataGenerationHelper dataGen = new TimeseriesDataGenerationHelper();
            Request createIndex = new Request("PUT", "/timeseries");
            XContentBuilder builder = XContentFactory.jsonBuilder().map(dataGen.getMapping().raw());
            createIndex.setJsonEntity("{\"mappings\": " + Strings.toString(builder) + "}");
            client().performRequest(createIndex);

            // NOTE(review): 1000 individual _doc requests — consider a single _bulk
            // request to speed up suite setup considerably.
            for (int i = 0; i < 1000; i++) {
                Request indexRequest = new Request("POST", "/timeseries/_doc");
                indexRequest.setJsonEntity(dataGen.generateDocumentAsString());
                client().performRequest(indexRequest);
            }
            client().performRequest(new Request("POST", "/timeseries/_refresh"));
        }
    }

    /** Stops the (optional) Prometheus container and deletes all indices after the suite. */
    @AfterClass
    public static void wipeTestData() throws IOException {
        if (prometheusContainer != null) {
            prometheusContainer.stop();
            prometheusContainer = null;
        }
        try {
            adminClient().performRequest(new Request("DELETE", "/*"));
        } catch (ResponseException e) {
            // 404 just means there was nothing to delete.
            if (e.getResponse().getStatusLine().getStatusCode() != 404) {
                throw e;
            }
        }
    }

    /**
     * Main generative loop: for each iteration, generate a source command, then append up
     * to {@link #MAX_DEPTH} random pipe commands, validating the result of every step.
     */
    public void test() throws IOException {
        List<String> indices = List.of("timeseries");
        CommandGenerator.QuerySchema mappingInfo = new CommandGenerator.QuerySchema(indices, List.of(), List.of());
        EsqlQueryGenerator.QueryExecuted previousResult = null;
        for (int i = 0; i < ITERATIONS; i++) {
            List<CommandGenerator.CommandDescription> previousCommands = new ArrayList<>();
            CommandGenerator commandGenerator = EsqlQueryGenerator.sourceCommand();
            CommandGenerator.CommandDescription desc = commandGenerator.generate(List.of(), List.of(), mappingInfo);
            String command = desc.commandString();
            EsqlQueryGenerator.QueryExecuted result = execute(command, 0);
            if (result.exception() != null) {
                checkException(result);
                continue;
            }
            if (checkResults(List.of(), commandGenerator, desc, null, result).success() == false) {
                continue;
            }
            previousResult = result;
            previousCommands.add(desc);
            for (int j = 0; j < MAX_DEPTH; j++) {
                if (result.outputSchema().isEmpty()) {
                    break; // no columns left to operate on
                }
                commandGenerator = EsqlQueryGenerator.randomPipeCommandGenerator();
                desc = commandGenerator.generate(previousCommands, result.outputSchema(), mappingInfo);
                if (desc == CommandGenerator.EMPTY_DESCRIPTION) {
                    continue; // generator had nothing for this schema, try another command
                }
                command = desc.commandString();
                result = execute(result.query() + command, result.depth() + 1);
                if (result.exception() != null) {
                    checkException(result);
                    break;
                }
                if (checkResults(previousCommands, commandGenerator, desc, previousResult, result).success() == false) {
                    break;
                }
                previousCommands.add(desc);
                previousResult = result;
            }
        }
    }

    /**
     * Validates a step's output via the command generator; a validation failure only
     * fails the test if its message matches none of the allowed error patterns.
     */
    private static CommandGenerator.ValidationResult checkResults(
        List<CommandGenerator.CommandDescription> previousCommands,
        CommandGenerator commandGenerator,
        CommandGenerator.CommandDescription commandDescription,
        EsqlQueryGenerator.QueryExecuted previousResult,
        EsqlQueryGenerator.QueryExecuted result
    ) {
        CommandGenerator.ValidationResult outputValidation = commandGenerator.validateOutput(
            previousCommands,
            commandDescription,
            previousResult == null ? null : previousResult.outputSchema(),
            previousResult == null ? null : previousResult.result(),
            result.outputSchema(),
            result.result()
        );
        if (outputValidation.success() == false) {
            // Guard against a null message: Pattern.matcher(null) would throw an NPE
            // and mask the real validation failure.
            String errorMessage = outputValidation.errorMessage();
            if (errorMessage != null) {
                for (Pattern allowedError : ALLOWED_ERROR_PATTERNS) {
                    if (allowedError.matcher(errorMessage).matches()) {
                        return outputValidation;
                    }
                }
            }
            fail("query: " + result.query() + "\nerror: " + errorMessage);
        }
        return outputValidation;
    }

    /** Fails the test unless the query's exception message matches an allowed pattern. */
    private void checkException(EsqlQueryGenerator.QueryExecuted query) {
        // Throwable.getMessage() may legitimately be null (e.g. NPE); matching on null
        // would itself throw and hide the original exception.
        String message = query.exception().getMessage();
        if (message != null) {
            for (Pattern allowedError : ALLOWED_ERROR_PATTERNS) {
                if (allowedError.matcher(message).matches()) {
                    return;
                }
            }
        }
        fail("query: " + query.query() + "\nexception: " + message);
    }

    /**
     * Runs the query synchronously over REST and packages the outcome; any exception or
     * assertion error is captured in the returned record rather than propagated.
     */
    @SuppressWarnings("unchecked")
    private EsqlQueryGenerator.QueryExecuted execute(String command, int depth) {
        try {
            Map<String, Object> a = RestEsqlTestCase.runEsql(
                new RestEsqlTestCase.RequestObjectBuilder().query(command).build(),
                new AssertWarnings.AllowedRegexes(List.of(Pattern.compile(".*"))),
                RestEsqlTestCase.Mode.SYNC
            );
            List<EsqlQueryGenerator.Column> outputSchema = outputSchema(a);
            List<List<Object>> values = (List<List<Object>>) a.get("values");
            return new EsqlQueryGenerator.QueryExecuted(command, depth, outputSchema, values, null);
        } catch (Exception e) {
            return new EsqlQueryGenerator.QueryExecuted(command, depth, null, null, e);
        } catch (AssertionError ae) {
            // Response assertions inside runEsql throw AssertionError; wrap so callers
            // can treat it uniformly as a query failure.
            return new EsqlQueryGenerator.QueryExecuted(command, depth, null, null, new RuntimeException(ae.getMessage()));
        }
    }

    /** Extracts the column metadata from a raw ES|QL response map; null if absent. */
    @SuppressWarnings("unchecked")
    private List<EsqlQueryGenerator.Column> outputSchema(Map<String, Object> a) {
        List<Map<String, String>> cols = (List<Map<String, String>>) a.get("columns");
        if (cols == null) {
            return null;
        }
        return cols.stream().map(x -> new EsqlQueryGenerator.Column(x.get("name"), x.get("type"))).collect(Collectors.toList());
    }
}
Loading