From 6cfa8a3b7eba7a485cca7a867f95de3cd3899042 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 15 Jul 2025 11:53:47 +1000 Subject: [PATCH] [Test] Ensure assertion run only once (#131174) The assertion that runs inside the cluster state listener may run a second time if the listener is not removed in time. It is possible that the assertion no longer holds the second time due to cluster state changes. There is no need to assert it more than once. This PR ensures the assertion runs only once by removing the listener right after the assertion runs. Resolves: #130274 --- .../xpack/ccr/CcrRepositoryIT.java | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 7ce55313aa771..49ec798c21c03 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -51,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.snapshots.SnapshotsInfoService; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; @@ -655,39 +657,39 @@ public void 
testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception { try { final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); final PlainActionFuture waitForAllShardSnapshotSizesFailures = new PlainActionFuture<>(); - final ClusterStateListener listener = event -> { - if (RestoreInProgress.get(event.state()).isEmpty() == false && event.state().routingTable().hasIndex(followerIndex)) { - try { - final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex); - // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService - // and ClusterService preserves the order of listeners. - assertBusy(() -> { - List sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) - .stream() - .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) - .sorted(Comparator.comparingInt(ShardRouting::getId)) - .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) - .filter(Objects::nonNull) - .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) - .collect(Collectors.toList()); - assertThat(sizes, hasSize(numberOfShards)); - }); - waitForAllShardSnapshotSizesFailures.onResponse(null); - } catch (Exception e) { - throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); - } + ClusterServiceUtils.addTemporaryStateListener( + clusterService, + state -> RestoreInProgress.get(state).isEmpty() == false && state.routingTable().hasIndex(followerIndex) + ).addListener(ActionTestUtils.assertNoFailureListener(ignore -> { + try { + // This listener runs synchronously in the same thread so that clusterService.state() returns the same state + // that satisfied the predicate. 
+ final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(followerIndex); + // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService + // and ClusterService preserves the order of listeners. + assertBusy(() -> { + List sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) + .stream() + .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) + .sorted(Comparator.comparingInt(ShardRouting::getId)) + .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) + .filter(Objects::nonNull) + .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) + .collect(Collectors.toList()); + assertThat(sizes, hasSize(numberOfShards)); + }); + waitForAllShardSnapshotSizesFailures.onResponse(null); + } catch (Exception e) { + throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); } - }; - - final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); - clusterService.addListener(listener); + })); logger.debug("--> creating follower index [{}]", followerIndex); followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex, ActiveShardCount.NONE)); waitForAllShardSnapshotSizesFailures.get(30L, TimeUnit.SECONDS); - clusterService.removeListener(listener); assertThat(simulatedFailures.get(), equalTo(numberOfShards));