Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -43,6 +42,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -166,33 +166,10 @@ protected void setRequestCircuitBreakerLimit(ByteSizeValue limit) {
}

protected final EsqlQueryResponse run(String esqlCommands) {
return run(esqlCommands, randomPragmas());
return run(syncEsqlQueryRequest().query(esqlCommands).pragmas(randomPragmas()));
}

protected final EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas) {
return run(esqlCommands, pragmas, null);
}

protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
return run(esqlCommands, pragmas, filter, null);
}

protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter, Boolean allowPartialResults) {
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query(esqlCommands);
if (pragmas != null) {
request.pragmas(pragmas);
}
if (filter != null) {
request.filter(filter);
}
if (allowPartialResults != null) {
request.allowPartialResults(allowPartialResults);
}
return run(request);
}

protected EsqlQueryResponse run(EsqlQueryRequest request) {
public EsqlQueryResponse run(EsqlQueryRequest request) {
try {
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
} catch (ElasticsearchTimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
}

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
public EsqlQueryResponse run(EsqlQueryRequest request) {
final Client client;
if (randomBoolean()) {
client = client(randomFrom(clusterService().state().nodes().getCoordinatingOnlyNodes().values()).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
}

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
public EsqlQueryResponse run(EsqlQueryRequest request) {
if (randomBoolean()) {
request.allowPartialResults(randomBoolean());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -1916,7 +1917,7 @@ public void testScriptField() throws Exception {
pragmaSettings.put("data_partitioning", "doc");
pragmas = new QueryPragmas(pragmaSettings.build());
}
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) {
try (EsqlQueryResponse resp = run(syncEsqlQueryRequest().query("FROM test-script | SORT k1 | LIMIT " + numDocs).pragmas(pragmas))) {
List<Object> k1Column = Iterators.toList(resp.column(0));
assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList()));
List<Object> k2Column = Iterators.toList(resp.column(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,18 +50,18 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

@Override
protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
public EsqlQueryResponse run(EsqlQueryRequest original) {
EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest();
request.query(esqlCommands);
request.pragmas(pragmas);
request.query(original.query());
request.pragmas(original.pragmas());
// deliberately small timeout, to frequently trigger incomplete response
request.waitForCompletionTimeout(TimeValue.timeValueNanos(1));
request.keepOnCompletion(randomBoolean());
if (filter != null) {
request.filter(filter);
if (original.filter() != null) {
request.filter(original.filter());
}

var response = run(request);
var response = super.run(request);
if (response.asyncExecutionId().isPresent()) {
List<ColumnInfo> initialColumns = null;
List<Page> initialPages = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
public EsqlQueryResponse run(EsqlQueryRequest request) {
// IndexResolver currently ignores failures from field-caps responses and can resolve to a smaller set of concrete indices.
boolean singleIndex = request.query().startsWith("from test |");
if (singleIndex && randomIntBetween(0, 100) <= 20) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -116,7 +117,11 @@ public void testConcurrentQueries() throws Exception {
.put("task_concurrency", between(1, 2))
.put("exchange_concurrent_clients", between(1, 2));
}
try (var response = run("from test-* | stats count(user) by tags", new QueryPragmas(pragmas.build()))) {
try (
var response = run(
syncEsqlQueryRequest().query("from test-* | stats count(user) by tags").pragmas(new QueryPragmas(pragmas.build()))
)
) {
// do nothing
} catch (Exception | AssertionError e) {
logger.warn("Query failed with exception", e);
Expand Down Expand Up @@ -250,7 +255,7 @@ public void testLimitConcurrentShards() {
mockSearchService.setOnPutContext(r -> counter.onNewContext());
mockSearchService.setOnRemoveContext(r -> counter.onContextReleased());
}
run(q, pragmas).close();
run(syncEsqlQueryRequest().query(q).pragmas(pragmas)).close();
}
} finally {
for (SearchService searchService : searchServices) {
Expand All @@ -277,7 +282,7 @@ public void testCancelUnnecessaryRequests() {
connection.sendRequest(requestId, action, request, options);
});

var query = EsqlQueryRequest.syncEsqlQueryRequest();
var query = syncEsqlQueryRequest();
query.query("from test-* | LIMIT 1");
query.pragmas(new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_NODES_PER_CLUSTER.getKey(), 1).build()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.hasSize;

public class TimeBasedIndicesIT extends AbstractEsqlIntegTestCase {
Expand All @@ -39,15 +40,15 @@ public void testFilter() {
{
String query = "FROM test | limit 1000";
var filter = new RangeQueryBuilder("@timestamp").from(epoch - TimeValue.timeValueHours(3).millis()).to("now");
try (var resp = run(query, null, filter)) {
try (var resp = run(syncEsqlQueryRequest().query(query).filter(filter))) {
List<List<Object>> values = getValuesList(resp);
assertThat(values, hasSize(oldDocs));
}
}
{
String query = "FROM test | limit 1000";
var filter = new RangeQueryBuilder("@timestamp").from("now").to(epoch + TimeValue.timeValueHours(3).millis());
try (var resp = run(query, null, filter)) {
try (var resp = run(syncEsqlQueryRequest().query(query).filter(filter))) {
List<List<Object>> values = getValuesList(resp);
assertThat(values, hasSize(newDocs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class TimeSeriesIT extends AbstractEsqlIntegTestCase {

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
public EsqlQueryResponse run(EsqlQueryRequest request) {
assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
return super.run(request);
}
Expand Down
Loading