-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Fix and add a test for failure store with Incremental bulk #115866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Pinging @elastic/es-distributed (Team:Distributed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM if someone from the data management team confirms this is the expected behavior and what they were looking to test.
int docs_in_fs = 0; | ||
for (int i = (int) hits.get(); i < bulkResponse.getItems().length; ++i) { | ||
BulkItemResponse item = bulkResponse.getItems()[i]; | ||
if (item.isFailed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just so I understand this correctly:
If an indexing operation fails and there is a failure store configured will then try to store that failure in the failure store. If that operation succeeds, then the indexing operations is indicated to the user as "successful"?
As long as that is the expected behavior this mitigation looks good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes your understanding is correct @Tim-Brooks. If we successfully index into the failure store, the indexing operation is considered successful, even though the operation failed to index into the original index.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ankikuma & @Tim-Brooks ,
As long as that is the expected behavior this mitigation looks good to
I do not think this is expected behaviour.
I am afraid I mislead @ankikuma during our last chat.
If we successfully index into the failure store, the indexing operation is considered successful, even though the operation failed to index into the original index.
This is correct, and it explains the behaviour that the code exhibits. But if I remember correctly the conversations we looked into, the purpose of the failure store is to store failures as a result from a user misconfiguration and not technical limitations/failures.
Considering this, I would expect that this type of failure should not be redirected to the failure store and it should result in a failed response.
As a way forward, I see two options depending on the scope of this work:
- If the purpose of this test is to check that incremental indexing and failure store work as expected, I would say that we need to fix the bug that this test has unearthed.
- If the purpose is to only add a test to cover the failure store and incremental bulk indexing working together, we should write the test to work as expected and then open a bug and mute this test.
Preferably, I would prefer the first but I do not fully know the scope of this work, so I would like to offer an alternative as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gmarouli The scope of this test was to just test the interaction of incremental bulk failures caused by the short circuit feature with failure store, based on the comment from @nielsbauman here. And in this original comment , it looks like we expected the short circuited requests to got to the failure store.
However, as this quote of James Baiera indicated that if failures occurred due to resource constraints, they should not go to the failure store:
I think if we reject requests due to resource constraints that’s ok, since the failure store is not meant to be a dead letter queue - it’s a best effort storage location for documents that cannot be ingested because there is some kind of fault in their shape or content.
For instance, if the failure store index on a data stream is not allocated, we simply reject the document, nothing to be done. If there’s no memory to execute a write, or if there is no thread capacity, there’s nothing we can do
Now a short circuit failure is triggered due to a previous failure. It just so happens that in this test we simulate that failure based on indexing pressure. I am not sure how one would distinguish between failures caused by resource constraints vs. other types of failures (for the benefit of the failure store).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am not mistaken the exception thrown is EsRejectedExecutionException
right? In this case we can extend the conditions at the point where we determine if a request should be redirected to the failure store or not. See:
elasticsearch/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
Lines 544 to 546 in e0a4584
if (isFailureStoreRequest == false | |
&& failureStoreCandidate.isFailureStoreEnabled() | |
&& error instanceof VersionConflictEngineException == false) { |
If you agree I think it's worth the effort because right now the assertions are much more complex than they need to be. I could also give it a go if you want and we can ask @jbaiera to review. Would you feel more comfortable with that approach?
} | ||
|
||
int docs_redirected_to_fs = 0; | ||
int docs_in_fs = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java variables should be camel case. docsInFS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. Done.
assertTrue(bulkResponse.getItems()[i].getFailureStoreStatus().getLabel().equalsIgnoreCase("NOT_APPLICABLE_OR_UNKNOWN")); | ||
} | ||
|
||
int docs_redirected_to_fs = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java variables should be camel case. docsRedirectedToFs
Pinging @elastic/es-data-management (Team:Data Management) |
@gmarouli @nielsbauman Could you please take a look at this test. This test is added as a follow up to the discussion here. The test behavior is as follows:
Based on the slack thread linked above, we don't want to use the failure store to store docs that were rejected due to a resource level failure. But my testcase shows that we do. So perhaps we need a follow up change to the failure store code. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ankikuma I hold off the review until I understand better the scope of this PR. But it looks good in general :).
Thank you for including the failure store in your test suite!
private String dataStream = "data-stream-incremental"; | ||
private String template = "template-incremental"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we convert them these to constants? They look like they are.
int docs_in_fs = 0; | ||
for (int i = (int) hits.get(); i < bulkResponse.getItems().length; ++i) { | ||
BulkItemResponse item = bulkResponse.getItems()[i]; | ||
if (item.isFailed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am not mistaken the exception thrown is EsRejectedExecutionException
right? In this case we can extend the conditions at the point where we determine if a request should be redirected to the failure store or not. See:
elasticsearch/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
Lines 544 to 546 in e0a4584
if (isFailureStoreRequest == false | |
&& failureStoreCandidate.isFailureStoreEnabled() | |
&& error instanceof VersionConflictEngineException == false) { |
If you agree I think it's worth the effort because right now the assertions are much more complex than they need to be. I could also give it a go if you want and we can ask @jbaiera to review. Would you feel more comfortable with that approach?
Thank you Mary for offering to try out a fix. I can incorporate your changes into my PR. |
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ankikuma , as I started working on changing the test to match the fix, I noticed a few things that were confusing me a bit. I added an explanation for every change I performed that was not related with the actual fix.
This is still your PR so if you find that something does not suit you feel free to revert or improve.
} | ||
|
||
public void testShortCircuitFailure() throws Exception { | ||
createDataStreamWithFailureStore(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ankikuma , I thought this test is pretty targeted to failure store so we could merge the template and data stream creation in one method.
try (IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest()) { | ||
|
||
AtomicBoolean nextRequested = new AtomicBoolean(true); | ||
int successfullyStored = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ankikuma I renamed this to successfullyStored
because hits
confused me since I have associated it with search results. I also did not see the need to use an atomic.
assertDataStreamMetric(metrics, FailureStoreMetrics.METRIC_REJECTED, DATA_STREAM_NAME, 0); | ||
|
||
// Introduce artificial pressure that will reject the following requests | ||
String node = findNodeOfPrimaryShard(DATA_STREAM_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ankikuma I abstracted finding the node of the primary shard in one method because I thought the retrieval did not add much to the flow of the test.
private void assertDataStreamMetric(Map<String, List<Measurement>> metrics, String metric, String dataStreamName, int expectedValue) { | ||
List<Measurement> measurements = metrics.get(metric); | ||
assertThat(measurements, notNullValue()); | ||
long totalValue = measurements.stream() | ||
.filter(m -> m.attributes().get("data_stream").equals(dataStreamName)) | ||
.mapToLong(Measurement::getLong) | ||
.sum(); | ||
assertThat(totalValue, equalTo((long) expectedValue)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ankikuma I cleaned up the measurements assertion here a bit, not all of them were used and I found this structure a bit easier to follow.
The way I understand it, we provide the observed telemetry, the metric and the data streams we are interested in and the expected value, then it will sum the values up and verify.
The previous one was also correct, but I find this a bit more intuitive while before I got a bit lost why do we need the size of the list ;).
I am also using at every point when we check the measurements, even when the measurements would be empty because I also find the symmetry easier to read.
@elasticmachine update branch |
@elasticmachine update branch |
@jbaiera could you please review the test and fix in this PR to see if it is aligned with the expected behavior of the failure store. Thank you Mary for making the changes! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code changes and line of thinking LGTM.
✅ assuming green CI
…15866) When a document is rejected because of indexing pressure, it should not be redirected to the failure store. The failure store is not meant to be a dead letter queue - it’s a best effort storage location for documents that cannot be ingested because there is some kind of fault in their shape or content, this way a user can fix them. In the case of indexing pressure there is nothing wrong with the document itself. In this PR we fix the redirection to the failure store and we add an integration test to test the interaction of the failure store and incremental bulk's short circuit failure feature. Closes ES-9577. Co-authored-by: gmarouli <[email protected]>
…15866) When a document is rejected because of indexing pressure, it should not be redirected to the failure store. The failure store is not meant to be a dead letter queue - it’s a best effort storage location for documents that cannot be ingested because there is some kind of fault in their shape or content, this way a user can fix them. In the case of indexing pressure there is nothing wrong with the document itself. In this PR we fix the redirection to the failure store and we add an integration test to test the interaction of the failure store and incremental bulk's short circuit failure feature. Closes ES-9577. Co-authored-by: gmarouli <[email protected]>
When a document is rejected because of indexing pressure, it should not be redirected to the failure store.
The failure store is not meant to be a dead letter queue - it’s a best effort storage location for documents that cannot be ingested because there is some kind of fault in their shape or content, this way a user can fix them.
In the case of indexing pressure there is nothing wrong with the document itself. In this PR we fix the redirection to the failure store and we add an integration test to test the interaction of the failure store and incremental bulk's short circuit failure feature.
Closes ES-9577.