From 4cff0080210c37d3f2b6c969c373c81226cdf526 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 15 Jul 2025 11:53:47 +1000 Subject: [PATCH] [Test] Ensure assertion run only once (#131174) The assertion runs inside the cluster state listener may run a second time if the listener is not removed in time. It is possible that the assertion no longer holds on the 2nd time due to cluster state changes. There is no need to assert it more than once. This PR ensures assertion runs only once by removing the listener right after it. Resolves: #130274 --- .../xpack/ccr/CcrRepositoryIT.java | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index cca8228557aba..e2f334f2928cc 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -51,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.snapshots.SnapshotsInfoService; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; @@ -655,39 +657,39 @@ public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception { try { final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); final PlainActionFuture waitForAllShardSnapshotSizesFailures = new PlainActionFuture<>(); - final ClusterStateListener listener = event -> { - if (RestoreInProgress.get(event.state()).isEmpty() == false && event.state().routingTable().hasIndex(followerIndex)) { - try { - final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex); - // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService - // and ClusterService preserves the order of listeners. - assertBusy(() -> { - List sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) - .stream() - .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) - .sorted(Comparator.comparingInt(ShardRouting::getId)) - .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) - .filter(Objects::nonNull) - .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) - .collect(Collectors.toList()); - assertThat(sizes, hasSize(numberOfShards)); - }); - waitForAllShardSnapshotSizesFailures.onResponse(null); - } catch (Exception e) { - throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); - } + ClusterServiceUtils.addTemporaryStateListener( + clusterService, + state -> RestoreInProgress.get(state).isEmpty() == false && state.routingTable().hasIndex(followerIndex) + ).addListener(ActionTestUtils.assertNoFailureListener(ignore -> { + try { + // This listener runs synchronously in the same thread so that clusterService.state() returns the same state + // that satisfied the predicate. + final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(followerIndex); + // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService + // and ClusterService preserves the order of listeners. + assertBusy(() -> { + List sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) + .stream() + .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) + .sorted(Comparator.comparingInt(ShardRouting::getId)) + .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) + .filter(Objects::nonNull) + .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) + .collect(Collectors.toList()); + assertThat(sizes, hasSize(numberOfShards)); + }); + waitForAllShardSnapshotSizesFailures.onResponse(null); + } catch (Exception e) { + throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); } - }; - - final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); - clusterService.addListener(listener); + })); logger.debug("--> creating follower index [{}]", followerIndex); followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex, ActiveShardCount.NONE)); waitForAllShardSnapshotSizesFailures.get(30L, TimeUnit.SECONDS); - clusterService.removeListener(listener); assertThat(simulatedFailures.get(), equalTo(numberOfShards));