From 060a3a87e165b1099c3c32d23d2b3c347ae04fb5 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 29 Sep 2025 09:43:57 -0400 Subject: [PATCH 1/5] Wait until remote clusters are connected --- ...actSemanticCrossClusterSearchTestCase.java | 31 ++++++++++++++++++- ...MatchQueryBuilderCrossClusterSearchIT.java | 2 +- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java index 685453fa77c78..95e1a9cf00738 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java @@ -8,6 +8,9 @@ package org.elasticsearch.search.ccs; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest; +import org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse; +import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction; import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeResponse; import org.elasticsearch.action.search.SearchRequest; @@ -35,6 +38,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; @@ -50,6 +54,7 @@ import org.elasticsearch.xpack.ml.action.TransportCoordinatedInferenceAction; import java.io.IOException; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -92,9 +97,10 @@ protected Collection> nodePlugins(String clusterAlias) { return List.of(LocalStateInferencePlugin.class, TestInferenceServicePlugin.class, FakeMlPlugin.class); } - protected void setupTwoClusters(TestIndexInfo localIndexInfo, TestIndexInfo remoteIndexInfo) throws IOException { + protected void setupTwoClusters(TestIndexInfo localIndexInfo, TestIndexInfo remoteIndexInfo) throws Exception { setupCluster(LOCAL_CLUSTER, localIndexInfo); setupCluster(REMOTE_CLUSTER, remoteIndexInfo); + waitUntilRemoteClusterConnected(REMOTE_CLUSTER); } protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws IOException { @@ -140,6 +146,29 @@ protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws assertThat(refreshResponse.getStatus(), is(RestStatus.OK)); } + protected void waitUntilRemoteClusterConnected(String clusterAlias) throws InterruptedException { + RemoteInfoRequest request = new RemoteInfoRequest(); + boolean connected; + int attempts = 0; + int delay = 0; + do { + if (delay > 0) { + // Delay between retries so that we don't use up all our attempts in a tight loop + Thread.sleep(Duration.ofSeconds(delay)); + } + RemoteInfoResponse response = client().execute(TransportRemoteInfoAction.TYPE, request).actionGet(TEST_REQUEST_TIMEOUT); + connected = response.getInfos() + .stream() + .filter(i -> i.getClusterAlias().equals(clusterAlias)) + .anyMatch(RemoteConnectionInfo::isConnected); + delay += 5; + } while (connected == false && attempts++ < 5); + + if (connected == false) { + throw new AssertionError("Cannot connect to remote cluster [" + clusterAlias + "]"); + } + } + protected BytesReference openPointInTime(String[] indices, TimeValue keepAlive) { OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive); final OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java index a83f7fa80e461..671ed52532fc9 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java @@ -151,7 +151,7 @@ public void testMatchQueryWithCcsMinimizeRoundTripsFalse() throws Exception { ); } - private void configureClusters() throws IOException { + private void configureClusters() throws Exception { final String commonInferenceId = "common-inference-id"; final String localInferenceId = "local-inference-id"; final String remoteInferenceId = "remote-inference-id"; From 9677a19517a1b6fe4766f006dcaaae3d22c8e00d Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 29 Sep 2025 10:11:18 -0400 Subject: [PATCH 2/5] Wait for index green status --- .../ccs/AbstractSemanticCrossClusterSearchTestCase.java | 5 +++-- .../search/ccs/MatchQueryBuilderCrossClusterSearchIT.java | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java index 95e1a9cf00738..4d8810029055f 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java @@ -106,6 +106,7 @@ protected void setupTwoClusters(TestIndexInfo localIndexInfo, TestIndexInfo remo protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws IOException { final Client client = client(clusterAlias); final String indexName = indexInfo.name(); + final int dataNodeCount = cluster(clusterAlias).numDataNodes(); for (var entry : indexInfo.inferenceEndpoints().entrySet()) { String inferenceId = entry.getKey(); @@ -123,13 +124,13 @@ protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws createInferenceEndpoint(client, minimalServiceSettings.taskType(), inferenceId, serviceSettings); } - Settings indexSettings = indexSettings(randomIntBetween(2, 5), randomIntBetween(0, 1)).build(); + Settings indexSettings = indexSettings(randomIntBetween(1, dataNodeCount), 0).build(); assertAcked(client.admin().indices().prepareCreate(indexName).setSettings(indexSettings).setMapping(indexInfo.mappings())); assertFalse( client.admin() .cluster() .prepareHealth(TEST_REQUEST_TIMEOUT, indexName) - .setWaitForYellowStatus() + .setWaitForGreenStatus() .setTimeout(TimeValue.timeValueSeconds(10)) .get() .isTimedOut() diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java index 671ed52532fc9..d92f0f6ef7373 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/MatchQueryBuilderCrossClusterSearchIT.java @@ -17,7 +17,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.Before; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; From 652e89e0c95ea299d1186d62b7ab8d901d2575fb Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 29 Sep 2025 10:17:07 -0400 Subject: [PATCH 3/5] Unmute tests --- muted-tests.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index b659f5e741584..0cd1e92efcce6 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -597,18 +597,12 @@ tests: - class: org.elasticsearch.action.admin.cluster.stats.SearchUsageStatsTests method: testToXContent issue: https://github.com/elastic/elasticsearch/issues/135558 -- class: org.elasticsearch.search.ccs.SparseVectorQueryBuilderCrossClusterSearchIT - method: testSparseVectorQueryWithCcsMinimizeRoundTripsFalse - issue: https://github.com/elastic/elasticsearch/issues/135559 - class: org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDeciderTests method: testCanAllocatePrimaryExistingInRestoreInProgress issue: https://github.com/elastic/elasticsearch/issues/135566 - class: org.elasticsearch.xpack.esql.inference.textembedding.TextEmbeddingOperatorTests method: testSimpleCircuitBreaking issue: https://github.com/elastic/elasticsearch/issues/135569 -- class: org.elasticsearch.search.ccs.KnnVectorQueryBuilderCrossClusterSearchIT - method: testKnnQueryWithCcsMinimizeRoundTripsFalse - issue: https://github.com/elastic/elasticsearch/issues/135573 - class: org.elasticsearch.multiproject.test.XpackWithMultipleProjectsClientYamlTestSuiteIT method: test {yaml=esql/60_usage/Basic ESQL usage output (telemetry) snapshot version} issue: https://github.com/elastic/elasticsearch/issues/135579 From 0b9a1aa20878a7b1a552b50652c9758a716a95e0 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 29 Sep 2025 10:25:48 -0400 Subject: [PATCH 4/5] Rename variable --- .../ccs/AbstractSemanticCrossClusterSearchTestCase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java index 4d8810029055f..3ec6344f014ec 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java @@ -151,18 +151,18 @@ protected void waitUntilRemoteClusterConnected(String clusterAlias) throws Inter RemoteInfoRequest request = new RemoteInfoRequest(); boolean connected; int attempts = 0; - int delay = 0; + int delayInSeconds = 0; do { - if (delay > 0) { + if (delayInSeconds > 0) { // Delay between retries so that we don't use up all our attempts in a tight loop - Thread.sleep(Duration.ofSeconds(delay)); + Thread.sleep(Duration.ofSeconds(delayInSeconds)); } RemoteInfoResponse response = client().execute(TransportRemoteInfoAction.TYPE, request).actionGet(TEST_REQUEST_TIMEOUT); connected = response.getInfos() .stream() .filter(i -> i.getClusterAlias().equals(clusterAlias)) .anyMatch(RemoteConnectionInfo::isConnected); - delay += 5; + delayInSeconds += 5; } while (connected == false && attempts++ < 5); if (connected == false) { From f07145fb40ec3a0c99aae3d1e0148c15a7b26a3a Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Mon, 29 Sep 2025 12:39:17 -0400 Subject: [PATCH 5/5] Simplified waitUntilRemoteClusterConnected --- ...actSemanticCrossClusterSearchTestCase.java | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java index 3ec6344f014ec..e1f72ddec5aab 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/search/ccs/AbstractSemanticCrossClusterSearchTestCase.java @@ -54,13 +54,13 @@ import org.elasticsearch.xpack.ml.action.TransportCoordinatedInferenceAction; import java.io.IOException; -import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -147,27 +147,16 @@ protected void setupCluster(String clusterAlias, TestIndexInfo indexInfo) throws assertThat(refreshResponse.getStatus(), is(RestStatus.OK)); } - protected void waitUntilRemoteClusterConnected(String clusterAlias) throws InterruptedException { + protected void waitUntilRemoteClusterConnected(String clusterAlias) throws Exception { RemoteInfoRequest request = new RemoteInfoRequest(); - boolean connected; - int attempts = 0; - int delayInSeconds = 0; - do { - if (delayInSeconds > 0) { - // Delay between retries so that we don't use up all our attempts in a tight loop - Thread.sleep(Duration.ofSeconds(delayInSeconds)); - } + assertBusy(() -> { RemoteInfoResponse response = client().execute(TransportRemoteInfoAction.TYPE, request).actionGet(TEST_REQUEST_TIMEOUT); - connected = response.getInfos() + boolean connected = response.getInfos() .stream() .filter(i -> i.getClusterAlias().equals(clusterAlias)) .anyMatch(RemoteConnectionInfo::isConnected); - delayInSeconds += 5; - } while (connected == false && attempts++ < 5); - - if (connected == false) { - throw new AssertionError("Cannot connect to remote cluster [" + clusterAlias + "]"); - } + assertThat(connected, is(true)); + }, 30, TimeUnit.SECONDS); } protected BytesReference openPointInTime(String[] indices, TimeValue keepAlive) {