Skip to content

Commit 73f2aef

Browse files
committed
Fetch backing indices using transport action
1 parent 8fd96df commit 73f2aef

File tree

6 files changed

+28
-26
lines changed

6 files changed

+28
-26
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void onFailure(Exception e) {
121121
);
122122

123123
safeAwait(
124-
taskCreatedListener.<ClusterState>andThen(
124+
taskCreatedListener.<Void>andThen(
125125
(l, v) -> ClusterServiceUtils.addTemporaryStateListener(
126126
masterClusterService,
127127
clusterState -> hasPersistentTask(clusterState) == false

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ && switch (shardEntry.getValue().state()) {
657657
resetMockLog();
658658
}
659659

660-
private static SubscribableListener<ClusterState> createSnapshotPausedListener(
660+
private static SubscribableListener<Void> createSnapshotPausedListener(
661661
ClusterService clusterService,
662662
String repoName,
663663
String indexName,

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void testDeleteSnapshotWhenNotWaitingForCompletion() throws Exception {
125125
createSnapshot("test-repo", "test-snapshot", List.of("test-index"));
126126
MockRepository repository = getRepositoryOnMaster("test-repo");
127127
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
128-
SubscribableListener<ClusterState> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
128+
SubscribableListener<Void> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
129129
repository.blockOnDataFiles();
130130
try {
131131
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snapshot")
@@ -147,7 +147,7 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
147147
createSnapshot("test-repo", "test-snapshot", List.of("test-index"));
148148
MockRepository repository = getRepositoryOnMaster("test-repo");
149149
PlainActionFuture<AcknowledgedResponse> requestCompleteListener = new PlainActionFuture<>();
150-
SubscribableListener<ClusterState> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
150+
SubscribableListener<Void> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
151151
repository.blockOnDataFiles();
152152
try {
153153
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snapshot")
@@ -169,7 +169,7 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
169169
* @param repositoryName The repository to monitor for deletions
170170
* @return the listener
171171
*/
172-
private SubscribableListener<ClusterState> createSnapshotDeletionListener(String repositoryName) {
172+
private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
173173
AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
174174
return ClusterServiceUtils.addTemporaryStateListener(
175175
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,7 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
13881388
&& e.isClone()
13891389
&& e.shardSnapshotStatusByRepoShardId().isEmpty() == false
13901390
)
1391-
).addListener(l.map(cs -> null));
1391+
).addListener(l);
13921392
client.admin()
13931393
.cluster()
13941394
.prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, repoName, originalSnapshotName, cloneName)
@@ -1401,8 +1401,7 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
14011401
testListener = testListener.andThen(l -> scheduleNow(() -> {
14021402
// Once all snapshots & clones have started, drop the data node and wait for all snapshot activity to complete
14031403
testClusterNodes.disconnectNode(testClusterNodes.randomDataNodeSafe());
1404-
ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty())
1405-
.addListener(l.map(cs -> null));
1404+
ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty()).addListener(l);
14061405
}));
14071406

14081407
deterministicTaskQueue.runAllRunnableTasks();

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,24 +263,19 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
263263
);
264264
}
265265

266-
public static SubscribableListener<ClusterState> addTemporaryStateListener(
267-
ClusterService clusterService,
268-
Predicate<ClusterState> predicate
269-
) {
270-
final var listener = new SubscribableListener<ClusterState>();
271-
final var initialState = clusterService.state();
272-
if (predicate.test(initialState)) {
273-
listener.onResponse(initialState);
266+
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
267+
final var listener = new SubscribableListener<Void>();
268+
if (predicate.test(clusterService.state())) {
269+
listener.onResponse(null);
274270
// No need to add the cluster state listener if the predicate already passes.
275271
return listener;
276272
}
277273
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
278274
@Override
279275
public void clusterChanged(ClusterChangedEvent event) {
280276
try {
281-
final var state = event.state();
282-
if (predicate.test(state)) {
283-
listener.onResponse(state);
277+
if (predicate.test(event.state())) {
278+
listener.onResponse(null);
284279
}
285280
} catch (Exception e) {
286281
listener.onFailure(e);

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,8 @@ public static List<String> waitForDataStreamBackingIndices(String dataStreamName
865865
* Waits for the specified data stream to have the expected number of backing or failure indices.
866866
*/
867867
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
868+
// We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state.
869+
// This avoids inconsistencies in subsequent API calls which might hit a non-master node.
868870
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
869871
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
870872
final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
@@ -873,17 +875,23 @@ public static List<String> waitForDataStreamIndices(String dataStreamName, int e
873875
}
874876
return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize;
875877
});
876-
final var state = safeAwait(listener);
877-
// We will only reach the return statement when the data stream exists (and has the expected number of indices),
878-
// so we can safely retrieve the data stream without worrying about NPEs.
879-
final var indices = state.metadata().getProject().dataStreams().get(dataStreamName).getDataStreamIndices(failureStore).getIndices();
880-
return indices.stream().map(Index::getName).toList();
878+
safeAwait(listener);
879+
final var backingIndexNames = getDataStreamBackingIndexNames(dataStreamName, failureStore);
880+
assertEquals("Retrieved number of data stream indices doesn't match expectation", expectedSize, backingIndexNames.size());
881+
return backingIndexNames;
881882
}
882883

883884
/**
884885
* Returns a list of the data stream's backing index names.
885886
*/
886-
public List<String> getDataStreamBackingIndexNames(String dataStreamName) {
887+
public static List<String> getDataStreamBackingIndexNames(String dataStreamName) {
888+
return getDataStreamBackingIndexNames(dataStreamName, false);
889+
}
890+
891+
/**
892+
* Returns a list of the data stream's backing or failure index names.
893+
*/
894+
public static List<String> getDataStreamBackingIndexNames(String dataStreamName, boolean failureStore) {
887895
GetDataStreamAction.Response response = safeGet(
888896
client().execute(
889897
GetDataStreamAction.INSTANCE,
@@ -893,7 +901,7 @@ public List<String> getDataStreamBackingIndexNames(String dataStreamName) {
893901
assertThat(response.getDataStreams().size(), equalTo(1));
894902
DataStream dataStream = response.getDataStreams().getFirst().getDataStream();
895903
assertThat(dataStream.getName(), equalTo(dataStreamName));
896-
return dataStream.getIndices().stream().map(Index::getName).toList();
904+
return dataStream.getDataStreamIndices(failureStore).getIndices().stream().map(Index::getName).toList();
897905
}
898906

899907
/**

0 commit comments

Comments
 (0)