Skip to content

Commit c8503f3

Browse files
committed
Add more addTemporaryStateListener utils
We often call `addTemporaryStateListener` with the `ClusterService` of a random node, or the currently elected master. This commit adds utilities for this common pattern.
1 parent 16441fe commit c8503f3

File tree

7 files changed

+59
-36
lines changed

7 files changed

+59
-36
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ private CountDownLatch configureElectionLatch(String newMaster, List<Releasable>
214214
*/
215215
private static String ensureSufficientMasterEligibleNodes() {
216216
final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener(
217-
internalCluster().getAnyMasterNodeInstance(ClusterService.class),
218217
cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()
219218
);
220219

server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2182,7 +2182,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
21822182
// ensure each snapshot has really started before moving on to the next one
21832183
safeAwait(
21842184
ClusterServiceUtils.addTemporaryStateListener(
2185-
internalCluster().getInstance(ClusterService.class),
21862185
cs -> SnapshotsInProgress.get(cs)
21872186
.forRepo(repoName)
21882187
.stream()
@@ -2202,7 +2201,6 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
22022201
final var indexRecreatedListener = ClusterServiceUtils
22032202
// wait until the snapshot has entered finalization
22042203
.addTemporaryStateListener(
2205-
internalCluster().getInstance(ClusterService.class),
22062204
cs -> SnapshotsInProgress.get(cs)
22072205
.forRepo(repoName)
22082206
.stream()

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.cluster.SnapshotsInProgress;
2020
import org.elasticsearch.cluster.metadata.Metadata;
2121
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
22-
import org.elasticsearch.cluster.service.ClusterService;
2322
import org.elasticsearch.common.bytes.BytesReference;
2423
import org.elasticsearch.common.settings.Settings;
2524
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -848,12 +847,7 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
848847
clusterAdmin().prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap-1", "test-snap-2")
849848
.setIndices("test-idx-1")
850849
.get();
851-
safeAwait(
852-
ClusterServiceUtils.addTemporaryStateListener(
853-
internalCluster().getInstance(ClusterService.class),
854-
cs -> SnapshotsInProgress.get(cs).isEmpty()
855-
)
856-
);
850+
safeAwait(ClusterServiceUtils.addTemporaryStateListener(cs -> SnapshotsInProgress.get(cs).isEmpty()));
857851
assertThat(
858852
clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo")
859853
.setSnapshots("test-snap-2")

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1484,7 +1484,6 @@ public void run() {
14841484
indexShard.failShard("simulated", new ElasticsearchException("simulated"));
14851485
safeAwait(
14861486
ClusterServiceUtils.addTemporaryStateListener(
1487-
internalCluster().getInstance(ClusterService.class),
14881487
cs -> cs.metadata().getProject().index(indexName).primaryTerm(0) > primaryTerm
14891488
)
14901489
);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
1818
import org.elasticsearch.cluster.metadata.IndexMetadata;
19-
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.cluster.service.MasterService;
2120
import org.elasticsearch.common.settings.Settings;
2221
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -170,22 +169,19 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
170169
*/
171170
private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
172171
AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
173-
return ClusterServiceUtils.addTemporaryStateListener(
174-
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
175-
state -> {
176-
SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
177-
.get(SnapshotDeletionsInProgress.TYPE);
178-
if (deletionsInProgress == null) {
179-
return false;
180-
}
181-
if (deleteHasStarted.get() == false) {
182-
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
183-
return false;
184-
} else {
185-
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
186-
}
172+
return ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
173+
SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress) state.getCustoms()
174+
.get(SnapshotDeletionsInProgress.TYPE);
175+
if (deletionsInProgress == null) {
176+
return false;
187177
}
188-
);
178+
if (deleteHasStarted.get() == false) {
179+
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
180+
return false;
181+
} else {
182+
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
183+
}
184+
});
189185
}
190186

191187
public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
@@ -209,13 +205,10 @@ public void testRerouteWhenShardSnapshotsCompleted() throws Exception {
209205
.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", originalNode)
210206
);
211207

212-
final var shardMovedListener = ClusterServiceUtils.addTemporaryStateListener(
213-
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
214-
state -> {
215-
final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
216-
return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
217-
}
218-
);
208+
final var shardMovedListener = ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
209+
final var primaryShard = state.routingTable().index(indexName).shard(0).primaryShard();
210+
return primaryShard.started() && originalNode.equals(state.nodes().get(primaryShard.currentNodeId()).getName()) == false;
211+
});
219212
assertFalse(shardMovedListener.isDone());
220213

221214
unblockAllDataNodes(repoName);

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.settings.ClusterSettings;
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.util.concurrent.EsExecutors;
39+
import org.elasticsearch.core.Predicates;
3940
import org.elasticsearch.core.TimeValue;
4041
import org.elasticsearch.node.NodeClosedException;
4142
import org.elasticsearch.tasks.TaskManager;
@@ -263,6 +264,15 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
263264
);
264265
}
265266

267+
/**
268+
* Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
269+
* that satisfies {@code predicate}, at which point it unsubscribes itself.
270+
*
271+
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
272+
* given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
273+
* already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
274+
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
275+
*/
266276
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
267277
final var listener = new SubscribableListener<Void>();
268278
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@@ -291,4 +301,35 @@ public String toString() {
291301
}
292302
return listener;
293303
}
304+
305+
/**
306+
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
307+
* {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
308+
* the listener unsubscribes itself.
309+
*
310+
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
311+
* {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
312+
* state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
313+
* within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
314+
* belongs to the chosen node's {@link ClusterService}.
315+
*/
316+
public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
317+
return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
318+
}
319+
320+
/**
321+
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
322+
* {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
323+
* the listener unsubscribes itself.
324+
*
325+
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
326+
* {@link ClusterService} belonging to the node that was the elected master node in the
327+
* {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
328+
* {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
329+
* {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
330+
* the elected master node's {@link ClusterService}.
331+
*/
332+
public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
333+
return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
334+
}
294335
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -867,8 +867,7 @@ public static List<String> waitForDataStreamBackingIndices(String dataStreamName
867867
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
868868
// We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state.
869869
// This avoids inconsistencies in subsequent API calls which might hit a non-master node.
870-
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
871-
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
870+
final var listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
872871
final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
873872
if (dataStream == null) {
874873
return false;

0 commit comments

Comments
 (0)