Skip to content

Commit 44d611e

Browse files
authored
[8.x] Add node feature for failure store, refactor capability names (#126885) (#127091)
* Add node feature for failure store, refactor capability names (#126885) Adds a node feature that is conditionally added to the cluster state if the failure store feature flag is enabled. Requires all nodes in the cluster to have the node feature present in order to redirect failed documents to the failure store from the ingest node or from shard level bulk failures. * Fix backporting issues
1 parent 66db6f2 commit 44d611e

File tree

26 files changed

+304
-67
lines changed

26 files changed

+304
-67
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.Version;
1313
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
1414
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
15+
import org.elasticsearch.cluster.metadata.DataStream;
1516
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
1617
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
1718
import org.elasticsearch.features.FeatureSpecification;
@@ -39,12 +40,22 @@ public Map<NodeFeature, Version> getHistoricalFeatures() {
3940

4041
@Override
4142
public Set<NodeFeature> getFeatures() {
42-
return Set.of(
43-
DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
44-
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
45-
DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
46-
DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
47-
);
43+
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
44+
return Set.of(
45+
DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
46+
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
47+
DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
48+
DataStreamGlobalRetention.GLOBAL_RETENTION, // Added in 8.14
49+
DataStream.DATA_STREAM_FAILURE_STORE_FEATURE // Added in 8.19
50+
);
51+
} else {
52+
return Set.of(
53+
DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
54+
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
55+
DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
56+
DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
57+
);
58+
}
4859
}
4960

5061
@Override

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ setup:
217217
capabilities:
218218
- method: POST
219219
path: /_index_template/{template}
220-
capabilities: [ 'failure_store_in_template' ]
220+
capabilities: [ 'data_stream_options.failure_store' ]
221221

222222
- do:
223223
ingest.put_pipeline:
@@ -643,7 +643,7 @@ setup:
643643
capabilities:
644644
- method: POST
645645
path: /_index_template/{template}
646-
capabilities: [ 'failure_store_in_template' ]
646+
capabilities: [ 'data_stream_options.failure_store' ]
647647

648648
- do:
649649
allowed_warnings:
@@ -739,7 +739,7 @@ setup:
739739
capabilities:
740740
- method: POST
741741
path: /_index_template/{template}
742-
capabilities: [ 'failure_store_in_template' ]
742+
capabilities: [ 'data_stream_options.failure_store' ]
743743

744744
- do:
745745
allowed_warnings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ TSDB failures go to failure store:
190190
capabilities:
191191
- method: POST
192192
path: /_index_template/{template}
193-
capabilities: [ 'failure_store_in_template' ]
193+
capabilities: [ 'data_stream_options.failure_store' ]
194194
- do:
195195
allowed_warnings:
196196
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
capabilities:
9898
- method: POST
9999
path: /_index_template/{template}
100-
capabilities: [ 'failure_store_in_template' ]
100+
capabilities: [ 'data_stream_options.failure_store' ]
101101

102102
- do:
103103
allowed_warnings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ setup:
88
capabilities: [ 'failure_store_status' ]
99
- method: POST
1010
path: /_index_template/{template}
11-
capabilities: [ 'failure_store_in_template' ]
11+
capabilities: [ 'data_stream_options.failure_store' ]
1212

1313
---
1414
teardown:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ setup:
66
capabilities:
77
- method: POST
88
path: /_index_template/{template}
9-
capabilities: [ 'failure_store_in_template' ]
9+
capabilities: [ 'data_stream_options.failure_store' ]
1010
- method: POST
1111
path: /{index}/_rollover
12-
capabilities: [ 'lazy-rollover-failure-store', 'index-expression-selectors' ]
12+
capabilities: [ 'index_expression_selectors' ]
1313

1414
- do:
1515
allowed_warnings:
@@ -313,7 +313,7 @@ teardown:
313313
capabilities:
314314
- method: POST
315315
path: /{index}/_rollover
316-
capabilities: [lazy-rollover-failure-store]
316+
capabilities: [index_expression_selectors]
317317

318318
# Initialize failure store
319319
- do:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
setup:
22
- requires:
3-
reason: "Data stream options was added in 8.18+"
3+
cluster_features: [ "data_stream.failure_store" ]
4+
reason: "Failure store GA in 8.19+"
45
test_runner_features: [ capabilities, allowed_warnings, contains ]
56
capabilities:
67
- method: POST
78
path: /{index}/_doc
89
capabilities: [ 'failure_store_status' ]
910
- method: POST
1011
path: /_index_template/{template}
11-
capabilities: [ 'failure_store_in_template' ]
12-
- method: PUT
13-
path: /_cluster/settings
14-
capabilities: [ 'data_stream_failure_store_cluster_setting' ]
12+
capabilities: [ 'data_stream_options.failure_store' ]
1513

1614
- do:
1715
cluster.put_settings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ setup:
66
capabilities:
77
- method: POST
88
path: /_index_template/{template}
9-
capabilities: [ 'failure_store_in_template' ]
9+
capabilities: [ 'data_stream_options.failure_store' ]
1010

1111
- do:
1212
allowed_warnings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
capabilities:
5656
- method: POST
5757
path: /_index_template/{template}
58-
capabilities: [ 'failure_store_in_template' ]
58+
capabilities: [ 'data_stream_options.failure_store' ]
5959

6060
- do:
6161
allowed_warnings:

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
9999
private final Map<ShardId, Exception> shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
100100
private final FailureStoreMetrics failureStoreMetrics;
101101
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
102+
private final boolean clusterHasFailureStoreFeature;
102103

103104
BulkOperation(
104105
Task task,
@@ -113,7 +114,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
113114
long startTimeNanos,
114115
ActionListener<BulkResponse> listener,
115116
FailureStoreMetrics failureStoreMetrics,
116-
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
117+
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
118+
boolean clusterHasFailureStoreFeature
117119
) {
118120
this(
119121
task,
@@ -130,7 +132,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
130132
new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
131133
new FailureStoreDocumentConverter(),
132134
failureStoreMetrics,
133-
dataStreamFailureStoreSettings
135+
dataStreamFailureStoreSettings,
136+
clusterHasFailureStoreFeature
134137
);
135138
}
136139

@@ -149,7 +152,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
149152
ClusterStateObserver observer,
150153
FailureStoreDocumentConverter failureStoreDocumentConverter,
151154
FailureStoreMetrics failureStoreMetrics,
152-
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
155+
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
156+
boolean clusterHasFailureStoreFeature
153157
) {
154158
super(listener);
155159
this.task = task;
@@ -169,6 +173,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
169173
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
170174
this.failureStoreMetrics = failureStoreMetrics;
171175
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
176+
this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
172177
}
173178

174179
@Override
@@ -543,7 +548,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
543548
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, clusterState.metadata());
544549
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
545550
// it has the failure store enabled.
546-
if (failureStoreCandidate != null) {
551+
if (failureStoreCandidate != null && clusterHasFailureStoreFeature) {
547552
// Do not redirect documents to a failure store that were already headed to one.
548553
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
549554
if (isFailureStoreRequest == false

0 commit comments

Comments
 (0)