diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 7ce55313aa771..49ec798c21c03 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -51,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.snapshots.SnapshotsInfoService; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; @@ -655,39 +657,39 @@ public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception { try { final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); final PlainActionFuture waitForAllShardSnapshotSizesFailures = new PlainActionFuture<>(); - final ClusterStateListener listener = event -> { - if (RestoreInProgress.get(event.state()).isEmpty() == false && event.state().routingTable().hasIndex(followerIndex)) { - try { - final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex); - // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService - // and ClusterService preserves the order of listeners. - assertBusy(() -> { - List sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) - .stream() - .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) - .sorted(Comparator.comparingInt(ShardRouting::getId)) - .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) - .filter(Objects::nonNull) - .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) - .collect(Collectors.toList()); - assertThat(sizes, hasSize(numberOfShards)); - }); - waitForAllShardSnapshotSizesFailures.onResponse(null); - } catch (Exception e) { - throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); - } + ClusterServiceUtils.addTemporaryStateListener( + clusterService, + state -> RestoreInProgress.get(state).isEmpty() == false && state.routingTable().hasIndex(followerIndex) + ).addListener(ActionTestUtils.assertNoFailureListener(ignore -> { + try { + // This listener runs synchronously in the same thread so that clusterService.state() returns the same state + // that satisfied the predicate. + final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(followerIndex); + // this assertBusy completes because the listener is added after the InternalSnapshotsInfoService + // and ClusterService preserves the order of listeners. + assertBusy(() -> { + List sizes = indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED) + .stream() + .filter(shard -> shard.unassignedInfo().lastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) + .sorted(Comparator.comparingInt(ShardRouting::getId)) + .map(shard -> snapshotsInfoService.snapshotShardSizes().getShardSize(shard)) + .filter(Objects::nonNull) + .filter(size -> ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE == size) + .collect(Collectors.toList()); + assertThat(sizes, hasSize(numberOfShards)); + }); + waitForAllShardSnapshotSizesFailures.onResponse(null); + } catch (Exception e) { + throw new AssertionError("Failed to retrieve all snapshot shard sizes", e); } - }; - - final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); - clusterService.addListener(listener); + })); logger.debug("--> creating follower index [{}]", followerIndex); followerClient().execute(PutFollowAction.INSTANCE, putFollow(leaderIndex, followerIndex, ActiveShardCount.NONE)); waitForAllShardSnapshotSizesFailures.get(30L, TimeUnit.SECONDS); - clusterService.removeListener(listener); assertThat(simulatedFailures.get(), equalTo(numberOfShards));