diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java index 024f0af4ab9b2..9a15d59e93f00 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java @@ -12,6 +12,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; import org.elasticsearch.features.FeatureSpecification; @@ -39,12 +40,22 @@ public Map getHistoricalFeatures() { @Override public Set getFeatures() { - return Set.of( - DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12 - LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13 - DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE, - DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14 - ); + if (DataStream.isFailureStoreFeatureFlagEnabled()) { + return Set.of( + DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12 + LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13 + DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE, + DataStreamGlobalRetention.GLOBAL_RETENTION, // Added in 8.14 + DataStream.DATA_STREAM_FAILURE_STORE_FEATURE // Added in 8.19 + ); + } else { + return Set.of( + DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12 + LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13 + DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE, + DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14 + ); + } } @Override diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml index 0d19f555d10a4..ea3a506c34394 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml @@ -217,7 +217,7 @@ setup: capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: ingest.put_pipeline: @@ -643,7 +643,7 @@ setup: capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: allowed_warnings: @@ -739,7 +739,7 @@ setup: capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: allowed_warnings: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml index 9ea3bfefabdf8..7c6b87a48f92e 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml @@ -190,7 +190,7 @@ TSDB failures go to failure store: capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: allowed_warnings: - "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation" diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml index f439cf59bf2d3..4589e4fc7d821 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml @@ -97,7 +97,7 @@ capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: allowed_warnings: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml index 0b4fe28f3961d..8d271842e5af9 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml @@ -8,7 +8,7 @@ setup: capabilities: [ 'failure_store_status' ] - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] --- teardown: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml index 51a1e96b1e937..cb667fa6b07f3 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/210_rollover_failure_store.yml @@ -6,10 +6,10 @@ setup: capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - method: POST path: /{index}/_rollover - capabilities: [ 'lazy-rollover-failure-store', 'index-expression-selectors' ] + capabilities: [ 'index_expression_selectors' ] - do: allowed_warnings: @@ -313,7 +313,7 @@ teardown: capabilities: - method: POST path: /{index}/_rollover - capabilities: [lazy-rollover-failure-store] + capabilities: [index_expression_selectors] # Initialize failure store - do: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml index a2c46f0ca4042..e43f84113883d 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/220_failure_store_cluster_setting.yml @@ -1,6 +1,7 @@ setup: - requires: - reason: "Data stream options was added in 8.18+" + cluster_features: [ "data_stream.failure_store" ] + reason: "Failure store GA in 8.19+" test_runner_features: [ capabilities, allowed_warnings, contains ] capabilities: - method: POST @@ -8,10 +9,7 @@ setup: capabilities: [ 'failure_store_status' ] - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] - - method: PUT - path: /_cluster/settings - capabilities: [ 'data_stream_failure_store_cluster_setting' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: cluster.put_settings: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml index d62a7eba5d316..1356fd4910e5f 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/230_data_stream_options.yml @@ -6,7 +6,7 @@ setup: capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: allowed_warnings: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml index 60500767213af..044dc86ccf810 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml @@ -55,7 +55,7 @@ capabilities: - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] - do: allowed_warnings: diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index ad614019b9a73..4d350df121f26 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -99,6 +99,7 @@ final class BulkOperation extends ActionRunnable { private final Map shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap(); private final FailureStoreMetrics failureStoreMetrics; private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; + private final boolean clusterHasFailureStoreFeature; BulkOperation( Task task, @@ -113,7 +114,8 @@ final class BulkOperation extends ActionRunnable { long startTimeNanos, ActionListener listener, FailureStoreMetrics failureStoreMetrics, - DataStreamFailureStoreSettings dataStreamFailureStoreSettings + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, + boolean clusterHasFailureStoreFeature ) { this( task, @@ -130,7 +132,8 @@ final class BulkOperation extends ActionRunnable { new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()), new FailureStoreDocumentConverter(), failureStoreMetrics, - dataStreamFailureStoreSettings + dataStreamFailureStoreSettings, + clusterHasFailureStoreFeature ); } @@ -149,7 +152,8 @@ final class BulkOperation extends ActionRunnable { ClusterStateObserver observer, FailureStoreDocumentConverter failureStoreDocumentConverter, FailureStoreMetrics failureStoreMetrics, - DataStreamFailureStoreSettings dataStreamFailureStoreSettings + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, + boolean clusterHasFailureStoreFeature ) { super(listener); this.task = task; @@ -169,6 +173,7 @@ final class BulkOperation extends ActionRunnable { this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures()); this.failureStoreMetrics = failureStoreMetrics; this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; + this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature; } @Override @@ -543,7 +548,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, clusterState.metadata()); // If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if // it has the failure store enabled. - if (failureStoreCandidate != null) { + if (failureStoreCandidate != null && clusterHasFailureStoreFeature) { // Do not redirect documents to a failure store that were already headed to one. var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest); if (isFailureStoreRequest == false diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 64c1debe075f6..8c743a289f756 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -578,6 +578,11 @@ void executeBulk( Executor executor, AtomicArray responses ) { + // Determine if we have the feature enabled once for entire bulk operation + final boolean clusterSupportsFailureStore = featureService.clusterHasFeature( + clusterService.state(), + DataStream.DATA_STREAM_FAILURE_STORE_FEATURE + ); new BulkOperation( task, threadPool, @@ -591,7 +596,8 @@ void executeBulk( startTimeNanos, listener, failureStoreMetrics, - dataStreamFailureStoreSettings + dataStreamFailureStoreSettings, + clusterSupportsFailureStore ).run(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 2eba5c0cf30b4..2fb8ab8dfda79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -35,6 +35,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; @@ -72,13 +73,14 @@ public final class DataStream implements SimpleDiffable, ToXContentO private static final Logger LOGGER = LogManager.getLogger(DataStream.class); - public static final FeatureFlag FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store"); + public static final boolean FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store").isEnabled(); + public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store"); public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0; public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0; public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0; public static boolean isFailureStoreFeatureFlagEnabled() { - return FAILURE_STORE_FEATURE_FLAG.isEnabled(); + return FAILURE_STORE_FEATURE_FLAG; } public static final String BACKING_INDEX_PREFIX = ".ds-"; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index f62f0f1503e9c..12b5e614f1a90 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -57,6 +57,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.index.IndexSettings; @@ -125,6 +126,7 @@ public class IngestService implements ClusterStateApplier, ReportingService> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); private volatile ClusterState state; + private final FeatureService featureService; private static BiFunction createScheduler(ThreadPool threadPool) { return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()); @@ -197,7 +199,8 @@ public IngestService( Client client, MatcherWatchdog matcherWatchdog, DocumentParsingProvider documentParsingProvider, - FailureStoreMetrics failureStoreMetrics + FailureStoreMetrics failureStoreMetrics, + FeatureService featureService ) { this.clusterService = clusterService; this.scriptService = scriptService; @@ -220,6 +223,7 @@ public IngestService( this.threadPool = threadPool; this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR); this.failureStoreMetrics = failureStoreMetrics; + this.featureService = featureService; } /** @@ -237,6 +241,7 @@ public IngestService( this.pipelines = ingestService.pipelines; this.state = ingestService.state; this.failureStoreMetrics = ingestService.failureStoreMetrics; + this.featureService = ingestService.featureService; } private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -785,6 +790,9 @@ public void executeBulkRequest( ) { assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]"; + // Adapt handler to ensure node features during ingest logic + final Function adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore); + executor.execute(new AbstractRunnable() { @Override @@ -870,7 +878,7 @@ public void onFailure(Exception e) { } ); - executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener); + executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener); assert actionRequest.index() != null; i++; @@ -880,6 +888,28 @@ public void onFailure(Exception e) { }); } + /** + * Adapts failure store resolver function so that if the failure store node feature is not present on every node it reverts to the + * old ingest behavior. + * @param resolveFailureStore Function that surfaces if failures for an index should be redirected to failure store. + * @return An adapted function that mutes the original if the cluster does not have the node feature universally applied. + */ + private Function wrapResolverWithFeatureCheck(Function resolveFailureStore) { + final boolean clusterHasFailureStoreFeature = featureService.clusterHasFeature( + clusterService.state(), + DataStream.DATA_STREAM_FAILURE_STORE_FEATURE + ); + return (indexName) -> { + if (clusterHasFailureStoreFeature) { + return resolveFailureStore.apply(indexName); + } else { + // If we get a non-null result but the cluster is not yet fully updated with required node features, + // force the result null to maintain old logic until all nodes are updated + return null; + } + }; + } + /** * Returns the pipelines of the request, and updates the request so that it no longer references * any pipelines (both the default and final pipeline are set to the noop pipeline). diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 7571320c87588..f2b4d4759c72f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -696,6 +696,8 @@ private void construct( modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); + FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class)); + FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry()); final IngestService ingestService = new IngestService( clusterService, @@ -707,7 +709,8 @@ private void construct( client, IngestService.createGrokThreadWatchdog(environment, threadPool), documentParsingProvider, - failureStoreMetrics + failureStoreMetrics, + featureService ); SystemIndices systemIndices = createSystemIndices(settings); @@ -787,8 +790,6 @@ private void construct( final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry); - FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class)); - if (DiscoveryNode.isMasterNode(settings)) { clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client)); clusterService.addListener( diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java index b4641a49d6977..112d34136bc99 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterUpdateSettingsAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -34,9 +33,6 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler { private static final String PERSISTENT = "persistent"; private static final String TRANSIENT = "transient"; - // TODO: Remove this and use a single cluster feature / capability for the whole failure store feature when the feature flag is removed - private static final String DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY = "data_stream_failure_store_cluster_setting"; - @Override public List routes() { return List.of(new Route(PUT, "/_cluster/settings")); @@ -78,8 +74,4 @@ public boolean canTripCircuitBreaker() { return false; } - @Override - public Set supportedCapabilities() { - return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DATA_STREAM_FAILURE_STORE_CLUSTER_SETTING_CAPABILITY) : Set.of(); - } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java index 7013a444866f9..0b10e9ec8b9b8 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java @@ -30,7 +30,7 @@ @ServerlessScope(Scope.PUBLIC) public class RestPutComponentTemplateAction extends BaseRestHandler { - public static final String SUPPORTS_FAILURE_STORE = "failure_store_in_template"; + public static final String SUPPORTS_FAILURE_STORE = "data_stream_options.failure_store"; private static final Set capabilities = Set.of(SUPPORTS_FAILURE_STORE); @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java index 83abc0555b0b8..d86ace42cdd0d 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java @@ -50,7 +50,7 @@ public String getName() { @Override public Set supportedCapabilities() { if (DataStream.isFailureStoreFeatureFlagEnabled()) { - return Set.of("return-404-on-missing-target", "lazy-rollover-failure-store", "index-expression-selectors"); + return Set.of("return-404-on-missing-target", "index_expression_selectors"); } else { return Set.of("return-404-on-missing-target"); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java b/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java index 5fb8759374865..f2ced999426bb 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java @@ -43,6 +43,8 @@ private SearchCapabilities() {} private static final String OPTIMIZED_SCALAR_QUANTIZATION_BBQ = "optimized_scalar_quantization_bbq"; private static final String KNN_QUANTIZED_VECTOR_RESCORE_OVERSAMPLE = "knn_quantized_vector_rescore_oversample"; + private static final String INDEX_SELECTOR_SYNTAX = "index_expression_selectors"; + public static final Set CAPABILITIES; static { HashSet capabilities = new HashSet<>(); @@ -58,6 +60,7 @@ private SearchCapabilities() {} capabilities.add(K_DEFAULT_TO_SIZE); capabilities.add(KQL_QUERY_SUPPORTED); capabilities.add(RRF_WINDOW_SIZE_SUPPORT_DEPRECATED); + capabilities.add(INDEX_SELECTOR_SYNTAX); CAPABILITIES = Set.copyOf(capabilities); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index 6155e11a127ec..af5a0d15da4a3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -436,6 +436,45 @@ public void testFailingDocumentRedirectsToFailureStore() throws Exception { assertThat(failedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.USED)); } + /** + * A bulk operation to a data stream with a failure store enabled should NOT redirect any documents that fail at a shard level to the + * failure store if the failure store node feature is not on every node in the cluster + */ + public void testFailingDocumentIgnoredByFailureStoreWhenFeatureIsDisabled() throws Exception { + Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); + + // Requests that go to two separate shards + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(fsDataStreamName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + bulkRequest.add(new IndexRequest(fsDataStreamName).id("3").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + + NodeClient client = getNodeClient( + thatFailsDocuments(Map.of(new IndexAndId(ds2BackingIndex1.getIndex().getName(), "3"), () -> new MapperException("test"))) + ); + + BulkResponse bulkItemResponses = safeAwait( + l -> newBulkOperation( + DEFAULT_STATE, + client, + bulkRequest, + new AtomicArray<>(bulkRequest.numberOfActions()), + mockObserver(DEFAULT_STATE), + l, + new FailureStoreDocumentConverter(), + DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()), + false + ).run() + ); + assertThat(bulkItemResponses.hasFailures(), is(true)); + BulkItemResponse failedItem = Arrays.stream(bulkItemResponses.getItems()) + .filter(BulkItemResponse::isFailed) + .findFirst() + .orElseThrow(() -> new AssertionError("Could not find redirected item")); + assertThat(failedItem.getFailure().getCause(), is(instanceOf(MapperException.class))); + assertThat(failedItem.getFailure().getCause().getMessage(), is(equalTo("test"))); + assertThat(failedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)); + } + public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSetting() { Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); @@ -477,7 +516,8 @@ public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSettin mockObserver(DEFAULT_STATE), l, new FailureStoreDocumentConverter(), - dataStreamFailureStoreSettings + dataStreamFailureStoreSettings, + true ).run() ); assertThat(bulkItemResponsesUsingClusterSetting.hasFailures(), is(false)); @@ -1170,7 +1210,8 @@ private BulkOperation newBulkOperation( observer, listener, failureStoreDocumentConverter, - DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()) + DataStreamFailureStoreSettings.create(ClusterSettings.createBuiltInClusterSettings()), + true ); } @@ -1182,7 +1223,8 @@ private BulkOperation newBulkOperation( ClusterStateObserver observer, ActionListener listener, FailureStoreDocumentConverter failureStoreDocumentConverter, - DataStreamFailureStoreSettings dataStreamFailureStoreSettings + DataStreamFailureStoreSettings dataStreamFailureStoreSettings, + boolean failureStoreNodeFeatureEnabled ) { // Time provision long timeZero = TimeUnit.MILLISECONDS.toNanos(randomMillisUpToYear9999() - TimeUnit.DAYS.toMillis(1)); @@ -1215,7 +1257,8 @@ private BulkOperation newBulkOperation( observer, failureStoreDocumentConverter, FailureStoreMetrics.NOOP, - dataStreamFailureStoreSettings + dataStreamFailureStoreSettings, + failureStoreNodeFeatureEnabled ); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 2f033e4b5a383..e331893192632 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -31,6 +32,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.indices.EmptySystemIndices; @@ -51,6 +54,7 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -242,12 +246,12 @@ static class TestTransportBulkAction extends TransportBulkAction { IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier relativeTimeProvider ) { - super( - threadPool, - transportService, - clusterService, - null, - null, + super(threadPool, transportService, clusterService, null, new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + }, client, actionFilters, indexNameExpressionResolver, diff --git a/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java index c45b7618a629e..e6081f6ad521c 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.service.ClusterService; @@ -25,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.ingest.FakeProcessor; import org.elasticsearch.ingest.IngestInfo; @@ -51,7 +53,9 @@ import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -85,6 +89,9 @@ public void setup() { when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + var featureService = mock(FeatureService.class); + when(featureService.clusterHasFeature(any(), eq(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE))).thenReturn(true); + Client client = mock(Client.class); ingestService = new IngestService( mock(ClusterService.class), @@ -96,7 +103,8 @@ public void setup() { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + featureService ); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("set")); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index b6b2d18b291b0..6441d6004a9c0 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -48,8 +48,11 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Predicates; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; @@ -92,6 +95,7 @@ import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.LongSupplier; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful; @@ -153,7 +157,13 @@ public void testIngestPlugin() { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); @@ -174,7 +184,13 @@ public void testIngestPluginDuplicate() { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); @@ -192,7 +208,13 @@ public void testExecuteIndexPipelineDoesNotExist() { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(Map.of()) @@ -1665,6 +1687,67 @@ public void testExecuteFailureRedirection() throws Exception { verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } + public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception { + final CompoundProcessor processor = mockCompoundProcessor(); + IngestService ingestService = createWithProcessors( + Map.of( + "mock", + (factories, tag, description, config) -> processor, + "set", + (factories, tag, description, config) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail()) + ), + DocumentParsingProvider.EMPTY_INSTANCE, + Predicates.never() + ); + PutPipelineRequest putRequest1 = new PutPipelineRequest( + "_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + XContentType.JSON + ); + // given that set -> fail() above, it's a failure if a document executes against this pipeline + PutPipelineRequest putRequest2 = new PutPipelineRequest( + "_id2", + new BytesArray("{\"processors\": [{\"set\" : {}}]}"), + XContentType.JSON + ); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); + ClusterState previousClusterState = clusterState; + clusterState = executePut(putRequest1, clusterState); + clusterState = executePut(putRequest2, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index").id("_id") + .source(Map.of()) + .setPipeline("_id1") + .setFinalPipeline("_id2"); + doThrow(new RuntimeException()).when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + final Function redirectCheck = (idx) -> indexRequest.index().equals(idx); + @SuppressWarnings("unchecked") + final TriConsumer redirectHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + final TriConsumer failureHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + redirectCheck, + redirectHandler, + failureHandler, + completionHandler, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + verifyNoInteractions(redirectHandler); + verify(failureHandler, times(1)).apply( + eq(0), + any(RuntimeException.class), + eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN) + ); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + } + public void testExecuteFailureStatusOnFailureWithoutRedirection() throws Exception { final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors( @@ -2353,7 +2436,13 @@ public Map getProcessors(Processor.Parameters paramet client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ); ingestService.addIngestClusterStateListener(ingestClusterStateListener); @@ -2847,7 +2936,13 @@ private void testUpdatingPipeline(String pipelineString) throws Exception { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); @@ -3121,12 +3216,17 @@ private static IngestService createWithProcessors() { } private static IngestService createWithProcessors(Map processors) { - return createWithProcessors(processors, DocumentParsingProvider.EMPTY_INSTANCE); + return createWithProcessors( + processors, + DocumentParsingProvider.EMPTY_INSTANCE, + DataStream.DATA_STREAM_FAILURE_STORE_FEATURE::equals + ); } private static IngestService createWithProcessors( Map processors, - DocumentParsingProvider documentParsingProvider + DocumentParsingProvider documentParsingProvider, + Predicate featureTest ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); @@ -3147,7 +3247,13 @@ public Map getProcessors(final Processor.Parameters p client, null, documentParsingProvider, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return featureTest.test(feature); + } + } ); if (randomBoolean()) { /* diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java index 94b3607bd7608..8e3dc8abd78b8 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java @@ -12,9 +12,13 @@ import org.elasticsearch.action.bulk.FailureStoreMetrics; import org.elasticsearch.action.bulk.SimulateBulkRequest; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.internal.DocumentParsingProvider; import org.elasticsearch.test.ESTestCase; @@ -133,7 +137,13 @@ public Map getProcessors(final Processor.Parameters p client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index c312052bf71d8..c601a7d3306e7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -93,6 +93,7 @@ import org.elasticsearch.cluster.coordination.LeaderHeartbeatService; import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadataVerifier; @@ -134,6 +135,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.Index; @@ -2402,7 +2404,13 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ), mockFeatureService, client, diff --git a/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml b/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml index 899cec817a19d..d57ecd25cd435 100644 --- a/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml +++ b/x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml @@ -35,7 +35,7 @@ Test failure store with logsdb: capabilities: [ 'failure_store_status' ] - method: POST path: /_index_template/{template} - capabilities: [ 'failure_store_in_template' ] + capabilities: [ 'data_stream_options.failure_store' ] reason: "Support for 'logsdb' index mode & failure status config in templates" - do: diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index a310f2e2c2ce9..8f98d48d8cc1c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.action.bulk.FailureStoreMetrics; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.service.ClusterApplierService; @@ -21,6 +23,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestStats; @@ -147,7 +151,13 @@ public void setUpVariables() { client, null, DocumentParsingProvider.EMPTY_INSTANCE, - FailureStoreMetrics.NOOP + FailureStoreMetrics.NOOP, + new FeatureService(List.of()) { + @Override + public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { + return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); + } + } ); }