Merged

Contributor Author

I'm a little on the fence about whether to backport these changes. These fixes are only necessary on v9.1.0, because the GET data streams API only runs on the local node starting with that version, but I am considering backporting test fixes to versions before v9.1.0 in the future. I think I'm leaning towards not backporting, but other thoughts are welcome.

Large diffs are not rendered by default.

9 changes: 0 additions & 9 deletions muted-tests.yml
@@ -339,9 +339,6 @@ tests:
- class: org.elasticsearch.xpack.ilm.DataStreamAndIndexLifecycleMixingTests
method: testUpdateIndexTemplateToDataStreamLifecyclePreference
issue: https://github.com/elastic/elasticsearch/issues/124837
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
method: testAutomaticForceMerge
issue: https://github.com/elastic/elasticsearch/issues/124846
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=search.vectors/41_knn_search_bbq_hnsw/Test knn search}
issue: https://github.com/elastic/elasticsearch/issues/124848
@@ -363,9 +360,6 @@ tests:
- class: org.elasticsearch.packaging.test.BootstrapCheckTests
method: test20RunWithBootstrapChecks
issue: https://github.com/elastic/elasticsearch/issues/124940
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
method: testErrorRecordingOnRetention
issue: https://github.com/elastic/elasticsearch/issues/124950
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
method: test {lookup-join.MvJoinKeyFromRow SYNC}
issue: https://github.com/elastic/elasticsearch/issues/124951
@@ -390,9 +384,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test011SecurityEnabledStatus
issue: https://github.com/elastic/elasticsearch/issues/124990
- class: org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT
method: testLifecycleAppliedToFailureStore
issue: https://github.com/elastic/elasticsearch/issues/124999
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=search/610_function_score/Random}
issue: https://github.com/elastic/elasticsearch/issues/125010
@@ -121,7 +121,7 @@ public void onFailure(Exception e) {
);

safeAwait(
taskCreatedListener.<Void>andThen(
taskCreatedListener.<ClusterState>andThen(
(l, v) -> ClusterServiceUtils.addTemporaryStateListener(
masterClusterService,
clusterState -> hasPersistentTask(clusterState) == false
@@ -657,7 +657,7 @@ && switch (shardEntry.getValue().state()) {
resetMockLog();
}

private static SubscribableListener<Void> createSnapshotPausedListener(
private static SubscribableListener<ClusterState> createSnapshotPausedListener(
ClusterService clusterService,
String repoName,
String indexName,
@@ -14,6 +14,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
@@ -124,7 +125,7 @@ public void testDeleteSnapshotWhenNotWaitingForCompletion() throws Exception {
createSnapshot("test-repo", "test-snapshot", List.of("test-index"));
MockRepository repository = getRepositoryOnMaster("test-repo");
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
SubscribableListener<Void> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
SubscribableListener<ClusterState> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
repository.blockOnDataFiles();
try {
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snapshot")
@@ -146,7 +147,7 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
createSnapshot("test-repo", "test-snapshot", List.of("test-index"));
MockRepository repository = getRepositoryOnMaster("test-repo");
PlainActionFuture<AcknowledgedResponse> requestCompleteListener = new PlainActionFuture<>();
SubscribableListener<Void> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
SubscribableListener<ClusterState> snapshotDeletionListener = createSnapshotDeletionListener("test-repo");
repository.blockOnDataFiles();
try {
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snapshot")
@@ -168,7 +169,7 @@ public void testDeleteSnapshotWhenWaitingForCompletion() throws Exception {
* @param repositoryName The repository to monitor for deletions
* @return the listener
*/
private SubscribableListener<Void> createSnapshotDeletionListener(String repositoryName) {
private SubscribableListener<ClusterState> createSnapshotDeletionListener(String repositoryName) {
AtomicBoolean deleteHasStarted = new AtomicBoolean(false);
return ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
@@ -1388,7 +1388,7 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
&& e.isClone()
&& e.shardSnapshotStatusByRepoShardId().isEmpty() == false
)
).addListener(l);
).addListener(l.map(cs -> null));
client.admin()
.cluster()
.prepareCloneSnapshot(TEST_REQUEST_TIMEOUT, repoName, originalSnapshotName, cloneName)
@@ -1401,7 +1401,8 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
testListener = testListener.andThen(l -> scheduleNow(() -> {
// Once all snapshots & clones have started, drop the data node and wait for all snapshot activity to complete
testClusterNodes.disconnectNode(testClusterNodes.randomDataNodeSafe());
ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty()).addListener(l);
ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty())
.addListener(l.map(cs -> null));
}));

deterministicTaskQueue.runAllRunnableTasks();
@@ -263,14 +263,24 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
);
}

public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
final var listener = new SubscribableListener<Void>();
public static SubscribableListener<ClusterState> addTemporaryStateListener(
ClusterService clusterService,
Predicate<ClusterState> predicate
) {
final var listener = new SubscribableListener<ClusterState>();
final var initialState = clusterService.state();
if (predicate.test(initialState)) {
listener.onResponse(initialState);
// No need to add the cluster state listener if the predicate already passes.
return listener;
}
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
try {
if (predicate.test(event.state())) {
listener.onResponse(null);
final var state = event.state();
if (predicate.test(state)) {
listener.onResponse(state);
}
} catch (Exception e) {
listener.onFailure(e);
@@ -284,11 +294,7 @@ public String toString() {
};
clusterService.addListener(clusterStateListener);
listener.addListener(ActionListener.running(() -> clusterService.removeListener(clusterStateListener)));
if (predicate.test(clusterService.state())) {
listener.onResponse(null);
} else {
listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
listener.addTimeout(ESTestCase.SAFE_AWAIT_TIMEOUT, clusterService.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
return listener;
}
}
@@ -854,6 +854,32 @@ private static Settings.Builder getExcludeSettings(int num, Settings.Builder bui
return builder;
}

/**
* Waits for the specified data stream to have the expected number of backing indices.
*/
public static List<String> waitForDataStreamBackingIndices(String dataStreamName, int expectedSize) {
return waitForDataStreamIndices(dataStreamName, expectedSize, false);
}

/**
* Waits for the specified data stream to have the expected number of backing or failure indices.
*/
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
Contributor

Food for thought: what if, instead of expectedSize, we added a predicate here? This would change the nature of the method into a general "wait for condition" helper, but it might make it more reusable and extend the benefit of avoiding a Thread.sleep to other places too. What do you think?
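
Roughly, something like this (just a sketch; the method name waitForDataStreamCondition and its exact shape are made up here, not part of the PR):

// Hypothetical predicate-based variant: wait until the data stream satisfies an
// arbitrary condition, still driven by a temporary cluster state listener rather
// than assertBusy/Thread.sleep.
public static void waitForDataStreamCondition(String dataStreamName, Predicate<DataStream> condition) {
    final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
        final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
        return dataStream != null && condition.test(dataStream);
    }));
}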

Contributor Author

Yeah, I thought about that too. I decided not to do that (yet), as I didn't see any use cases where it would be relevant. On second thought, I think sections like these could make use of it:

assertBusy(() -> {
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(backingIndices.size(), equalTo(1));
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by the data stream lifecycle given the configuration
String writeIndex = backingIndices.get(0);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

However, in that case it doesn't really make sense to retrieve the backing indices. A dedicated method just for data stream predicates then feels a little over-specific, so a generic cluster-state predicate one is probably more sensible. At that point, though, we're getting pretty close to addTemporaryStateListener already (and to awaitClusterState). I would still see value in adding a wrapper in ESIntegTestCase that fetches the ClusterService instance from the master node, to avoid having to do that in every test. What do you think?

Contributor

Hm, I would say it depends on how much this is going to be used, for example:

  • If it's used only in this file, the helper method could reside in this file.
  • If it's used only in this module, then the helper could be part of a fixture that all the other tests in the module use. If I'm not mistaken, we have more methods that could go there.
  • If it's used by many tests scattered all over the place, then I agree that it's worth putting it in ESIntegTestCase

If you agree with the above, where do you think this should reside?

Contributor Author

I'll respond to those questions later, but I'm not sure how they're related to this discussion? I was referring to the implementation of the additional helper method.

Contributor Author

I'm going to merge this for now. There are other similar test suites that need to be updated. When I get to those, I'll automatically see what potential other use cases there are and we can adjust the helper method accordingly.

Contributor

Ah, I misunderstood. What I had in mind was that we could share the method that you said would be rather specific:

Then, having a different dedicated method just for data stream predicates feels a little over-specific

Let me try to better understand what you meant by the following:

I would still see value in adding a wrapper in ESIntegTestCase that fetches the ClusterService instance from the master node, to avoid having to do that in every test

You mean to avoid doing this: final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);? Something like:

public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
     final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
     return ClusterServiceUtils.addTemporaryStateListener(clusterService, predicate);
}

Not sure if this is worth it, honestly, considering it's only one line of code. Do you think this is too much mental load for a developer?

Contributor

Note that we already have org.elasticsearch.test.ESIntegTestCase#clusterService to access the ClusterService of some random node, which is often what you want. But here it's important that it's the cluster service on the currently elected master node, since we need to be sure we've applied the state everywhere else before proceeding.

Not that I'm against a utility for this somewhere in the test framework (suggest ClusterServiceUtils over ESIntegTestCase). I see other places where it would be useful. Just that I think it should mention masterNode somewhere in its name.

Contributor

See #125648

Contributor Author

Thanks, David! My only reason for

I would still see value in adding a wrapper in ESIntegTestCase that fetches the ClusterService instance from the master node, to avoid having to do that in every test.

was my assumption that ESIntegTestCase#internalCluster was not publicly available. Your PR is exactly what I had in mind.

Contributor Author

@gmarouli, FTR, I agree that the mental load is minimal. I have a personal vendetta against boilerplate/duplicate code. In general, I'm also a fan of reducing test method sizes, as I feel a lot of our tests are hard to read, so any line reduction is a win in my eyes.

final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
Contributor Author

David Turner suggested using a temporary cluster state listener instead of an assertBusy that calls the GET data streams transport action. This improves performance, as we can react to new cluster state updates immediately instead of waiting out assertBusy's exponential backoff. The only "drawback" is that an assertBusy would implicitly cover the GET data streams transport action. I don't think we should rely on tests like these to cover that action - i.e. we should have dedicated tests that sufficiently cover the action, and I think we do.
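
As a rough illustration of the difference at a call site (a hypothetical snippet, assuming a test that waits for two backing indices):

// Before: poll with assertBusy, fetching the backing indices via the GET data streams action on each attempt.
assertBusy(() -> assertThat(getDataStreamBackingIndexNames(dataStreamName).size(), equalTo(2)));

// After: block until the master applies a cluster state in which the data stream has two backing indices.
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);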

Contributor

I would rather not add the override that returns the cluster state like this. Instead, just wait for the cluster to reach a suitable state and then use the regular APIs and transport actions to query what you need. That way you get the little bit of extra coverage from calling the transport action, and you also get some confidence that the state you were waiting for doesn't immediately vanish in the next update, since the transport action may see a future state.
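
For the backing-indices case that could look roughly like this (a sketch of the suggested alternative, reusing the existing getDataStreamBackingIndexNames helper for the follow-up query):

// Wait for the master to apply a cluster state in which the data stream has the expected
// number of backing indices, but don't read the index names out of that state.
safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
    final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
    return dataStream != null && dataStream.getDataStreamIndices(false).getIndices().size() == expectedSize;
}));
// Then fetch the names through the regular GET data streams transport action.
return getDataStreamBackingIndexNames(dataStreamName);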

Contributor Author

Hm, that reduces the performance benefit because we'll have to do another request, but only by a little bit I think. I agree that it does bring back some extra coverage. I'll revert the changes to addTemporaryStateListener and use the API to fetch the backing indices.

Contributor

The performance benefits are all about avoiding the Thread.sleep in the assertBusy call, which can oversleep past the point where the cluster state is ready for you. The cost of the transport action should be very low.

final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName);
if (dataStream == null) {
return false;
}
return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize;
});
final var state = safeAwait(listener);
// We will only reach the return statement when the data stream exists (and has the expected number of indices),
// so we can safely retrieve the data stream without worrying about NPEs.
final var indices = state.metadata().getProject().dataStreams().get(dataStreamName).getDataStreamIndices(failureStore).getIndices();
return indices.stream().map(Index::getName).toList();
}

/**
* Returns a list of the data stream's backing index names.
*/