Skip to content

Commit 7b89f4d

Browse files
authored
Add ability to redirect ingestion failures on data streams to a failure store (#126973)
Removes the feature flags and guards that prevent the new failure store functionality from operating in production runtimes.
1 parent 72b4ed2 commit 7b89f4d

File tree

90 files changed

+2020
-2261
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+2020
-2261
lines changed

docs/build.gradle

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
8585
setting 'xpack.license.self_generated.type', 'trial'
8686
setting 'indices.lifecycle.history_index_enabled', 'false'
8787
keystorePassword 'keystore-password'
88-
if (buildParams.snapshotBuild == false) {
89-
requiresFeature 'es.failure_store_feature_flag_enabled', new Version(8, 12, 0)
90-
}
9188
}
9289

9390
// debug ccr test failures:
@@ -126,7 +123,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
126123

127124

128125
requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
129-
requiresFeature 'es.failure_store_feature_flag_enabled', Version.fromString("8.12.0")
130126

131127
// TODO Rene: clean up this kind of cross project file references
132128
extraConfigFile 'op-jwks.json', project(':x-pack:test:idp-fixture').file("src/main/resources/oidc/op-jwks.json")

docs/changelog/126973.yaml

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
pr: 126973
2+
summary: Add ability to redirect ingestion failures on data streams to a failure store
3+
area: Data streams
4+
type: feature
5+
issues: []
6+
highlight:
7+
title: Add ability to redirect ingestion failures on data streams to a failure store
8+
body: |-
9+
Documents that encountered ingest pipeline failures or mapping conflicts
10+
would previously be returned to the client as errors in the bulk and
11+
index operations. Many client applications are not equipped to respond
12+
to these failures. This leads to the failed documents often being
13+
dropped by the client which cannot hold the broken documents
14+
indefinitely. In many end user workloads, these failed documents
15+
represent events that could be critical signals for observability or
16+
security use cases.
17+
18+
To help mitigate this problem, data streams can now maintain a "failure
19+
store" which is used to accept and hold documents that fail to be
20+
ingested due to preventable configuration errors. The data stream's
21+
failure store operates like a separate set of backing indices with their
22+
own mappings and access patterns that allow Elasticsearch to accept
23+
documents that would otherwise be rejected due to unhandled ingest
24+
pipeline exceptions or mapping conflicts.
25+
26+
Users can enable redirection of ingest failures to the failure store on
27+
new data streams by specifying it in the new `data_stream_options` field
28+
inside of a component or index template:
29+
30+
[source,yaml]
31+
----
32+
PUT _index_template/my-template
33+
{
34+
"index_patterns": ["logs-test-*"],
35+
"data_stream": {},
36+
"template": {
37+
"data_stream_options": {
38+
"failure_store": {
39+
"enabled": true
40+
}
41+
}
42+
}
43+
}'
44+
----
45+
46+
Existing data streams can be configured with the new data stream
47+
`_options` endpoint:
48+
49+
[source,yaml]
50+
----
51+
PUT _data_stream/logs-test-apache/_options
52+
{
53+
"failure_store": {
54+
"enabled": "true"
55+
}
56+
}
57+
----
58+
59+
When redirection is enabled, any ingestion related failures will be
60+
captured in the failure store if the cluster is able to, along with the
61+
timestamp that the failure occurred, details about the error
62+
encountered, and the document that could not be ingested. Since failure
63+
stores are a kind of Elasticsearch index, we can search the data stream
64+
for the failures that it has collected. The failures are not shown by
65+
default as they are stored in different indices than the normal data
66+
stream data. In order to retrieve the failures, we use the `_search` API
67+
along with a new bit of index pattern syntax, the `::` selector.
68+
69+
[source,yaml]
70+
----
71+
POST logs-test-apache::failures/_search
72+
----
73+
74+
This index syntax informs the search operation to target the indices in
75+
its failure store instead of its backing indices. It can be mixed in a
76+
number of ways with other index patterns to include their failure store
77+
indices in the search operation:
78+
79+
[source,yaml]
80+
----
81+
POST logs-*::failures/_search
82+
POST logs-*,logs-*::failures/_search
83+
POST *::failures/_search
84+
POST _query
85+
{
86+
"query": "FROM my_data_stream*::failures"
87+
}
88+
----
89+
notable: true

modules/data-streams/build.gradle

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,6 @@ if (buildParams.inFipsJvm){
3636
tasks.named("yamlRestTest").configure{enabled = false }
3737
}
3838

39-
if (buildParams.snapshotBuild == false) {
40-
tasks.withType(Test).configureEach {
41-
systemProperty 'es.failure_store_feature_flag_enabled', 'true'
42-
}
43-
}
44-
4539
tasks.named("yamlRestCompatTestTransform").configure({ task ->
4640
task.skipTest("data_stream/10_basic/Create hidden data stream", "warning does not exist for compatibility")
4741

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AbstractDataStreamIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.common.settings.Settings;
1717
import org.elasticsearch.common.util.concurrent.ThreadContext;
1818
import org.elasticsearch.test.cluster.ElasticsearchCluster;
19-
import org.elasticsearch.test.cluster.FeatureFlag;
2019
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
2120
import org.elasticsearch.test.rest.ESRestTestCase;
2221
import org.junit.After;
@@ -38,7 +37,6 @@ public abstract class AbstractDataStreamIT extends ESRestTestCase {
3837
@ClassRule
3938
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
4039
.distribution(DistributionType.DEFAULT)
41-
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
4240
.setting("xpack.security.enabled", "false")
4341
.setting("xpack.watcher.enabled", "false")
4442
// Disable apm-data so the index templates it installs do not impact

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamWithSecurityIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.common.util.concurrent.ThreadContext;
1717
import org.elasticsearch.test.cluster.ElasticsearchCluster;
18-
import org.elasticsearch.test.cluster.FeatureFlag;
1918
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
2019
import org.elasticsearch.test.cluster.util.resource.Resource;
2120
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -29,7 +28,6 @@ public class DataStreamWithSecurityIT extends ESRestTestCase {
2928
@ClassRule
3029
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
3130
.distribution(DistributionType.DEFAULT)
32-
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
3331
.setting("xpack.watcher.enabled", "false")
3432
.setting("xpack.ml.enabled", "false")
3533
.setting("xpack.security.enabled", "true")

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DisabledSecurityDataStreamTestCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.util.concurrent.ThreadContext;
1515
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16-
import org.elasticsearch.test.cluster.FeatureFlag;
1716
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1817
import org.elasticsearch.test.rest.ESRestTestCase;
1918
import org.junit.ClassRule;
@@ -27,7 +26,6 @@ public abstract class DisabledSecurityDataStreamTestCase extends ESRestTestCase
2726
@ClassRule
2827
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
2928
.distribution(DistributionType.DEFAULT)
30-
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
3129
.setting("xpack.security.enabled", "false")
3230
.setting("xpack.watcher.enabled", "false")
3331
.build();

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.common.util.concurrent.ThreadContext;
1919
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20-
import org.elasticsearch.test.cluster.FeatureFlag;
2120
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
2221
import org.elasticsearch.test.cluster.util.resource.Resource;
2322
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -41,7 +40,6 @@ public class LazyRolloverDataStreamIT extends ESRestTestCase {
4140
@ClassRule
4241
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
4342
.distribution(DistributionType.DEFAULT)
44-
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
4543
.setting("xpack.watcher.enabled", "false")
4644
.setting("xpack.ml.enabled", "false")
4745
.setting("xpack.security.enabled", "true")

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsTestCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.util.concurrent.ThreadContext;
2020
import org.elasticsearch.rest.RestStatus;
2121
import org.elasticsearch.test.cluster.ElasticsearchCluster;
22-
import org.elasticsearch.test.cluster.FeatureFlag;
2322
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
2423
import org.elasticsearch.test.cluster.util.resource.Resource;
2524
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -46,7 +45,6 @@ public abstract class DataStreamLifecyclePermissionsTestCase extends ESRestTestC
4645
@ClassRule
4746
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
4847
.distribution(DistributionType.DEFAULT)
49-
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
5048
.setting("xpack.watcher.enabled", "false")
5149
.setting("xpack.ml.enabled", "false")
5250
.setting("xpack.security.enabled", "true")

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class DataStreamFeatures implements FeatureSpecification {
2828

2929
@Override
3030
public Set<NodeFeature> getFeatures() {
31-
return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE) : Set.of();
31+
return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
3232
}
3333

3434
@Override

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
2222
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
2323
import org.elasticsearch.client.internal.OriginSettingClient;
24-
import org.elasticsearch.cluster.metadata.DataStream;
2524
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2625
import org.elasticsearch.cluster.node.DiscoveryNodes;
2726
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -237,11 +236,9 @@ public List<ActionHandler> getActions() {
237236
actions.add(new ActionHandler(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
238237
actions.add(new ActionHandler(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
239238
actions.add(new ActionHandler(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
240-
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
241-
actions.add(new ActionHandler(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
242-
actions.add(new ActionHandler(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
243-
actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
244-
}
239+
actions.add(new ActionHandler(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
240+
actions.add(new ActionHandler(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
241+
actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
245242
return actions;
246243
}
247244

@@ -274,11 +271,9 @@ public List<RestHandler> getRestHandlers(
274271
handlers.add(new RestDeleteDataStreamLifecycleAction());
275272
handlers.add(new RestExplainDataStreamLifecycleAction());
276273
handlers.add(new RestDataStreamLifecycleStatsAction());
277-
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
278-
handlers.add(new RestGetDataStreamOptionsAction());
279-
handlers.add(new RestPutDataStreamOptionsAction());
280-
handlers.add(new RestDeleteDataStreamOptionsAction());
281-
}
274+
handlers.add(new RestGetDataStreamOptionsAction());
275+
handlers.add(new RestPutDataStreamOptionsAction());
276+
handlers.add(new RestDeleteDataStreamOptionsAction());
282277
return handlers;
283278
}
284279

0 commit comments

Comments
 (0)