diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 9667b559ea6b0..b49f45273f337 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.core.Tuple; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -133,6 +134,9 @@ public void testStopQueryLocal() throws Exception { int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards"); populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_BLOCKING_MAPPING); + // Gets random node client but ensure it's the same node for all operations + Client client = cluster(LOCAL_CLUSTER).client(); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. @@ -145,7 +149,7 @@ public void testStopQueryLocal() throws Exception { int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); LOGGER.info("--> Launching async query"); final String asyncExecutionId = startAsyncQueryWithPragmas( - client(), + client, "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", includeCCSMetadata.v1(), Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) @@ -155,8 +159,8 @@ public void testStopQueryLocal() throws Exception { assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); // wait until the remotes are done - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); - waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); + waitForCluster(client, REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client, REMOTE_CLUSTER_2, asyncExecutionId); /* at this point: * the query against remotes should be finished @@ -165,10 +169,10 @@ public void testStopQueryLocal() throws Exception { // run the stop query AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); LOGGER.info("Launching stop for {}", asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + ActionFuture stopAction = client.execute(EsqlAsyncStopAction.INSTANCE, stopRequest); // ensure stop operation is running assertBusy(() -> { - try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(client, asyncExecutionId)) { EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); LOGGER.info("Waiting for stop operation to start, current status: {}", executionInfo); assertNotNull(executionInfo); @@ -212,7 +216,7 @@ public void testStopQueryLocal() throws Exception { } } finally { SimplePauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client(), asyncExecutionId)); + assertAcked(deleteAsyncId(client, asyncExecutionId)); } }