Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.Build;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
Expand Down Expand Up @@ -133,6 +134,9 @@ public void testStopQueryLocal() throws Exception {
int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards");
populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_BLOCKING_MAPPING);

// Gets random node client but ensure it's the same node for all operations
Client client = cluster(LOCAL_CLUSTER).client();

Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
boolean responseExpectMeta = includeCCSMetadata.v2();
// By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes.
Expand All @@ -145,7 +149,7 @@ public void testStopQueryLocal() throws Exception {
int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax();
LOGGER.info("--> Launching async query");
final String asyncExecutionId = startAsyncQueryWithPragmas(
client(),
client,
"FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1",
includeCCSMetadata.v1(),
Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1))
Expand All @@ -155,8 +159,8 @@ public void testStopQueryLocal() throws Exception {
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
waitForCluster(client, REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client, REMOTE_CLUSTER_2, asyncExecutionId);

/* at this point:
* the query against remotes should be finished
Expand All @@ -165,10 +169,10 @@ public void testStopQueryLocal() throws Exception {
// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
LOGGER.info("Launching stop for {}", asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
ActionFuture<EsqlQueryResponse> stopAction = client.execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client, asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
LOGGER.info("Waiting for stop operation to start, current status: {}", executionInfo);
assertNotNull(executionInfo);
Expand Down Expand Up @@ -212,7 +216,7 @@ public void testStopQueryLocal() throws Exception {
}
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
assertAcked(deleteAsyncId(client, asyncExecutionId));
}
}

Expand Down