Skip to content

Commit a43ffd5

Browse files
authored
Do not send version conflicts to failure store (#112537)
When indexing to a data stream with a failure store, it is possible to get a version conflict. The reproduction path is the following:

```
PUT /_bulk
{"create":{"_index": "my-ds-with-fs", "_id": "1"}}
{"@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox"}
{"create":{"_index": "my-ds-with-fs", "_id": "1"}}
{"@timestamp": "2022-01-01", "baz": "lazy", "a": "dog"}
```

We would like the second document not to be sent to the failure store; instead, an error should be returned to the user:

```
{
  "errors" : true,
  "took" : 409,
  "items" : [
    {
      "create" : {
        "_index" : ".ds-my-ds-with-fs-xxxxx-xxxx",
        "_id" : "1",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    },
    {
      "create" : {
        "_index" : ".ds-my-ds-with-fs-xxxxx-xxxx",
        "_id" : "1",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[1]: version conflict, document already exists (current version [1])",
          "index_uuid" : ".....",
          "shard" : "0",
          "index" : ".ds-my-ds-with-fs-xxxxx-xxxx"
        }
      }
    }
  ]
}
```

The version-conflict document is counted as a rejected document in APM telemetry.
1 parent a587c7d commit a43ffd5

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,3 +748,43 @@ teardown:
748748
indices.delete:
749749
index: .fs-logs-foobar-*
750750
- is_true: acknowledged
751+
752+
---
753+
"Version conflicts are not redirected to failure store":
754+
- requires:
755+
cluster_features: ["gte_v8.16.0"]
756+
reason: "Redirecting version conflicts to the failure store is considered a bug fixed in 8.16"
757+
test_runner_features: [allowed_warnings, contains]
758+
759+
- do:
760+
allowed_warnings:
761+
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
762+
indices.put_index_template:
763+
name: generic_logs_template
764+
body:
765+
index_patterns: logs-*
766+
data_stream:
767+
failure_store: true
768+
template:
769+
settings:
770+
number_of_shards: 1
771+
number_of_replicas: 1
772+
mappings:
773+
properties:
774+
'@timestamp':
775+
type: date
776+
count:
777+
type: long
778+
779+
- do:
780+
bulk:
781+
refresh: true
782+
body:
783+
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
784+
- '{ "@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox" }'
785+
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
786+
- '{ "@timestamp": "2022-01-01", "baz": "lazy", "a": "dog" }'
787+
- is_true: errors
788+
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
789+
- match: { items.1.create.status: 409 }
790+
- match: { items.1.create.error.type: version_conflict_engine_exception}

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.core.TimeValue;
4545
import org.elasticsearch.index.Index;
4646
import org.elasticsearch.index.IndexNotFoundException;
47+
import org.elasticsearch.index.engine.VersionConflictEngineException;
4748
import org.elasticsearch.index.shard.ShardId;
4849
import org.elasticsearch.indices.IndexClosedException;
4950
import org.elasticsearch.node.NodeClosedException;
@@ -478,15 +479,18 @@ private void completeShardOperation() {
478479
}
479480

480481
private void processFailure(BulkItemRequest bulkItemRequest, Exception cause) {
481-
var errorType = ElasticsearchException.getExceptionName(ExceptionsHelper.unwrapCause(cause));
482+
var error = ExceptionsHelper.unwrapCause(cause);
483+
var errorType = ElasticsearchException.getExceptionName(error);
482484
DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
483485
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, getClusterState().metadata());
484486
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
485487
// it has the failure store enabled.
486488
if (failureStoreCandidate != null) {
487489
// Do not redirect documents to a failure store that were already headed to one.
488490
var isFailureStoreDoc = docWriteRequest instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore();
489-
if (isFailureStoreDoc == false && failureStoreCandidate.isFailureStoreEnabled()) {
491+
if (isFailureStoreDoc == false
492+
&& failureStoreCandidate.isFailureStoreEnabled()
493+
&& error instanceof VersionConflictEngineException == false) {
490494
// Redirect to failure store.
491495
maybeMarkFailureStoreForRollover(failureStoreCandidate);
492496
addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());

0 commit comments

Comments
 (0)