Skip to content

Commit f1d8724

Browse files
DaveCTurneromricohenn
authored andcommitted
Add more addTemporaryStateListener utils (elastic#125648)
We often call `addTemporaryStateListener` with the `ClusterService` of a random node, or the currently elected master. This commit adds utilities for this common pattern.
1 parent 4a2f1da commit f1d8724

File tree

7 files changed

+58
-37
lines changed

7 files changed

+58
-37
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ private CountDownLatch configureElectionLatch(String newMaster, List<Releasable>
214214
*/
215215
private static String ensureSufficientMasterEligibleNodes() {
216216
final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener(
217-
internalCluster().getAnyMasterNodeInstance(ClusterService.class),
218217
cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()
219218
);
220219

server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
2727
import org.elasticsearch.cluster.SnapshotsInProgress;
2828
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
29-
import org.elasticsearch.cluster.service.ClusterService;
3029
import org.elasticsearch.common.Strings;
3130
import org.elasticsearch.common.settings.Settings;
3231
import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -2182,7 +2181,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
21822181
// ensure each snapshot has really started before moving on to the next one
21832182
safeAwait(
21842183
ClusterServiceUtils.addTemporaryStateListener(
2185-
internalCluster().getInstance(ClusterService.class),
21862184
cs -> SnapshotsInProgress.get(cs)
21872185
.forRepo(repoName)
21882186
.stream()
@@ -2202,7 +2200,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
22022200
final var indexRecreatedListener = ClusterServiceUtils
22032201
// wait until the snapshot has entered finalization
22042202
.addTemporaryStateListener(
2205-
internalCluster().getInstance(ClusterService.class),
22062203
cs -> SnapshotsInProgress.get(cs)
22072204
.forRepo(repoName)
22082205
.stream()

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.cluster.SnapshotsInProgress;
2020
import org.elasticsearch.cluster.metadata.Metadata;
2121
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
22-
import org.elasticsearch.cluster.service.ClusterService;
2322
import org.elasticsearch.common.bytes.BytesReference;
2423
import org.elasticsearch.common.settings.Settings;
2524
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -848,12 +847,7 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
848847
clusterAdmin().prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap-1", "test-snap-2")
849848
.setIndices("test-idx-1")
850849
.get();
851-
safeAwait(
852-
ClusterServiceUtils.addTemporaryStateListener(
853-
internalCluster().getInstance(ClusterService.class),
854-
cs -> SnapshotsInProgress.get(cs).isEmpty()
855-
)
856-
);
850+
safeAwait(ClusterServiceUtils.addTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty()));
857851
assertThat(
858852
clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo")
859853
.setSnapshots("test-snap-2")

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1484,7 +1484,6 @@ public void run() {
14841484
indexShard.failShard("simulated", new ElasticsearchException("simulated"));
14851485
safeAwait(
14861486
ClusterServiceUtils.addTemporaryStateListener(
1487-
internalCluster().getInstance(ClusterService.class),
14881487
cs -> cs.metadata().getProject().index(indexName).primaryTerm(0) > primaryTerm
14891488
)
14901489
);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
1818
import org.elasticsearch.cluster.metadata.IndexMetadata;
19-
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.cluster.service.MasterService;
2120
import org.elasticsearch.common.settings.Settings;
2221
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -170,22 +169,19 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
170169
*/
171170
private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
172171
AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
173-
return ClusterServiceUtils.addTemporaryStateListener(
174-
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
175-
state -> {
176-
SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
177-
.get(SnapshotDeletionsInProgress.TYPE);
178-
if (deletionsInProgress == null) {
179-
return false;
180-
}
181-
if (deleteHasStarted.get() == false) {
182-
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
183-
return false;
184-
} else {
185-
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
186-
}
172+
return ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
173+
SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
174+
.get(SnapshotDeletionsInProgress.TYPE);
175+
if (deletionsInProgress == null) {
176+
return false;
187177
}
188-
);
178+
if (deleteHasStarted.get() == false) {
179+
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
180+
return false;
181+
} else {
182+
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
183+
}
184+
});
189185
}
190186

191187
public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
@@ -209,13 +205,10 @@ public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
209205
.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", originalNode)
210206
);
211207

212-
final var shardMovedListener = ClusterServiceUtils.addTemporaryStateListener(
213-
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
214-
state -> {
215-
final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
216-
return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
217-
}
218-
);
208+
final var shardMovedListener = ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
209+
final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
210+
return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
211+
});
219212
assertFalse(shardMovedListener.isDone());
220213

221214
unblockAllDataNodes(repoName);

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,15 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
263263
);
264264
}
265265

266+
/**
267+
* Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
268+
* that satisfies {@code predicate}, at which point it unsubscribes itself.
269+
*
270+
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
271+
* given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
272+
* already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
273+
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
274+
*/
266275
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
267276
final var listener = new SubscribableListener<Void>();
268277
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@@ -291,4 +300,35 @@ public String toString() {
291300
}
292301
return listener;
293302
}
303+
304+
/**
305+
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
306+
* {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
307+
* the listener unsubscribes itself.
308+
*
309+
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
310+
* {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
311+
* state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
312+
* within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
313+
* belongs to the chosen node's {@link ClusterService}.
314+
*/
315+
public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
316+
return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
317+
}
318+
319+
/**
320+
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
321+
* {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
322+
* the listener unsubscribes itself.
323+
*
324+
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
325+
* {@link ClusterService} belonging to the node that was the elected master node in the
326+
* {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
327+
* {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
328+
* {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
329+
* the elected master node's {@link ClusterService}.
330+
*/
331+
public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
332+
return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
333+
}
294334
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -867,8 +867,7 @@ public static List<String> waitForDataStreamBackingIndices(String dataStreamName
867867
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
868868
// We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state.
869869
// This avoids inconsistencies in subsequent API calls which might hit a non-master node.
870-
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
871-
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
870+
final var listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
872871
final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
873872
if (dataStream == null) {
874873
return false;

0 commit comments

Comments
 (0)