Skip to content

Commit 8fd96df

Browse files
committed
Fix data stream retrieval in DataStreamLifecycleServiceIT
These tests had the potential to fail when two consecutive GET data streams requests would hit two different nodes, where one node already had the cluster state that contained the new backing index and the other node didn't yet. Caused by #122852 Fixes #124846 Fixes #124950 Fixes #124999
1 parent ae16016 commit 8fd96df

File tree

8 files changed

+122
-277
lines changed

8 files changed

+122
-277
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 72 additions & 252 deletions
Large diffs are not rendered by default.

muted-tests.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,6 @@ tests:
339339
- class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
340340
method: testUpdateIndexTemplateToDataStreamLifecyclePreference
341341
issue: https://github.com/elastic/elasticsearch/issues/124837
342-
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
343-
method: testAutomaticForceMerge
344-
issue: https://github.com/elastic/elasticsearch/issues/124846
345342
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
346343
method: test {p0=search.vectors/41_knn_search_bbq_hnsw/Test knn search}
347344
issue: https://github.com/elastic/elasticsearch/issues/124848
@@ -363,9 +360,6 @@ tests:
363360
- class: org.elasticsearch.packaging.test.BootstrapCheckTests
364361
method: test20RunWithBootstrapChecks
365362
issue: https://github.com/elastic/elasticsearch/issues/124940
366-
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
367-
method: testErrorRecordingOnRetention
368-
issue: https://github.com/elastic/elasticsearch/issues/124950
369363
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
370364
method: test {lookup-join.MvJoinKeyFromRow SYNC}
371365
issue: https://github.com/elastic/elasticsearch/issues/124951
@@ -390,9 +384,6 @@ tests:
390384
- class: org.elasticsearch.packaging.test.DockerTests
391385
method: test011SecurityEnabledStatus
392386
issue: https://github.com/elastic/elasticsearch/issues/124990
393-
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
394-
method: testLifecycleAppliedToFailureStore
395-
issue: https://github.com/elastic/elasticsearch/issues/124999
396387
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
397388
method: test {p0=search/610_function_score/Random}
398389
issue: https://github.com/elastic/elasticsearch/issues/125010

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void onFailure(Exception e) {
121121
);
122122

123123
safeAwait(
124-
taskCreatedListener.<Void>andThen(
124+
taskCreatedListener.<ClusterState>andThen(
125125
(l, v) -> ClusterServiceUtils.addTemporaryStateListener(
126126
masterClusterService,
127127
clusterState -> hasPersistentTask(clusterState) == false

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ && switch (shardEntry.getValue().state()) {
657657
resetMockLog();
658658
}
659659

660-
private static SubscribableListener<Void> createSnapshotPausedListener(
660+
private static SubscribableListener<ClusterState> createSnapshotPausedListener(
661661
ClusterService clusterService,
662662
String repoName,
663663
String indexName,

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.support.PlainActionFuture;
1515
import org.elasticsearch.action.support.SubscribableListener;
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17+
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
1819
import org.elasticsearch.cluster.metadata.IndexMetadata;
1920
import org.elasticsearch.cluster.service.ClusterService;
@@ -124,7 +125,7 @@ public void testDeleteSnapshotWhenNotWaitingForCompletion() throws Exception {
124125
createSnapshot("test-repo", "test-snapshot", List.of("test-index"));
125126
MockRepository repository = getRepositoryOnMaster("test-repo");
126127
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
127-
SubscribableListener<Void> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
128+
SubscribableListener<ClusterState> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
128129
repository.blockOnDataFiles();
129130
try {
130131
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snapshot")
@@ -146,7 +147,7 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
146147
createSnapshot("test-repo", "test-snapshot", List.of("test-index"));
147148
MockRepository repository = getRepositoryOnMaster("test-repo");
148149
PlainActionFuture<AcknowledgedResponse> requestCompleteListener = new PlainActionFuture<>();
149-
SubscribableListener<Void> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
150+
SubscribableListener<ClusterState> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
150151
repository.blockOnDataFiles();
151152
try {
152153
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snapshot")
@@ -168,7 +169,7 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
168169
* @param repositoryName The repository to monitor for deletions
169170
* @return the listener
170171
*/
171-
private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
172+
private SubscribableListener<ClusterState> createSnapshotDeletionListener(String repositoryName) {
172173
AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
173174
return ClusterServiceUtils.addTemporaryStateListener(
174175
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,7 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
13881388
&& e.isClone()
13891389
&& e.shardSnapshotStatusByRepoShardId().isEmpty() == false
13901390
)
1391-
).addListener(l);
1391+
).addListener(l.map(cs -> null));
13921392
client.admin()
13931393
.cluster()
13941394
.prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, repoName, originalSnapshotName, cloneName)
@@ -1401,7 +1401,8 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
14011401
testListener = testListener.andThen(l -> scheduleNow(() -> {
14021402
// Once all snapshots & clones have started, drop the data node and wait for all snapshot activity to complete
14031403
testClusterNodes.disconnectNode(testClusterNodes.randomDataNodeSafe());
1404-
ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty()).addListener(l);
1404+
ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty())
1405+
.addListener(l.map(cs -> null));
14051406
}));
14061407

14071408
deterministicTaskQueue.runAllRunnableTasks();

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -263,14 +263,24 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
263263
);
264264
}
265265

266-
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
267-
final var listener = new SubscribableListener<Void>();
266+
public static SubscribableListener<ClusterState> addTemporaryStateListener(
267+
ClusterService clusterService,
268+
Predicate<ClusterState> predicate
269+
) {
270+
final var listener = new SubscribableListener<ClusterState>();
271+
final var initialState = clusterService.state();
272+
if (predicate.test(initialState)) {
273+
listener.onResponse(initialState);
274+
// No need to add the cluster state listener if the predicate already passes.
275+
return listener;
276+
}
268277
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
269278
@Override
270279
public void clusterChanged(ClusterChangedEvent event) {
271280
try {
272-
if (predicate.test(event.state())) {
273-
listener.onResponse(null);
281+
final var state = event.state();
282+
if (predicate.test(state)) {
283+
listener.onResponse(state);
274284
}
275285
} catch (Exception e) {
276286
listener.onFailure(e);
@@ -284,11 +294,7 @@ public String toString() {
284294
};
285295
clusterService.addListener(clusterStateListener);
286296
listener.addListener(ActionListener.running(() -> clusterService.removeListener(clusterStateListener)));
287-
if (predicate.test(clusterService.state())) {
288-
listener.onResponse(null);
289-
} else {
290-
listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
291-
}
297+
listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
292298
return listener;
293299
}
294300
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,32 @@ private static Settings.Builder getExcludeSettings(int num, Settings.Builder bui
854854
return builder;
855855
}
856856

857+
/**
858+
* Waits for the specified data stream to have the expected number of backing indices.
859+
*/
860+
public static List<String> waitForDataStreamBackingIndices(String dataStreamName, int expectedSize) {
861+
return waitForDataStreamIndices(dataStreamName, expectedSize, false);
862+
}
863+
864+
/**
865+
* Waits for the specified data stream to have the expected number of backing or failure indices.
866+
*/
867+
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
868+
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
869+
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
870+
final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
871+
if (dataStream == null) {
872+
return false;
873+
}
874+
return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize;
875+
});
876+
final var state = safeAwait(listener);
877+
// We will only reach the return statement when the data stream exists (and has the expected number of indices),
878+
// so we can safely retrieve the data stream without worrying about NPEs.
879+
final var indices = state.metadata().getProject().dataStreams().get(dataStreamName).getDataStreamIndices(failureStore).getIndices();
880+
return indices.stream().map(Index::getName).toList();
881+
}
882+
857883
/**
858884
* Returns a list of the data stream's backing index names.
859885
*/

0 commit comments

Comments
 (0)