Skip to content

Commit d928d1a

Browse files
authored
Add node feature for failure store, refactor capability names (#126885)
Adds a node feature that is added to the cluster state only if the failure store feature flag is enabled. All nodes in the cluster must have this node feature present before failed documents are redirected to the failure store, whether the failure originates on the ingest node or from a shard-level bulk failure.
1 parent 85749d6 commit d928d1a

File tree

27 files changed

+309
-60
lines changed

27 files changed

+309
-60
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.datastreams;
1111

12+
import org.elasticsearch.cluster.metadata.DataStream;
1213
import org.elasticsearch.features.FeatureSpecification;
1314
import org.elasticsearch.features.NodeFeature;
1415

@@ -27,7 +28,7 @@ public class DataStreamFeatures implements FeatureSpecification {
2728

2829
@Override
2930
public Set<NodeFeature> getFeatures() {
30-
return Set.of();
31+
return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE) : Set.of();
3132
}
3233

3334
@Override

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ setup:
217217
capabilities:
218218
- method: POST
219219
path: /_index_template/{template}
220-
capabilities: [ 'failure_store_in_template' ]
220+
capabilities: [ 'data_stream_options.failure_store' ]
221221

222222
- do:
223223
ingest.put_pipeline:
@@ -643,7 +643,7 @@ setup:
643643
capabilities:
644644
- method: POST
645645
path: /_index_template/{template}
646-
capabilities: [ 'failure_store_in_template' ]
646+
capabilities: [ 'data_stream_options.failure_store' ]
647647

648648
- do:
649649
allowed_warnings:
@@ -739,7 +739,7 @@ setup:
739739
capabilities:
740740
- method: POST
741741
path: /_index_template/{template}
742-
capabilities: [ 'failure_store_in_template' ]
742+
capabilities: [ 'data_stream_options.failure_store' ]
743743

744744
- do:
745745
allowed_warnings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ TSDB failures go to failure store:
190190
capabilities:
191191
- method: POST
192192
path: /_index_template/{template}
193-
capabilities: [ 'failure_store_in_template' ]
193+
capabilities: [ 'data_stream_options.failure_store' ]
194194
- do:
195195
allowed_warnings:
196196
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
capabilities:
9898
- method: POST
9999
path: /_index_template/{template}
100-
capabilities: [ 'failure_store_in_template' ]
100+
capabilities: [ 'data_stream_options.failure_store' ]
101101

102102
- do:
103103
allowed_warnings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ setup:
88
capabilities: [ 'failure_store_status' ]
99
- method: POST
1010
path: /_index_template/{template}
11-
capabilities: [ 'failure_store_in_template' ]
11+
capabilities: [ 'data_stream_options.failure_store' ]
1212

1313
---
1414
teardown:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ setup:
66
capabilities:
77
- method: POST
88
path: /_index_template/{template}
9-
capabilities: [ 'failure_store_in_template' ]
9+
capabilities: [ 'data_stream_options.failure_store' ]
1010
- method: POST
1111
path: /{index}/_rollover
12-
capabilities: [ 'lazy-rollover-failure-store', 'index-expression-selectors' ]
12+
capabilities: [ 'index_expression_selectors' ]
1313

1414
- do:
1515
allowed_warnings:
@@ -313,7 +313,7 @@ teardown:
313313
capabilities:
314314
- method: POST
315315
path: /{index}/_rollover
316-
capabilities: [lazy-rollover-failure-store]
316+
capabilities: [index_expression_selectors]
317317

318318
# Initialize failure store
319319
- do:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
setup:
22
- requires:
3-
reason: "Data stream options was added in 8.18+"
3+
cluster_features: [ "data_stream.failure_store" ]
4+
reason: "Failure store GA in 8.19+"
45
test_runner_features: [ capabilities, allowed_warnings, contains ]
56
capabilities:
67
- method: POST
78
path: /{index}/_doc
89
capabilities: [ 'failure_store_status' ]
910
- method: POST
1011
path: /_index_template/{template}
11-
capabilities: [ 'failure_store_in_template' ]
12-
- method: PUT
13-
path: /_cluster/settings
14-
capabilities: [ 'data_stream_failure_store_cluster_setting' ]
12+
capabilities: [ 'data_stream_options.failure_store' ]
1513

1614
- do:
1715
cluster.put_settings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ setup:
66
capabilities:
77
- method: POST
88
path: /_index_template/{template}
9-
capabilities: [ 'failure_store_in_template' ]
9+
capabilities: [ 'data_stream_options.failure_store' ]
1010

1111
- do:
1212
allowed_warnings:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
capabilities:
5656
- method: POST
5757
path: /_index_template/{template}
58-
capabilities: [ 'failure_store_in_template' ]
58+
capabilities: [ 'data_stream_options.failure_store' ]
5959

6060
- do:
6161
allowed_warnings:

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
103103
private final Map<ShardId, Exception> shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
104104
private final FailureStoreMetrics failureStoreMetrics;
105105
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
106+
private final boolean clusterHasFailureStoreFeature;
106107

107108
BulkOperation(
108109
Task task,
@@ -118,7 +119,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
118119
long startTimeNanos,
119120
ActionListener<BulkResponse> listener,
120121
FailureStoreMetrics failureStoreMetrics,
121-
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
122+
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
123+
boolean clusterHasFailureStoreFeature
122124
) {
123125
this(
124126
task,
@@ -136,7 +138,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
136138
new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
137139
new FailureStoreDocumentConverter(),
138140
failureStoreMetrics,
139-
dataStreamFailureStoreSettings
141+
dataStreamFailureStoreSettings,
142+
clusterHasFailureStoreFeature
140143
);
141144
}
142145

@@ -156,7 +159,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
156159
ClusterStateObserver observer,
157160
FailureStoreDocumentConverter failureStoreDocumentConverter,
158161
FailureStoreMetrics failureStoreMetrics,
159-
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
162+
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
163+
boolean clusterHasFailureStoreFeature
160164
) {
161165
super(listener);
162166
this.task = task;
@@ -177,6 +181,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
177181
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
178182
this.failureStoreMetrics = failureStoreMetrics;
179183
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
184+
this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
180185
}
181186

182187
@Override
@@ -556,7 +561,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
556561
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, projectMetadata);
557562
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
558563
// it has the failure store enabled.
559-
if (failureStoreCandidate != null) {
564+
if (failureStoreCandidate != null && clusterHasFailureStoreFeature) {
560565
// Do not redirect documents to a failure store that were already headed to one.
561566
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
562567
if (isFailureStoreRequest == false

0 commit comments

Comments
 (0)