Commit 6c66b09

jbaiera and elasticsearchmachine authored
[8.19] Add ability to redirect ingestion failures on data streams to a failure store (#126973) (#127546)
* Add ability to redirect ingestion failures on data streams to a failure store (#126973)

Removes the feature flags and guards that prevent the new failure store functionality from operating in production runtimes.

* Fix build
* [CI] Auto commit changes from spotless
* Fix build
* Fix build
* Fix build
* [CI] Auto commit changes from spotless

---------

Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 182b9b4 commit 6c66b09

87 files changed: +1176 -1412 lines changed
docs/build.gradle

Lines changed: 0 additions & 4 deletions
@@ -83,9 +83,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
   setting 'xpack.license.self_generated.type', 'trial'
   setting 'indices.lifecycle.history_index_enabled', 'false'
   keystorePassword 'keystore-password'
-  if (buildParams.snapshotBuild == false) {
-    requiresFeature 'es.failure_store_feature_flag_enabled', new Version(8, 12, 0)
-  }
 }
 
 // debug ccr test failures:
@@ -124,7 +121,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
 
 
   requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
-  requiresFeature 'es.failure_store_feature_flag_enabled', Version.fromString("8.12.0")
 
   // TODO Rene: clean up this kind of cross project file references
   extraConfigFile 'op-jwks.json', project(':x-pack:test:idp-fixture').file("src/main/resources/oidc/op-jwks.json")

docs/changelog/126973.yaml

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+pr: 126973
+summary: Add ability to redirect ingestion failures on data streams to a failure store
+area: Data streams
+type: feature
+issues: []
+highlight:
+  title: Add ability to redirect ingestion failures on data streams to a failure store
+  body: |-
+    Documents that encountered ingest pipeline failures or mapping conflicts
+    would previously be returned to the client as errors in the bulk and
+    index operations. Many client applications are not equipped to respond
+    to these failures. This leads to the failed documents often being
+    dropped by the client which cannot hold the broken documents
+    indefinitely. In many end user workloads, these failed documents
+    represent events that could be critical signals for observability or
+    security use cases.
+
+    To help mitigate this problem, data streams can now maintain a "failure
+    store" which is used to accept and hold documents that fail to be
+    ingested due to preventable configuration errors. The data stream's
+    failure store operates like a separate set of backing indices with their
+    own mappings and access patterns that allow Elasticsearch to accept
+    documents that would otherwise be rejected due to unhandled ingest
+    pipeline exceptions or mapping conflicts.
+
+    Users can enable redirection of ingest failures to the failure store on
+    new data streams by specifying it in the new `data_stream_options` field
+    inside of a component or index template:
+
+    [source,yaml]
+    ----
+    PUT _index_template/my-template
+    {
+      "index_patterns": ["logs-test-*"],
+      "data_stream": {},
+      "template": {
+        "data_stream_options": {
+          "failure_store": {
+            "enabled": true
+          }
+        }
+      }
+    }'
+    ----
+
+    Existing data streams can be configured with the new data stream
+    `_options` endpoint:
+
+    [source,yaml]
+    ----
+    PUT _data_stream/logs-test-apache/_options
+    {
+      "failure_store": {
+        "enabled": "true"
+      }
+    }
+    ----
+
+    When redirection is enabled, any ingestion related failures will be
+    captured in the failure store if the cluster is able to, along with the
+    timestamp that the failure occurred, details about the error
+    encountered, and the document that could not be ingested. Since failure
+    stores are a kind of Elasticsearch index, we can search the data stream
+    for the failures that it has collected. The failures are not shown by
+    default as they are stored in different indices than the normal data
+    stream data. In order to retrieve the failures, we use the `_search` API
+    along with a new bit of index pattern syntax, the `::` selector.
+
+    [source,yaml]
+    ----
+    POST logs-test-apache::failures/_search
+    ----
+
+    This index syntax informs the search operation to target the indices in
+    its failure store instead of its backing indices. It can be mixed in a
+    number of ways with other index patterns to include their failure store
+    indices in the search operation:
+
+    [source,yaml]
+    ----
+    POST logs-*::failures/_search
+    POST logs-*,logs-*::failures/_search
+    POST *::failures/_search
+    POST _query
+    {
+      "query": "FROM my_data_stream*::failures"
+    }
+    ----
+  notable: true
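
The request shapes described in the changelog above can be sketched in Java as plain string builders. This is only an illustration under stated assumptions: `FailureStoreRequests` and its method names are hypothetical helpers, not part of Elasticsearch or this commit, and the sketch builds request paths and bodies without sending anything to a cluster.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Elasticsearch: builds the request paths and
// bodies shown in the changelog so the `::failures` selector syntax is explicit.
final class FailureStoreRequests {
    private FailureStoreRequests() {}

    // Appends the `::failures` selector to every comma-separated index pattern.
    static String withFailuresSelector(String patterns) {
        return Arrays.stream(patterns.split(","))
            .map(String::trim)
            .map(pattern -> pattern + "::failures")
            .collect(Collectors.joining(","));
    }

    // Path for searching only the failure store of the given patterns.
    static String failuresSearchPath(String patterns) {
        return "/" + withFailuresSelector(patterns) + "/_search";
    }

    // Path of the data stream `_options` endpoint named in the changelog.
    static String optionsPath(String dataStream) {
        return "/_data_stream/" + dataStream + "/_options";
    }

    // JSON body that enables or disables the failure store.
    static String failureStoreOptionsBody(boolean enabled) {
        return "{\"failure_store\":{\"enabled\":" + enabled + "}}";
    }

    public static void main(String[] args) {
        System.out.println("PUT " + optionsPath("logs-test-apache"));
        System.out.println(failureStoreOptionsBody(true));
        System.out.println("POST " + failuresSearchPath("logs-test-apache"));
    }
}
```

To search regular data and failures together, as in the `logs-*,logs-*::failures` example, a caller would concatenate the plain pattern with the output of `withFailuresSelector` rather than pass both through the helper.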

modules/data-streams/build.gradle

Lines changed: 0 additions & 6 deletions
@@ -36,12 +36,6 @@ if (buildParams.inFipsJvm){
   tasks.named("yamlRestTest").configure{enabled = false }
 }
 
-if (buildParams.snapshotBuild == false) {
-  tasks.withType(Test).configureEach {
-    systemProperty 'es.failure_store_feature_flag_enabled', 'true'
-  }
-}
-
 tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
   task.skipTest("data_stream/10_basic/Create hidden data stream", "warning does not exist for compatibility")
 

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AbstractDataStreamIT.java

Lines changed: 0 additions & 2 deletions
@@ -16,7 +16,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.After;
@@ -38,7 +37,6 @@ public abstract class AbstractDataStreamIT extends ESRestTestCase {
     @ClassRule
     public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
         .distribution(DistributionType.DEFAULT)
-        .feature(FeatureFlag.FAILURE_STORE_ENABLED)
         .setting("xpack.security.enabled", "false")
         .setting("xpack.watcher.enabled", "false")
         // Disable apm-data so the index templates it installs do not impact

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamWithSecurityIT.java

Lines changed: 0 additions & 2 deletions
@@ -15,7 +15,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -29,7 +28,6 @@ public class DataStreamWithSecurityIT extends ESRestTestCase {
     @ClassRule
     public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
         .distribution(DistributionType.DEFAULT)
-        .feature(FeatureFlag.FAILURE_STORE_ENABLED)
         .setting("xpack.watcher.enabled", "false")
         .setting("xpack.ml.enabled", "false")
         .setting("xpack.security.enabled", "true")

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DisabledSecurityDataStreamTestCase.java

Lines changed: 0 additions & 2 deletions
@@ -13,7 +13,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.ClassRule;
@@ -27,7 +26,6 @@ public abstract class DisabledSecurityDataStreamTestCase extends ESRestTestCase
     @ClassRule
     public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
         .distribution(DistributionType.DEFAULT)
-        .feature(FeatureFlag.FAILURE_STORE_ENABLED)
         .setting("xpack.security.enabled", "false")
         .setting("xpack.watcher.enabled", "false")
         .build();

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java

Lines changed: 0 additions & 2 deletions
@@ -17,7 +17,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -41,7 +40,6 @@ public class LazyRolloverDataStreamIT extends ESRestTestCase {
     @ClassRule
     public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
         .distribution(DistributionType.DEFAULT)
-        .feature(FeatureFlag.FAILURE_STORE_ENABLED)
         .setting("xpack.watcher.enabled", "false")
         .setting("xpack.ml.enabled", "false")
         .setting("xpack.security.enabled", "true")

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecyclePermissionsTestCase.java

Lines changed: 0 additions & 2 deletions
@@ -19,7 +19,6 @@
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -46,7 +45,6 @@ public abstract class DataStreamLifecyclePermissionsTestCase extends ESRestTestC
     @ClassRule
     public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
         .distribution(DistributionType.DEFAULT)
-        .feature(FeatureFlag.FAILURE_STORE_ENABLED)
         .setting("xpack.watcher.enabled", "false")
         .setting("xpack.ml.enabled", "false")
         .setting("xpack.security.enabled", "true")

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 7 additions & 16 deletions
@@ -40,22 +40,13 @@ public Map<NodeFeature, Version> getHistoricalFeatures() {
 
     @Override
     public Set<NodeFeature> getFeatures() {
-        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-            return Set.of(
-                DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
-                LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
-                DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
-                DataStreamGlobalRetention.GLOBAL_RETENTION, // Added in 8.14
-                DataStream.DATA_STREAM_FAILURE_STORE_FEATURE // Added in 8.19
-            );
-        } else {
-            return Set.of(
-                DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
-                LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
-                DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
-                DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
-            );
-        }
+        return Set.of(
+            DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
+            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
+            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
+            DataStreamGlobalRetention.GLOBAL_RETENTION, // Added in 8.14
+            DataStream.DATA_STREAM_FAILURE_STORE_FEATURE // Added in 8.19
+        );
     }
 
     @Override
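
The `getFeatures()` change above collapses a flag-dependent branch into a single unconditional set. A minimal sketch of that before/after shape follows. The class `FeatureGateSketch` and the string identifiers are hypothetical stand-ins for the real `NodeFeature` constants, chosen only so the example compiles on its own.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration of removing a feature-flag guard. The strings
// below are placeholders, not real Elasticsearch feature identifiers.
final class FeatureGateSketch {
    static final String GLOBAL_RETENTION = "example.global_retention";
    static final String FAILURE_STORE = "example.failure_store";

    private FeatureGateSketch() {}

    // Before: the failure store feature is advertised only when the flag is on.
    static Set<String> gatedFeatures(boolean flagEnabled) {
        Set<String> features = new LinkedHashSet<>();
        features.add(GLOBAL_RETENTION);
        if (flagEnabled) {
            features.add(FAILURE_STORE);
        }
        return features;
    }

    // After: the feature is always advertised, so no flag parameter remains.
    static Set<String> features() {
        return Set.of(GLOBAL_RETENTION, FAILURE_STORE);
    }

    public static void main(String[] args) {
        System.out.println(gatedFeatures(false));
        System.out.println(gatedFeatures(true).equals(features()));
    }
}
```

With the guard gone, the result no longer depends on how the node was started, which is the same reason the test clusters in this commit drop their `.feature(FeatureFlag.FAILURE_STORE_ENABLED)` calls.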

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 6 additions & 11 deletions
@@ -23,7 +23,6 @@
 import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
 import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
 import org.elasticsearch.client.internal.OriginSettingClient;
-import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -239,11 +238,9 @@ public Collection<?> createComponents(PluginServices services) {
         actions.add(new ActionHandler<>(DeleteDataStreamLifecycleAction.INSTANCE, TransportDeleteDataStreamLifecycleAction.class));
         actions.add(new ActionHandler<>(ExplainDataStreamLifecycleAction.INSTANCE, TransportExplainDataStreamLifecycleAction.class));
         actions.add(new ActionHandler<>(GetDataStreamLifecycleStatsAction.INSTANCE, TransportGetDataStreamLifecycleStatsAction.class));
-        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-            actions.add(new ActionHandler<>(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
-            actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
-            actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
-        }
+        actions.add(new ActionHandler<>(GetDataStreamOptionsAction.INSTANCE, TransportGetDataStreamOptionsAction.class));
+        actions.add(new ActionHandler<>(PutDataStreamOptionsAction.INSTANCE, TransportPutDataStreamOptionsAction.class));
+        actions.add(new ActionHandler<>(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class));
         return actions;
     }
 
@@ -276,11 +273,9 @@ public List<RestHandler> getRestHandlers(
         handlers.add(new RestDeleteDataStreamLifecycleAction());
         handlers.add(new RestExplainDataStreamLifecycleAction());
         handlers.add(new RestDataStreamLifecycleStatsAction());
-        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-            handlers.add(new RestGetDataStreamOptionsAction());
-            handlers.add(new RestPutDataStreamOptionsAction());
-            handlers.add(new RestDeleteDataStreamOptionsAction());
-        }
+        handlers.add(new RestGetDataStreamOptionsAction());
+        handlers.add(new RestPutDataStreamOptionsAction());
+        handlers.add(new RestDeleteDataStreamOptionsAction());
         return handlers;
     }
 