From f97083d0f936e6e7352d5027253133c064aad12c Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 15 Jul 2025 11:51:57 -0600 Subject: [PATCH 1/2] Fix testStopQueryLocal - always use the same client (same node) (#131253) (cherry picked from commit bfab88c430a727de63188687a2c4655276a10212) # Conflicts: # muted-tests.yml # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java --- .../esql/action/CrossClusterAsyncQueryStopIT.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 9667b559ea6b0..d453cfac9d8bc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.core.Tuple; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -133,6 +134,9 @@ public void testStopQueryLocal() throws Exception { int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards"); populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_BLOCKING_MAPPING); + // Gets random node client but ensure it's the same node for all operations + Client client = cluster(LOCAL_CLUSTER).client(); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. @@ -145,7 +149,7 @@ public void testStopQueryLocal() throws Exception { int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); LOGGER.info("--> Launching async query"); final String asyncExecutionId = startAsyncQueryWithPragmas( - client(), + client, "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", includeCCSMetadata.v1(), Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) @@ -165,10 +169,10 @@ public void testStopQueryLocal() throws Exception { // run the stop query AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); LOGGER.info("Launching stop for {}", asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + ActionFuture stopAction = client.execute(EsqlAsyncStopAction.INSTANCE, stopRequest); // ensure stop operation is running assertBusy(() -> { - try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(client, asyncExecutionId)) { EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); LOGGER.info("Waiting for stop operation to start, current status: {}", executionInfo); assertNotNull(executionInfo); @@ -212,7 +216,7 @@ public void testStopQueryLocal() throws Exception { } } finally { SimplePauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client(), asyncExecutionId)); + assertAcked(deleteAsyncId(client, asyncExecutionId)); } } From 01fcc090e93f74ddbd0ec69fd3bc620c8c8ff1b7 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 15 Jul 2025 12:02:12 -0600 Subject: [PATCH 2/2] Fix merge --- .../xpack/esql/action/CrossClusterAsyncQueryStopIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index d453cfac9d8bc..b49f45273f337 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -159,8 +159,8 @@ public void testStopQueryLocal() throws Exception { assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); // wait until the remotes are done - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); - waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); + waitForCluster(client, REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client, REMOTE_CLUSTER_2, asyncExecutionId); /* at this point: * the query against remotes should be finished