From c93b1fee9fc28b974123a49a13d6503c4b038d71 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 14 Jul 2025 18:04:50 -0600 Subject: [PATCH] Fix testStopQueryLocal - always use the same client (same node) --- muted-tests.yml | 3 --- .../action/CrossClusterAsyncQueryStopIT.java | 16 ++++++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 5487810e2fe1e..baad6d1ede9d0 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -517,9 +517,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT method: testRowStatsProjectGroupByInt issue: https://github.com/elastic/elasticsearch/issues/131024 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/121672 - class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT method: test {lookup-join.MvJoinKeyOnFromAfterStats ASYNC} issue: https://github.com/elastic/elasticsearch/issues/131148 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 049d7fc4cf94b..6c765888cd66c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.core.Tuple; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -133,6 +134,9 @@ public void testStopQueryLocal() throws Exception { int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards"); populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_BLOCKING_MAPPING); + // Gets random node client but ensure it's the same node for all operations + Client client = cluster(LOCAL_CLUSTER).client(); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. @@ -145,7 +149,7 @@ public void testStopQueryLocal() throws Exception { int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); LOGGER.info("--> Launching async query"); final String asyncExecutionId = startAsyncQueryWithPragmas( - client(), + client, "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", includeCCSMetadata.v1(), Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) @@ -157,9 +161,9 @@ public void testStopQueryLocal() throws Exception { // wait until the remotes are done LOGGER.info("--> Waiting for remotes", asyncExecutionId); - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client, REMOTE_CLUSTER_1, asyncExecutionId); LOGGER.info("--> Remote 1 done", asyncExecutionId); - waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); + waitForCluster(client, REMOTE_CLUSTER_2, asyncExecutionId); LOGGER.info("--> Remote 2 done", asyncExecutionId); /* at this point: @@ -169,10 +173,10 @@ public void testStopQueryLocal() throws Exception { // run the stop query AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); LOGGER.info("Launching stop for {}", asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + ActionFuture stopAction = client.execute(EsqlAsyncStopAction.INSTANCE, stopRequest); // ensure stop operation is running assertBusy(() -> { - try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(client, asyncExecutionId)) { EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); LOGGER.info("--> Waiting for stop operation to start, current status: {}", executionInfo); assertNotNull(executionInfo); @@ -216,7 +220,7 @@ public void testStopQueryLocal() throws Exception { } } finally { SimplePauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client(), asyncExecutionId)); + assertAcked(deleteAsyncId(client, asyncExecutionId)); } }