Skip to content

Commit f4f075a

Browse files
authored
Add failure store status in index response of data streams (#112816)
The failure store status is a flag that indicates how the failure store was used, or could have been used if it were enabled. The user is informed about the usage of the failure store in the following way: when relevant, we add the optional field `failure_store`. The field is omitted when the use of the failure store is not relevant, for example, if a document was successfully indexed in a data stream, if a failure concerns an index, or if the opType is not index or create. In more detail: - when we have a “success” create/index response, the field `failure_store` will not be present if the document was indexed in a backing index. Otherwise, if it got stored in the failure store, it will have the value `used`. - when we have a “rejected” create/index response, meaning the document was not persisted in Elasticsearch, we return the field `failure_store`, which is either `not_enabled`, if the document could have ended up in the failure store had it been enabled, or `failed`, if something went wrong and the document was not persisted in the failure store, for example, because the cluster is out of space and in read-only mode. We chose to make it an optional field to reduce the impact of this field on a bulk response. A status value will always exist in the Java object, but it will not always be returned to the user. The only values that will be displayed are: - `used`: meaning this document was indexed in the failure store. - `not_enabled`: meaning this document was rejected but could have been stored in the failure store if it had been enabled. - `failed`: meaning this failed document could not be stored in the failure store. 
Example: ``` { "errors": true, "took": 202, "items": [ { "create": { "_index": ".fs-my-ds-2024.09.04-000002", "_id": "iRDDvJEB_J3Inuia2zgH", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 6, "_primary_term": 1, "status": 201, "failure_store": "used" } }, { "create": { "_index": "ds-no-fs", "_id": "hxDDvJEB_J3Inuia2jj3", "status": 400, "error": { "type": "document_parsing_exception", "reason": "[1:153] failed to parse field [count] of type [long] in document with id 'hxDDvJEB_J3Inuia2jj3'. Preview of field's value: 'bla'", "caused_by": { "type": "illegal_argument_exception", "reason": "For input string: \"bla\"" } }, "failure_store": "not_enabled" } }, { "create": { "_index": ".ds-my-ds-2024.09.04-000001", "_id": "iBDDvJEB_J3Inuia2jj3", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 7, "_primary_term": 1, "status": 201 } } ] } ```
1 parent 49183d6 commit f4f075a

File tree

18 files changed

+639
-81
lines changed

18 files changed

+639
-81
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,7 +1415,7 @@ public void testNoTimestampInDocument() throws Exception {
14151415

14161416
IndexRequest indexRequest = new IndexRequest(dataStreamName).opType("create").source("{}", XContentType.JSON);
14171417
Exception e = expectThrows(Exception.class, client().index(indexRequest));
1418-
assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] is missing"));
1418+
assertThat(e.getCause().getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] is missing"));
14191419
}
14201420

14211421
public void testMultipleTimestampValuesInDocument() throws Exception {
@@ -1431,7 +1431,7 @@ public void testMultipleTimestampValuesInDocument() throws Exception {
14311431
IndexRequest indexRequest = new IndexRequest(dataStreamName).opType("create")
14321432
.source("{\"@timestamp\": [\"2020-12-12\",\"2022-12-12\"]}", XContentType.JSON);
14331433
Exception e = expectThrows(Exception.class, client().index(indexRequest));
1434-
assertThat(e.getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] encountered multiple values"));
1434+
assertThat(e.getCause().getCause().getMessage(), equalTo("data stream timestamp field [@timestamp] encountered multiple values"));
14351435
}
14361436

14371437
public void testMixedAutoCreate() throws Exception {

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 135 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ teardown:
3333
---
3434
"Redirect ingest failure in data stream to failure store":
3535
- requires:
36-
cluster_features: ["gte_v8.15.0"]
37-
reason: "data stream failure stores REST structure changed in 8.15+"
38-
test_runner_features: [allowed_warnings, contains]
39-
36+
reason: "Failure store status was added in 8.16+"
37+
test_runner_features: [capabilities, allowed_warnings, contains]
38+
capabilities:
39+
- method: POST
40+
path: /{index}/_doc
41+
capabilities: [ 'failure_store_status' ]
4042
- do:
4143
ingest.put_pipeline:
4244
id: "failing_pipeline"
@@ -92,6 +94,8 @@ teardown:
9294
body:
9395
'@timestamp': '2020-12-12'
9496
foo: bar
97+
- match: { failure_store: used}
98+
- match: { _index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/'}
9599

96100
- do:
97101
indices.get_data_stream:
@@ -144,9 +148,12 @@ teardown:
144148
---
145149
"Redirect shard failure in data stream to failure store":
146150
- requires:
147-
cluster_features: ["gte_v8.14.0"]
148-
reason: "data stream failure stores only redirect shard failures in 8.14+"
149-
test_runner_features: [allowed_warnings, contains]
151+
reason: "Failure store status was added in 8.16+"
152+
test_runner_features: [ capabilities, allowed_warnings, contains ]
153+
capabilities:
154+
- method: POST
155+
path: /{index}/_doc
156+
capabilities: [ 'failure_store_status' ]
150157

151158
- do:
152159
allowed_warnings:
@@ -176,6 +183,8 @@ teardown:
176183
body:
177184
'@timestamp': '2020-12-12'
178185
count: 'invalid value'
186+
- match: { _index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000002/'}
187+
- match: { failure_store: used}
179188

180189
- do:
181190
indices.get_data_stream:
@@ -222,14 +231,13 @@ teardown:
222231

223232
---
224233
"Ensure failure is redirected to correct failure store after a reroute processor":
225-
- skip:
226-
known_issues:
227-
- cluster_feature: "gte_v8.15.0"
228-
fixed_by: "gte_v8.16.0"
229-
reason: "Failure store documents contained the original index name rather than the rerouted one before v8.16.0"
230234
- requires:
231-
test_runner_features: [allowed_warnings]
232-
235+
test_runner_features: [allowed_warnings, capabilities]
236+
reason: "Failure store status was added in 8.16+"
237+
capabilities:
238+
- method: POST
239+
path: /{index}/_doc
240+
capabilities: [ 'failure_store_status' ]
233241
- do:
234242
ingest.put_pipeline:
235243
id: "failing_pipeline"
@@ -307,6 +315,7 @@ teardown:
307315
body:
308316
'@timestamp': '2020-12-12'
309317
foo: bar
318+
- match: { failure_store: used}
310319

311320
- do:
312321
search:
@@ -422,9 +431,12 @@ teardown:
422431
---
423432
"Failure redirects to original failure store during index change if final pipeline changes target":
424433
- requires:
425-
cluster_features: [ "gte_v8.15.0" ]
426-
reason: "data stream failure stores REST structure changed in 8.15+"
427-
test_runner_features: [ allowed_warnings, contains ]
434+
reason: "Failure store status was added in 8.16+"
435+
test_runner_features: [ capabilities, allowed_warnings, contains ]
436+
capabilities:
437+
- method: POST
438+
path: /{index}/_doc
439+
capabilities: [ 'failure_store_status' ]
428440

429441
- do:
430442
ingest.put_pipeline:
@@ -466,6 +478,7 @@ teardown:
466478
body:
467479
'@timestamp': '2020-12-12'
468480
foo: bar
481+
- match: { failure_store: used}
469482

470483
- do:
471484
indices.get_data_stream:
@@ -514,9 +527,12 @@ teardown:
514527
---
515528
"Failure redirects to correct failure store when index loop is detected":
516529
- requires:
517-
cluster_features: [ "gte_v8.15.0" ]
518-
reason: "data stream failure stores REST structure changed in 8.15+"
519-
test_runner_features: [ allowed_warnings, contains ]
530+
reason: "Failure store status was added in 8.16+"
531+
test_runner_features: [ capabilities, allowed_warnings, contains ]
532+
capabilities:
533+
- method: POST
534+
path: /{index}/_doc
535+
capabilities: [ 'failure_store_status' ]
520536

521537
- do:
522538
ingest.put_pipeline:
@@ -591,6 +607,8 @@ teardown:
591607
body:
592608
'@timestamp': '2020-12-12'
593609
foo: bar
610+
- match: { _index: '/\.fs-destination-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
611+
- match: { failure_store: used}
594612

595613

596614
- do:
@@ -640,9 +658,12 @@ teardown:
640658
---
641659
"Failure redirects to correct failure store when pipeline loop is detected":
642660
- requires:
643-
cluster_features: [ "gte_v8.15.0" ]
644-
reason: "data stream failure stores REST structure changed in 8.15+"
645-
test_runner_features: [ allowed_warnings, contains ]
661+
reason: "Failure store status was added in 8.16+"
662+
test_runner_features: [ capabilities, allowed_warnings, contains ]
663+
capabilities:
664+
- method: POST
665+
path: /{index}/_doc
666+
capabilities: [ 'failure_store_status' ]
646667

647668
- do:
648669
ingest.put_pipeline:
@@ -701,6 +722,7 @@ teardown:
701722
body:
702723
'@timestamp': '2020-12-12'
703724
foo: bar
725+
- match: { failure_store: used}
704726

705727
- do:
706728
indices.get_data_stream:
@@ -752,9 +774,7 @@ teardown:
752774
---
753775
"Version conflicts are not redirected to failure store":
754776
- requires:
755-
cluster_features: ["gte_v8.16.0"]
756-
reason: "Redirecting version conflicts to the failure store is considered a bug fixed in 8.16"
757-
test_runner_features: [allowed_warnings, contains]
777+
test_runner_features: [ allowed_warnings]
758778

759779
- do:
760780
allowed_warnings:
@@ -788,3 +808,92 @@ teardown:
788808
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
789809
- match: { items.1.create.status: 409 }
790810
- match: { items.1.create.error.type: version_conflict_engine_exception}
811+
- is_false: items.1.create.failure_store
812+
813+
---
814+
"Test failure store status with bulk request":
815+
- requires:
816+
test_runner_features: [ allowed_warnings, capabilities ]
817+
reason: "Failure store status was added in 8.16+"
818+
capabilities:
819+
- method: POST
820+
path: /_bulk
821+
capabilities: [ 'failure_store_status' ]
822+
- method: PUT
823+
path: /_bulk
824+
capabilities: [ 'failure_store_status' ]
825+
826+
- do:
827+
allowed_warnings:
828+
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
829+
indices.put_index_template:
830+
name: generic_logs_template
831+
body:
832+
index_patterns: logs-*
833+
data_stream:
834+
failure_store: true
835+
template:
836+
settings:
837+
number_of_shards: 1
838+
number_of_replicas: 1
839+
mappings:
840+
properties:
841+
'@timestamp':
842+
type: date
843+
count:
844+
type: long
845+
- do:
846+
allowed_warnings:
847+
- "index template [no-fs] has index patterns [no-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [no-fs] will take precedence during new index creation"
848+
indices.put_index_template:
849+
name: no-fs
850+
body:
851+
index_patterns: no-fs*
852+
data_stream:
853+
failure_store: false
854+
template:
855+
settings:
856+
number_of_shards: 1
857+
number_of_replicas: 0
858+
mappings:
859+
properties:
860+
'@timestamp':
861+
type: date
862+
count:
863+
type: long
864+
865+
866+
- do:
867+
bulk:
868+
refresh: true
869+
body:
870+
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
871+
- '{ "@timestamp": "2022-01-01", "baz": "quick", "a": "brown", "b": "fox" }'
872+
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
873+
- '{ "@timestamp": "2022-01-01", "baz": "lazy", "a": "dog" }'
874+
- '{ "create": { "_index": "logs-foobar", "_id": "1" } }'
875+
- '{ "@timestamp": "2022-01-01", "count": "invalid" }'
876+
- '{ "create": { "_index": "no-fs", "_id": "1" } }'
877+
- '{ "@timestamp": "2022-01-01", "count": "invalid" }'
878+
- is_true: errors
879+
# Successfully indexed to backing index
880+
- match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
881+
- match: { items.0.create.status: 201 }
882+
- is_false: items.0.create.failure_store
883+
884+
# Rejected but not eligible to go to failure store
885+
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
886+
- match: { items.1.create.status: 409 }
887+
- match: { items.1.create.error.type: version_conflict_engine_exception}
888+
- is_false: items.1.create.failure_store
889+
890+
# Successfully indexed to failure store
891+
- match: { items.2.create._index: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
892+
- match: { items.2.create.status: 201 }
893+
- match: { items.2.create.failure_store: used }
894+
895+
# Rejected, eligible to go to failure store, but failure store not enabled
896+
- match: { items.3.create._index: '/\.ds-no-fs-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
897+
- match: { items.3.create.status: 400 }
898+
- match: { items.3.create.error.type: document_parsing_exception }
899+
- match: { items.3.create.failure_store: not_enabled }

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.index.IndexFormatTooOldException;
1515
import org.apache.lucene.store.AlreadyClosedException;
1616
import org.apache.lucene.store.LockObtainFailedException;
17+
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
1718
import org.elasticsearch.action.support.replication.ReplicationOperation;
1819
import org.elasticsearch.cluster.action.shard.ShardStateAction;
1920
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
@@ -1929,6 +1930,12 @@ private enum ElasticsearchExceptionHandle {
19291930
org.elasticsearch.ingest.IngestPipelineException::new,
19301931
182,
19311932
TransportVersions.INGEST_PIPELINE_EXCEPTION_ADDED
1933+
),
1934+
INDEX_RESPONSE_WRAPPER_EXCEPTION(
1935+
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class,
1936+
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new,
1937+
183,
1938+
TransportVersions.FAILURE_STORE_STATUS_IN_INDEX_RESPONSE
19321939
);
19331940

19341941
final Class<? extends ElasticsearchException> exceptionClass;

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ static TransportVersion def(int id) {
219219
public static final TransportVersion SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS = def(8_743_00_0);
220220
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_EMBEDDINGS_ADDED = def(8_744_00_0);
221221
public static final TransportVersion BULK_INCREMENTAL_STATE = def(8_745_00_0);
222+
public static final TransportVersion FAILURE_STORE_STATUS_IN_INDEX_RESPONSE = def(8_746_00_0);
222223

223224
/*
224225
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.action;
1010

1111
import org.elasticsearch.TransportVersions;
12+
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
1213
import org.elasticsearch.action.support.WriteRequest;
1314
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
1415
import org.elasticsearch.action.support.WriteResponse;
@@ -249,6 +250,10 @@ public String getLocation(@Nullable String routing) {
249250
return location.toString();
250251
}
251252

253+
/**
 * Returns the failure store status associated with this write response.
 * <p>
 * This implementation always returns {@link IndexDocFailureStoreStatus#NOT_APPLICABLE_OR_UNKNOWN}.
 * NOTE(review): presumably response types that can carry a concrete failure store status
 * override this method - confirm against the subclasses.
 *
 * @return the failure store status; here always {@code NOT_APPLICABLE_OR_UNKNOWN}
 */
public IndexDocFailureStoreStatus getFailureStoreStatus() {
254+
return IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN;
255+
}
256+
252257
public void writeThin(StreamOutput out) throws IOException {
253258
super.writeTo(out);
254259
writeWithoutShardId(out);

0 commit comments

Comments
 (0)