Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ private CountDownLatch configureElectionLatch(String newMaster, List<Releasable>
*/
private static String ensureSufficientMasterEligibleNodes() {
final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getAnyMasterNodeInstance(ClusterService.class),
cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -2182,7 +2181,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
// ensure each snapshot has really started before moving on to the next one
safeAwait(
ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getInstance(ClusterService.class),
cs -> SnapshotsInProgress.get(cs)
.forRepo(repoName)
.stream()
Expand All @@ -2202,7 +2200,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
final var indexRecreatedListener = ClusterServiceUtils
// wait until the snapshot has entered finalization
.addTemporaryStateListener(
internalCluster().getInstance(ClusterService.class),
cs -> SnapshotsInProgress.get(cs)
.forRepo(repoName)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -848,12 +847,7 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
clusterAdmin().prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap-1", "test-snap-2")
.setIndices("test-idx-1")
.get();
safeAwait(
ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getInstance(ClusterService.class),
cs -> SnapshotsInProgress.get(cs).isEmpty()
)
);
safeAwait(ClusterServiceUtils.addTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty()));
assertThat(
clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo")
.setSnapshots("test-snap-2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,6 @@ public void run() {
indexShard.failShard("simulated", new ElasticsearchException("simulated"));
safeAwait(
ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getInstance(ClusterService.class),
cs -> cs.metadata().getProject().index(indexName).primaryTerm(0) > primaryTerm
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -170,22 +169,19 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
*/
private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
return ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
state -> {
SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
.get(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress == null) {
return false;
}
if (deleteHasStarted.get() == false) {
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
return false;
} else {
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
}
return ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
.get(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress == null) {
return false;
}
);
if (deleteHasStarted.get() == false) {
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
return false;
} else {
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
}
});
}

public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
Expand All @@ -209,13 +205,10 @@ public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", originalNode)
);

final var shardMovedListener = ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
state -> {
final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
}
);
final var shardMovedListener = ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
});
assertFalse(shardMovedListener.isDone());

unblockAllDataNodes(repoName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
);
}

/**
* Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
* that satisfies {@code predicate}, at which point it unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
* already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
*/
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
final var listener = new SubscribableListener<Void>();
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
Expand Down Expand Up @@ -291,4 +300,35 @@ public String toString() {
}
return listener;
}

/**
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
* {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
* the listener unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
* state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
* within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
* belongs to the chosen node's {@link ClusterService}.
*/
public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, for some reason I assumed ESIntegTestCase#internalCluster() was only available in ESIntegTestCase and descendants. Realizing it's publicly available, I definitely agree we should try to keep utility methods outside of ESIntegTestCase as that class is already huge. I'll move the data stream utility methods I recently added in ESIntegTestCase to a different class sometime.

}

/**
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
* {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
* the listener unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* {@link ClusterService} belonging to the node that was the elected master node in the
* {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
* {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
* {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
* the elected master node's {@link ClusterService}.
*/
public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ on explicitly mentioning master in the method name. Should we mention randomNode (or something similar) in the other method? You're updating those usages anyway, so it won't result in a lot more changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conventionally we don't say randomNode in these things (or else we'd say it almost everywhere). I'm going to stick with that, but if we changed it everywhere then that'd be ok too :)

return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,7 @@ public static List<String> waitForDataStreamBackingIndices(String dataStreamName
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
// We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state.
// This avoids inconsistencies in subsequent API calls which might hit a non-master node.
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
final var listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
if (dataStream == null) {
return false;
Expand Down