diff --git a/build-tools-internal/version.properties b/build-tools-internal/version.properties
index 0df89b184badc..2e32afe078e74 100644
--- a/build-tools-internal/version.properties
+++ b/build-tools-internal/version.properties
@@ -1,5 +1,5 @@
elasticsearch = 9.1.0
-lucene = 10.2.0
+lucene = 10.2.1-snapshot-ae6484f43e6
bundled_jdk_vendor = openjdk
bundled_jdk = 24+36@1f9ff9062db4449d8ca828c504ffae90
diff --git a/docs/Versions.asciidoc b/docs/Versions.asciidoc
index 58195d7313a5a..5066c315ecfd6 100644
--- a/docs/Versions.asciidoc
+++ b/docs/Versions.asciidoc
@@ -1,8 +1,8 @@
include::{docs-root}/shared/versions/stack/{source_branch}.asciidoc[]
-:lucene_version: 10.2.0
-:lucene_version_path: 10_2_0
+:lucene_version: 10.2.1
+:lucene_version_path: 10_2_1
:jdk: 11.0.2
:jdk_major: 11
:build_type: tar
diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml
index 38b174fd5c2a4..f420ca79e7287 100644
--- a/gradle/verification-metadata.xml
+++ b/gradle/verification-metadata.xml
@@ -2961,129 +2961,129 @@
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
index 506c107b382a1..329b11c5c17bd 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
@@ -9,6 +9,7 @@
package org.elasticsearch.datastreams;
+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;
@@ -27,7 +28,7 @@ public class DataStreamFeatures implements FeatureSpecification {
@Override
public Set getFeatures() {
- return Set.of();
+ return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE) : Set.of();
}
@Override
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml
index 0d19f555d10a4..ea3a506c34394 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml
@@ -217,7 +217,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
ingest.put_pipeline:
@@ -643,7 +643,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
@@ -739,7 +739,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml
index 884adb5458102..c986e715ce3bd 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml
@@ -190,7 +190,7 @@ TSDB failures go to failure store:
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml
index f439cf59bf2d3..4589e4fc7d821 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml
@@ -97,7 +97,7 @@
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml
index 0b4fe28f3961d..8d271842e5af9 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml
@@ -8,7 +8,7 @@ setup:
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
---
teardown:
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml
index 51a1e96b1e937..cb667fa6b07f3 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml
@@ -6,10 +6,10 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- method: POST
path: /{index}/_rollover
- capabilities: [ 'lazy-rollover-failure-store', 'index-expression-selectors' ]
+ capabilities: [ 'index_expression_selectors' ]
- do:
allowed_warnings:
@@ -313,7 +313,7 @@ teardown:
capabilities:
- method: POST
path: /{index}/_rollover
- capabilities: [lazy-rollover-failure-store]
+ capabilities: [index_expression_selectors]
# Initialize failure store
- do:
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml
index a2c46f0ca4042..e43f84113883d 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml
@@ -1,6 +1,7 @@
setup:
- requires:
- reason: "Data stream options was added in 8.18+"
+ cluster_features: [ "data_stream.failure_store" ]
+ reason: "Failure store GA in 8.19+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
@@ -8,10 +9,7 @@ setup:
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
- - method: PUT
- path: /_cluster/settings
- capabilities: [ 'data_stream_failure_store_cluster_setting' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
cluster.put_settings:
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml
index d62a7eba5d316..1356fd4910e5f 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml
@@ -6,7 +6,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml
index 60500767213af..044dc86ccf810 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml
@@ -55,7 +55,7 @@
capabilities:
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
index b687bd2ef8e76..be3cf58e9e0d0 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
@@ -103,6 +103,7 @@ final class BulkOperation extends ActionRunnable {
private final Map shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
+ private final boolean clusterHasFailureStoreFeature;
BulkOperation(
Task task,
@@ -118,7 +119,8 @@ final class BulkOperation extends ActionRunnable {
long startTimeNanos,
ActionListener listener,
FailureStoreMetrics failureStoreMetrics,
- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+ boolean clusterHasFailureStoreFeature
) {
this(
task,
@@ -136,7 +138,8 @@ final class BulkOperation extends ActionRunnable {
new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
new FailureStoreDocumentConverter(),
failureStoreMetrics,
- dataStreamFailureStoreSettings
+ dataStreamFailureStoreSettings,
+ clusterHasFailureStoreFeature
);
}
@@ -156,7 +159,8 @@ final class BulkOperation extends ActionRunnable {
ClusterStateObserver observer,
FailureStoreDocumentConverter failureStoreDocumentConverter,
FailureStoreMetrics failureStoreMetrics,
- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+ boolean clusterHasFailureStoreFeature
) {
super(listener);
this.task = task;
@@ -177,6 +181,7 @@ final class BulkOperation extends ActionRunnable {
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
this.failureStoreMetrics = failureStoreMetrics;
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
+ this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
}
@Override
@@ -556,7 +561,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, projectMetadata);
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
// it has the failure store enabled.
- if (failureStoreCandidate != null) {
+ if (failureStoreCandidate != null && clusterHasFailureStoreFeature) {
// Do not redirect documents to a failure store that were already headed to one.
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
if (isFailureStoreRequest == false
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 5002628c083b4..d3915af457fda 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -45,6 +45,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Nullable;
+import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.SystemIndices;
@@ -85,6 +86,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
private final OriginSettingClient rolloverClient;
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
+ private final FeatureService featureService;
@Inject
public TransportBulkAction(
@@ -99,7 +101,8 @@ public TransportBulkAction(
SystemIndices systemIndices,
ProjectResolver projectResolver,
FailureStoreMetrics failureStoreMetrics,
- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+ FeatureService featureService
) {
this(
threadPool,
@@ -114,7 +117,8 @@ public TransportBulkAction(
projectResolver,
threadPool::relativeTimeInNanos,
failureStoreMetrics,
- dataStreamFailureStoreSettings
+ dataStreamFailureStoreSettings,
+ featureService
);
}
@@ -131,7 +135,8 @@ public TransportBulkAction(
ProjectResolver projectResolver,
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics,
- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+ FeatureService featureService
) {
this(
TYPE,
@@ -148,7 +153,8 @@ public TransportBulkAction(
projectResolver,
relativeTimeProvider,
failureStoreMetrics,
- dataStreamFailureStoreSettings
+ dataStreamFailureStoreSettings,
+ featureService
);
}
@@ -167,7 +173,8 @@ public TransportBulkAction(
ProjectResolver projectResolver,
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics,
- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+ FeatureService featureService
) {
super(
bulkAction,
@@ -188,6 +195,7 @@ public TransportBulkAction(
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
this.failureStoreMetrics = failureStoreMetrics;
+ this.featureService = featureService;
}
public static ActionListener unwrappingSingleItemBulkResponse(
@@ -590,6 +598,11 @@ void executeBulk(
Executor executor,
AtomicArray responses
) {
+ // Determine if we have the feature enabled once for entire bulk operation
+ final boolean clusterSupportsFailureStore = featureService.clusterHasFeature(
+ clusterService.state(),
+ DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
+ );
new BulkOperation(
task,
threadPool,
@@ -604,7 +617,8 @@ void executeBulk(
startTimeNanos,
listener,
failureStoreMetrics,
- dataStreamFailureStoreSettings
+ dataStreamFailureStoreSettings,
+ clusterSupportsFailureStore
).run();
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
index 607e3a8a07efb..937ef522b83d8 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
@@ -38,6 +38,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
@@ -76,6 +77,7 @@ public final class DataStream implements SimpleDiffable, ToXContentO
private static final Logger LOGGER = LogManager.getLogger(DataStream.class);
public static final boolean FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store").isEnabled();
+ public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store");
public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0;
public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0;
public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0;
diff --git a/server/src/main/java/org/elasticsearch/index/IndexVersions.java b/server/src/main/java/org/elasticsearch/index/IndexVersions.java
index 4c07be7d9200b..8af1caf8c20de 100644
--- a/server/src/main/java/org/elasticsearch/index/IndexVersions.java
+++ b/server/src/main/java/org/elasticsearch/index/IndexVersions.java
@@ -160,6 +160,7 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion SYNTHETIC_SOURCE_STORE_ARRAYS_NATIVELY_SCALED_FLOAT = def(9_020_0_00, Version.LUCENE_10_1_0);
public static final IndexVersion USE_LUCENE101_POSTINGS_FORMAT = def(9_021_0_00, Version.LUCENE_10_1_0);
public static final IndexVersion UPGRADE_TO_LUCENE_10_2_0 = def(9_022_00_0, Version.LUCENE_10_2_0);
+ public static final IndexVersion UPGRADE_TO_LUCENE_10_2_1 = def(9_023_00_0, Version.LUCENE_10_2_1);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 396572709f2cd..165632a69c783 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -67,6 +67,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.env.Environment;
+import org.elasticsearch.features.FeatureService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.grok.MatcherWatchdog;
import org.elasticsearch.index.IndexSettings;
@@ -135,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ClusterState state;
private final ProjectResolver projectResolver;
+ private final FeatureService featureService;
private static BiFunction createScheduler(ThreadPool threadPool) {
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
@@ -221,7 +223,8 @@ public IngestService(
Client client,
MatcherWatchdog matcherWatchdog,
FailureStoreMetrics failureStoreMetrics,
- ProjectResolver projectResolver
+ ProjectResolver projectResolver,
+ FeatureService featureService
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
@@ -244,6 +247,7 @@ public IngestService(
this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR);
this.failureStoreMetrics = failureStoreMetrics;
this.projectResolver = projectResolver;
+ this.featureService = featureService;
}
/**
@@ -261,6 +265,7 @@ public IngestService(
this.state = ingestService.state;
this.failureStoreMetrics = ingestService.failureStoreMetrics;
this.projectResolver = ingestService.projectResolver;
+ this.featureService = ingestService.featureService;
}
private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) {
@@ -838,6 +843,9 @@ public void executeBulkRequest(
) {
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
+ // Adapt handler to ensure node features during ingest logic
+ final Function adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);
+
executor.execute(new AbstractRunnable() {
@Override
@@ -920,7 +928,7 @@ public void onFailure(Exception e) {
}
);
- executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
+ executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
assert actionRequest.index() != null;
i++;
@@ -930,6 +938,28 @@ public void onFailure(Exception e) {
});
}
+ /**
+ * Adapts failure store resolver function so that if the failure store node feature is not present on every node it reverts to the
+ * old ingest behavior.
+ * @param resolveFailureStore Function that surfaces if failures for an index should be redirected to failure store.
+ * @return An adapted function that mutes the original if the cluster does not have the node feature universally applied.
+ */
+ private Function wrapResolverWithFeatureCheck(Function resolveFailureStore) {
+ final boolean clusterHasFailureStoreFeature = featureService.clusterHasFeature(
+ clusterService.state(),
+ DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
+ );
+ return (indexName) -> {
+ if (clusterHasFailureStoreFeature) {
+ return resolveFailureStore.apply(indexName);
+ } else {
+ // If we get a non-null result but the cluster is not yet fully updated with required node features,
+ // force the result null to maintain old logic until all nodes are updated
+ return null;
+ }
+ };
+ }
+
/**
* Returns the pipelines of the request, and updates the request so that it no longer references
* any pipelines (both the default and final pipeline are set to the noop pipeline).
diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
index 0a36cd9114158..7b7d2b257c424 100644
--- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
+++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
@@ -689,6 +689,8 @@ private void construct(
modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
+ FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
+
FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry());
final IngestService ingestService = new IngestService(
clusterService,
@@ -700,7 +702,8 @@ private void construct(
client,
IngestService.createGrokThreadWatchdog(environment, threadPool),
failureStoreMetrics,
- projectResolver
+ projectResolver,
+ featureService
);
SystemIndices systemIndices = createSystemIndices(settings);
@@ -784,8 +787,6 @@ private void construct(
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
- FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
-
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client, projectResolver));
}
diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java
index b4641a49d6977..112d34136bc99 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java
@@ -11,7 +11,6 @@
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
@@ -34,9 +33,6 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
private static final String PERSISTENT = "persistent";
private static final String TRANSIENT = "transient";
- // TODO: Remove this and use a single cluster feature / capability for the whole failure store feature when the feature flag is removed
- private static final String DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY = "data_stream_failure_store_cluster_setting";
-
@Override
public List routes() {
return List.of(new Route(PUT, "/_cluster/settings"));
@@ -78,8 +74,4 @@ public boolean canTripCircuitBreaker() {
return false;
}
- @Override
- public Set supportedCapabilities() {
- return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY) : Set.of();
- }
}
diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java
index 7013a444866f9..0b10e9ec8b9b8 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java
@@ -30,7 +30,7 @@
@ServerlessScope(Scope.PUBLIC)
public class RestPutComponentTemplateAction extends BaseRestHandler {
- public static final String SUPPORTS_FAILURE_STORE = "failure_store_in_template";
+ public static final String SUPPORTS_FAILURE_STORE = "data_stream_options.failure_store";
private static final Set capabilities = Set.of(SUPPORTS_FAILURE_STORE);
@Override
diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java
index d957ad3c0153d..4cb12da967054 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java
@@ -44,7 +44,7 @@ public String getName() {
@Override
public Set supportedCapabilities() {
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
- return Set.of("return-404-on-missing-target", "lazy-rollover-failure-store", "index-expression-selectors");
+ return Set.of("return-404-on-missing-target", "index_expression_selectors");
} else {
return Set.of("return-404-on-missing-target");
}
diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java b/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java
index 139e304ab9e3a..07b5e372c00bb 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java
@@ -46,6 +46,8 @@ private SearchCapabilities() {}
private static final String HIGHLIGHT_MAX_ANALYZED_OFFSET_DEFAULT = "highlight_max_analyzed_offset_default";
+ private static final String INDEX_SELECTOR_SYNTAX = "index_expression_selectors";
+
public static final Set CAPABILITIES;
static {
HashSet capabilities = new HashSet<>();
@@ -63,6 +65,7 @@ private SearchCapabilities() {}
capabilities.add(K_DEFAULT_TO_SIZE);
capabilities.add(KQL_QUERY_SUPPORTED);
capabilities.add(HIGHLIGHT_MAX_ANALYZED_OFFSET_DEFAULT);
+ capabilities.add(INDEX_SELECTOR_SYNTAX);
CAPABILITIES = Set.copyOf(capabilities);
}
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java
index b48261b1ec6c9..13605f34ac708 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java
@@ -441,6 +441,45 @@ public void testFailingDocumentRedirectsToFailureStore() throws Exception {
assertThat(failedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.USED));
}
+ /**
+ * A bulk operation to a data stream with a failure store enabled should NOT redirect any documents that fail at a shard level to the
+ * failure store if the failure store node feature is not on every node in the cluster
+ */
+ public void testFailingDocumentIgnoredByFailureStoreWhenFeatureIsDisabled() throws Exception {
+ Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled());
+
+ // Requests that go to two separate shards
+ BulkRequest bulkRequest = new BulkRequest();
+ bulkRequest.add(new IndexRequest(fsDataStreamName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE));
+ bulkRequest.add(new IndexRequest(fsDataStreamName).id("3").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE));
+
+ NodeClient client = getNodeClient(
+ thatFailsDocuments(Map.of(new IndexAndId(ds2BackingIndex1.getIndex().getName(), "3"), () -> new MapperException("test")))
+ );
+
+ BulkResponse bulkItemResponses = safeAwait(
+ l -> newBulkOperation(
+ clusterState,
+ client,
+ bulkRequest,
+ new AtomicArray<>(bulkRequest.numberOfActions()),
+ mockObserver(clusterState),
+ l,
+ new FailureStoreDocumentConverter(),
+ DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()),
+ false
+ ).run()
+ );
+ assertThat(bulkItemResponses.hasFailures(), is(true));
+ BulkItemResponse failedItem = Arrays.stream(bulkItemResponses.getItems())
+ .filter(BulkItemResponse::isFailed)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Could not find redirected item"));
+ assertThat(failedItem.getFailure().getCause(), is(instanceOf(MapperException.class)));
+ assertThat(failedItem.getFailure().getCause().getMessage(), is(equalTo("test")));
+ assertThat(failedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN));
+ }
+
public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSetting() {
Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled());
@@ -482,7 +521,8 @@ public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSettin
mockObserver(clusterState),
l,
new FailureStoreDocumentConverter(),
- dataStreamFailureStoreSettings
+ dataStreamFailureStoreSettings,
+ true
).run()
);
assertThat(bulkItemResponsesUsingClusterSetting.hasFailures(), is(false));
@@ -1175,7 +1215,8 @@ private BulkOperation newBulkOperation(
observer,
listener,
failureStoreDocumentConverter,
- DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings())
+ DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()),
+ true
);
}
@@ -1187,7 +1228,8 @@ private BulkOperation newBulkOperation(
ClusterStateObserver observer,
ActionListener listener,
FailureStoreDocumentConverter failureStoreDocumentConverter,
- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+ boolean failureStoreNodeFeatureEnabled
) {
// Time provision
long timeZero = TimeUnit.MILLISECONDS.toNanos(randomMillisUpToYear9999() - TimeUnit.DAYS.toMillis(1));
@@ -1221,7 +1263,8 @@ private BulkOperation newBulkOperation(
observer,
failureStoreDocumentConverter,
FailureStoreMetrics.NOOP,
- dataStreamFailureStoreSettings
+ dataStreamFailureStoreSettings,
+ failureStoreNodeFeatureEnabled
);
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
index c61981ac43a6d..8081853d86f94 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -45,6 +46,7 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexingPressure;
@@ -165,7 +167,13 @@ class TestTransportBulkAction extends TransportBulkAction {
EmptySystemIndices.INSTANCE,
TestProjectResolvers.singleProject(projectId),
FailureStoreMetrics.NOOP,
- DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings())
+ DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
index 294cb1fbdb7db..985a119e25ab7 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
@@ -53,6 +53,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
@@ -138,7 +139,13 @@ public ProjectId getProjectId() {
}
},
FailureStoreMetrics.NOOP,
- DataStreamFailureStoreSettings.create(clusterSettings)
+ DataStreamFailureStoreSettings.create(clusterSettings),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
index 8515ef36b538e..200fa66deba04 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
@@ -20,6 +20,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
@@ -32,6 +34,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.EmptySystemIndices;
@@ -52,6 +56,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -256,7 +261,13 @@ static class TestTransportBulkAction extends TransportBulkAction {
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
relativeTimeProvider,
FailureStoreMetrics.NOOP,
- DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings())
+ DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
}
}
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index 260aac9c02128..2ab28c2151198 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -53,9 +53,12 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.FixForMultiProject;
+import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
@@ -97,6 +100,7 @@
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.Metadata.DEFAULT_PROJECT_ID;
@@ -160,7 +164,13 @@ public void testIngestPlugin() {
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
Map factories = ingestService.getProcessorFactories();
assertTrue(factories.containsKey("foo"));
@@ -181,7 +191,13 @@ public void testIngestPluginDuplicate() {
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
)
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
@@ -199,7 +215,13 @@ public void testExecuteIndexPipelineDoesNotExist() {
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
.source(Map.of())
@@ -1753,6 +1775,62 @@ public void testExecuteFailureRedirection() throws Exception {
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}
+ public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception {
+ final CompoundProcessor processor = mockCompoundProcessor();
+ IngestService ingestService = createWithProcessors(
+ Map.of(
+ "mock",
+ (factories, tag, description, config, projectId) -> processor,
+ "set",
+ (factories, tag, description, config, projectId) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail())
+ ),
+ Predicates.never()
+ );
+ PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}");
+ // given that set -> fail() above, it's a failure if a document executes against this pipeline
+ PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"set\" : {}}]}");
+ var projectId = randomProjectIdOrDefault();
+ ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .putProjectMetadata(ProjectMetadata.builder(projectId).build())
+ .build();
+ ClusterState previousClusterState = clusterState;
+ clusterState = executePut(projectId, putRequest1, clusterState);
+ clusterState = executePut(projectId, putRequest2, clusterState);
+ ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+ final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
+ .source(Map.of())
+ .setPipeline("_id1")
+ .setFinalPipeline("_id2");
+ doThrow(new RuntimeException()).when(processor)
+ .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+ final Function redirectCheck = (idx) -> indexRequest.index().equals(idx);
+ @SuppressWarnings("unchecked")
+ final TriConsumer redirectHandler = mock(TriConsumer.class);
+ @SuppressWarnings("unchecked")
+ final TriConsumer failureHandler = mock(TriConsumer.class);
+ @SuppressWarnings("unchecked")
+ final BiConsumer completionHandler = mock(BiConsumer.class);
+ ingestService.executeBulkRequest(
+ projectId,
+ 1,
+ List.of(indexRequest),
+ indexReq -> {},
+ redirectCheck,
+ redirectHandler,
+ failureHandler,
+ completionHandler,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE
+ );
+ verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+ verifyNoInteractions(redirectHandler);
+ verify(failureHandler, times(1)).apply(
+ eq(0),
+ any(RuntimeException.class),
+ eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
+ );
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+ }
+
public void testExecuteFailureStatusOnFailureWithoutRedirection() throws Exception {
final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(
@@ -2440,7 +2518,13 @@ public Map getProcessors(Processor.Parameters paramet
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
ingestService.addIngestClusterStateListener(ingestClusterStateListener);
@@ -2929,7 +3013,13 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
@@ -3244,6 +3334,10 @@ private static IngestService createWithProcessors() {
}
private static IngestService createWithProcessors(Map processors) {
+ return createWithProcessors(processors, DataStream.DATA_STREAM_FAILURE_STORE_FEATURE::equals);
+ }
+
+ private static IngestService createWithProcessors(Map processors, Predicate featureTest) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
@@ -3263,7 +3357,13 @@ public Map getProcessors(final Processor.Parameters p
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return featureTest.test(feature);
+ }
+ }
);
if (randomBoolean()) {
/*
diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
index e9ec1f2d6260c..9cd3a74d9e73a 100644
--- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
@@ -12,11 +12,15 @@
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@@ -135,7 +139,13 @@ public Map getProcessors(final Processor.Parameters p
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.singleProject(projectId)
+ TestProjectResolvers.singleProject(projectId),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
}
}
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index 8ca529d0707c1..cae33abb126af 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -92,6 +92,7 @@
import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
@@ -135,6 +136,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.Index;
@@ -2402,7 +2404,13 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
),
client,
actionFilters,
@@ -2411,7 +2419,13 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
EmptySystemIndices.INSTANCE,
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
FailureStoreMetrics.NOOP,
- DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings())
+ DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
)
);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(
diff --git a/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml b/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml
index 899cec817a19d..d57ecd25cd435 100644
--- a/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml
+++ b/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml
@@ -35,7 +35,7 @@ Test failure store with logsdb:
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
- capabilities: [ 'failure_store_in_template' ]
+ capabilities: [ 'data_stream_options.failure_store' ]
reason: "Support for 'logsdb' index mode & failure status config in templates"
- do:
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
index 41947fb152adc..c4ec1fdc9eb59 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
@@ -12,6 +12,8 @@
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.TestProjectResolvers;
@@ -23,6 +25,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.IngestStats;
@@ -149,7 +153,13 @@ public void setUpVariables() {
client,
null,
FailureStoreMetrics.NOOP,
- TestProjectResolvers.alwaysThrow()
+ TestProjectResolvers.alwaysThrow(),
+ new FeatureService(List.of()) {
+ @Override
+ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
+ return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
+ }
+ }
);
}