
Conversation

@nielsbauman
Contributor

These tests had the potential to fail when two consecutive GET data streams requests would hit two different nodes, where one node already had the cluster state that contained the new backing index and the other node didn't yet.

Caused by #122852

Fixes #124846
Fixes #124950
Fixes #124999

@nielsbauman added the >test, :Data Management/Data streams, Team:Data Management, and v9.1.0 labels on Mar 19, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

Contributor Author

I'm a little bit on the fence about backporting these changes or not. These fixes are only necessary on v9.1.0 because the GET data streams API only runs on the local node starting with that version, but I'm thinking about backporting test fixes in the future to versions before v9.1.0. I think I'm leaning towards not backporting, but other thoughts are welcome.

*/
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
    final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
Contributor Author

David Turner suggested using a temporary cluster state listener instead of an assertBusy that uses the GET data streams transport action. This has improved performance as we can react to new cluster state updates immediately instead of waiting for the exponential backoff of assertBusy. The only "drawback" is that an assertBusy would implicitly cover the GET data streams transport action. I don't think we should rely on tests like these to cover that action - i.e. we should have dedicated tests that sufficiently cover the action, and I think we do.
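For reference, the resulting wait is shaped roughly like this (the predicate body below is illustrative and skips the failure-store case; exact metadata accessors may differ between branches):

public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
    // Listen on the elected master's cluster service so we react to each state update immediately.
    final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
        // Illustrative predicate: the real check also covers the failure store when failureStore == true.
        DataStream dataStream = clusterState.metadata().dataStreams().get(dataStreamName);
        return dataStream != null && dataStream.getIndices().size() == expectedSize;
    });
    safeAwait(listener);
    // Completing the listener only tells us a matching state was applied; the names are then
    // fetched through the regular API.
    return getDataStreamBackingIndexNames(dataStreamName, failureStore);
}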

Contributor

I would rather not add the override that returns the cluster state like this. Instead, just await for the cluster to reach a suitable state and then use the regular APIs and transport actions to query what you need. That way you get the little bit of extra coverage from calling the transport action, and you also get some confidence that the state you were waiting for doesn't immediately vanish in the next update since the transport action may see a future state.

Contributor Author

Hm, that reduces the performance benefit because we'll have to do another request, but only by a little bit I think. I agree that it does bring back some extra coverage. I'll revert the changes to addTemporaryStateListener and use the API to fetch the backing indices.

Contributor

The performance benefits are all about avoiding the Thread.sleep in the assertBusy call, oversleeping past the point where the cluster state is ready for you. The cost of the transport action should be very low.

Contributor

@DaveCTurner DaveCTurner left a comment

General pattern of changes looks good but I'll leave a detailed review to someone else if that's ok

    });
    safeAwait(listener);
    final var backingIndexNames = getDataStreamBackingIndexNames(dataStreamName, failureStore);
    assertEquals("Retrieved number of data stream indices doesn't match expectation", expectedSize, backingIndexNames.size());
Contributor

suggest mentioning the data stream name in the message, and also using assertThat("...", backingIndexNames, hasSize(expectedSize)) so you get to see more detail if it fails

Contributor Author

I'll add the data stream name in the message, but I don't think the assertion mechanism changes much:
The current implementation gives:

Retrieved number of data stream indices doesn't match expectation
Expected :1
Actual   :2
<Click to see difference>

java.lang.AssertionError: Retrieved number of data stream indices doesn't match expectation expected:<1> but was:<2>
	at __randomizedtesting.SeedInfo.seed([779A73C770529FF8:DDD333CAFBC6DD08]:0)
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.failNotEquals(Assert.java:835)
	at org.junit.Assert.assertEquals(Assert.java:647)
	at org.elasticsearch.test.ESIntegTestCase.waitForDataStreamIndices(ESIntegTestCase.java:880)
	at org.elasticsearch.test.ESIntegTestCase.waitForDataStreamBackingIndices(ESIntegTestCase.java:861)
	at org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.testErrorRecordingOnRetention(DataStreamLifecycleServiceIT.java:643)

Your suggestion gives:

Retrieved number of data stream indices doesn't match expectation
Expected: a collection with size <1>
     but: collection size was <2>
java.lang.AssertionError: Retrieved number of data stream indices doesn't match expectation
Expected: a collection with size <1>
     but: collection size was <2>
	at __randomizedtesting.SeedInfo.seed([779A73C770529FF8:DDD333CAFBC6DD08]:0)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
	at org.elasticsearch.test.ESTestCase.assertThat(ESTestCase.java:2674)
	at org.elasticsearch.test.ESIntegTestCase.waitForDataStreamIndices(ESIntegTestCase.java:881)
	at org.elasticsearch.test.ESIntegTestCase.waitForDataStreamBackingIndices(ESIntegTestCase.java:862)
	at org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.testErrorRecordingOnRetention(DataStreamLifecycleServiceIT.java:643)

I'll add the retrieved index names to the message/reason, that should probably help debugging.
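Concretely, something like this (the exact message wording is just a sketch):

assertEquals(
    "Retrieved number of indices for data stream [" + dataStreamName + "] doesn't match expectation, got: " + backingIndexNames,
    expectedSize,
    backingIndexNames.size()
);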

Contributor

oh hm that's disappointing, I thought hasSize reported the contents of the collection too

Contributor

@gmarouli gmarouli left a comment

LGTM, I added some comments but this is already an improvement :). Thanks for fixing it.

assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1)); // global retention is ignored
List<Index> currentBackingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
Contributor

We could use getDataStreamBackingIndexNames(SYSTEM_DATA_STREAM_NAME) here, right?

Contributor Author

We're serializing the getDataStreamResponse to XContent a few lines below, so I preferred this over doing two API calls.

break;
}
}
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
Contributor

Nice!

/**
* Waits for the specified data stream to have the expected number of backing or failure indices.
*/
public static List<String> waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) {
Contributor

Food for thought: what if, instead of expectedSize, we added a predicate here? This would change the nature of this method into something like waiting for a condition, but it might make it more reusable and extend the benefit of avoiding a Thread.sleep to other places too. What do you think?
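e.g. something along these lines (name and signature are hypothetical):

// Hypothetical generalisation: wait until the data stream satisfies an arbitrary condition.
public static void awaitDataStreamCondition(String dataStreamName, Predicate<DataStream> condition) {
    final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> {
        DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
        return dataStream != null && condition.test(dataStream);
    }));
}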

Contributor Author

Yeah, I thought about that too. I decided not to do that (yet) as I didn't see any use cases where it would be relevant. On second thought, I think sections like these could make use of it:

assertBusy(() -> {
    List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
    assertThat(backingIndices.size(), equalTo(1));
    // we expect the data stream to have only one backing index, the write one, with generation 2
    // as generation 1 would've been deleted by the data stream lifecycle given the configuration
    String writeIndex = backingIndices.get(0);
    assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

However, in that case, it doesn't really make sense to retrieve the backing indices. Then, having a different dedicated method just for data stream predicates feels a little over-specific, so a generic cluster-state predicate one is probably more sensible. However, then we're getting pretty close to the addTemporaryStateListener already (and the awaitClusterState). I would still see value in adding a wrapper in ESIntegTestCase that fetches the ClusterService instance from the master node, to avoid having to do that in every test. What do you think?

Contributor

Hm, I would say it depends on how much this is going to be used, for example:

  • If it's used only in this file, the helper method could reside in this file.
  • If it's used only in this module, then the helper could be part of a fixture that all the other tests in the module use. If I'm not mistaken, we have more methods that could go there.
  • If it's used by many tests scattered all over the place, then I agree that it's worth putting it in ESIntegTestCase

If you agree with the above, where do you think this should reside?

Contributor Author

I'll respond to those questions later, but I'm not sure how they're related to this discussion? I was referring to the implementation of the additional helper method.

Contributor Author

I'm going to merge this for now. There are other similar test suites that need to be updated. When I get to those, I'll automatically see what potential other use cases there are and we can adjust the helper method accordingly.

Contributor

ah, I misunderstood. What I had in mind was that we can share the method that you said would be rather specific:

Then, having a different dedicated method just for data stream predicates feels a little over-specific

Let me try to understand better what you meant with the following:

I would still see value in adding a wrapper in ESIntegTestCase that fetches the ClusterService instance from the master node, to avoid having to do that in every test

You mean to avoid doing this: final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);? Something like:

public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
     final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
     return ClusterServiceUtils.addTemporaryStateListener(clusterService, predicate);
}

Not sure if this is worth it, honestly, considering it's only one line of code. Do you think this is too much mental load for a developer?

Contributor

Note that we already have org.elasticsearch.test.ESIntegTestCase#clusterService to access the ClusterService of some random node, which is often what you want. But here it's important that it's the cluster service on the currently elected master node, since we need to be sure we've applied the state everywhere else before proceeding.

Not that I'm against a utility for this somewhere in the test framework (suggest ClusterServiceUtils over ESIntegTestCase). I see other places where it would be useful. Just that I think it should mention masterNode somewhere in its name.

Contributor

See #125648

Contributor Author

Thanks, David! My only reason for

I would still see value in adding a wrapper in ESIntegTestCase that fetches the ClusterService instance from the master node, to avoid having to do that in every test.

was based on my assumption that ESIntegTestCase#internalCluster was not publicly available. Your PR is exactly what I had in mind.

Contributor Author

@gmarouli, FTR, I agree that the mental load is minimal. I have a personal vendetta with boilerplate/duplicate code. In general, I'm also a fan of reducing test method sizes, as I feel a lot of our tests are hard to read, so any line reduction is a win in my eyes.

@nielsbauman nielsbauman enabled auto-merge (squash) March 24, 2025 14:12
@nielsbauman nielsbauman merged commit 542a3b6 into elastic:main Mar 24, 2025
17 of 18 checks passed
@nielsbauman nielsbauman deleted the fix-data-stream-lifecycle-service-it branch March 24, 2025 15:43
omricohenn pushed a commit to omricohenn/elasticsearch that referenced this pull request Mar 28, 2025