@@ -9,6 +9,7 @@

package org.elasticsearch.datastreams;

+import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;

@@ -27,7 +28,7 @@ public class DataStreamFeatures implements FeatureSpecification {

@Override
public Set<NodeFeature> getFeatures() {
-return Set.of();
+return DataStream.isFailureStoreFeatureFlagEnabled() ? Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE) : Set.of();
}

@Override
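Once published here, the feature can be gated on elsewhere in the cluster; below is a minimal sketch of such a consumer, using the FeatureService call this PR wires through (the class and its constructor wiring are illustrative assumptions, not part of this diff):

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.features.FeatureService;

// Minimal sketch of a consumer gating on the newly published node feature.
class FailureStoreGate {
    private final FeatureService featureService;

    FailureStoreGate(FeatureService featureService) {
        this.featureService = featureService;
    }

    // True only once every node in the cluster advertises "data_stream.failure_store".
    boolean failureStoreAvailable(ClusterState state) {
        return featureService.clusterHasFeature(state, DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
    }
}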
@@ -217,7 +217,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
ingest.put_pipeline:
@@ -643,7 +643,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
@@ -739,7 +739,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
@@ -190,7 +190,7 @@ TSDB failures go to failure store:
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
@@ -97,7 +97,7 @@
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
@@ -8,7 +8,7 @@ setup:
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

---
teardown:
@@ -6,10 +6,10 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]
- method: POST
path: /{index}/_rollover
-capabilities: [ 'lazy-rollover-failure-store', 'index-expression-selectors' ]
+capabilities: [ 'index_expression_selectors' ]

- do:
allowed_warnings:
@@ -313,7 +313,7 @@ teardown:
capabilities:
- method: POST
path: /{index}/_rollover
-capabilities: [lazy-rollover-failure-store]
+capabilities: [index_expression_selectors]

# Initialize failure store
- do:
@@ -1,17 +1,15 @@
setup:
- requires:
reason: "Data stream options was added in 8.18+"
cluster_features: [ "data_stream.failure_store" ]
reason: "Failure store GA in 8.19+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
-- method: PUT
-path: /_cluster/settings
-capabilities: [ 'data_stream_failure_store_cluster_setting' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
cluster.put_settings:
@@ -6,7 +6,7 @@ setup:
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
@@ -55,7 +55,7 @@
capabilities:
- method: POST
path: /_index_template/{template}
-capabilities: [ 'failure_store_in_template' ]
+capabilities: [ 'data_stream_options.failure_store' ]

- do:
allowed_warnings:
@@ -103,6 +103,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
private final Map<ShardId, Exception> shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
+private final boolean clusterHasFailureStoreFeature;

BulkOperation(
Task task,
@@ -118,7 +119,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
long startTimeNanos,
ActionListener<BulkResponse> listener,
FailureStoreMetrics failureStoreMetrics,
-DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+boolean clusterHasFailureStoreFeature
) {
this(
task,
@@ -136,7 +138,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
new FailureStoreDocumentConverter(),
failureStoreMetrics,
-dataStreamFailureStoreSettings
+dataStreamFailureStoreSettings,
+clusterHasFailureStoreFeature
);
}

@@ -156,7 +159,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
ClusterStateObserver observer,
FailureStoreDocumentConverter failureStoreDocumentConverter,
FailureStoreMetrics failureStoreMetrics,
-DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+boolean clusterHasFailureStoreFeature
) {
super(listener);
this.task = task;
@@ -177,6 +181,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
this.failureStoreMetrics = failureStoreMetrics;
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
+this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
}

@Override
@@ -556,7 +561,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, projectMetadata);
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
// it has the failure store enabled.
-if (failureStoreCandidate != null) {
+if (failureStoreCandidate != null && clusterHasFailureStoreFeature) {
// Do not redirect documents to a failure store that were already headed to one.
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
if (isFailureStoreRequest == false
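Taken together, the new gate means a failed write is considered for redirect only when both conditions hold; a reduced sketch of that decision (names as in this PR; this helper is illustrative, not the real method):

// Reduced sketch of the gate added in processFailure above.
static boolean mayRedirectToFailureStore(
    DataStream failureStoreCandidate,      // null when the write did not target a data stream
    boolean clusterHasFailureStoreFeature  // computed once per bulk request
) {
    // In a mixed-version cluster the feature flag is false, so the document
    // fails the old way instead of being redirected to the failure store.
    return failureStoreCandidate != null && clusterHasFailureStoreFeature;
}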
@@ -45,6 +45,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Nullable;
+import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.SystemIndices;
@@ -85,6 +86,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
private final OriginSettingClient rolloverClient;
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
+private final FeatureService featureService;

@Inject
public TransportBulkAction(
@@ -99,7 +101,8 @@ public TransportBulkAction(
SystemIndices systemIndices,
ProjectResolver projectResolver,
FailureStoreMetrics failureStoreMetrics,
-DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+FeatureService featureService
) {
this(
threadPool,
@@ -114,7 +117,8 @@ public TransportBulkAction(
projectResolver,
threadPool::relativeTimeInNanos,
failureStoreMetrics,
-dataStreamFailureStoreSettings
+dataStreamFailureStoreSettings,
+featureService
);
}

@@ -131,7 +135,8 @@ public TransportBulkAction(
ProjectResolver projectResolver,
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics,
-DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+FeatureService featureService
) {
this(
TYPE,
@@ -148,7 +153,8 @@ public TransportBulkAction(
projectResolver,
relativeTimeProvider,
failureStoreMetrics,
-dataStreamFailureStoreSettings
+dataStreamFailureStoreSettings,
+featureService
);
}

@@ -167,7 +173,8 @@ public TransportBulkAction(
ProjectResolver projectResolver,
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics,
-DataStreamFailureStoreSettings dataStreamFailureStoreSettings
+DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
+FeatureService featureService
) {
super(
bulkAction,
@@ -188,6 +195,7 @@ public TransportBulkAction(
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
this.failureStoreMetrics = failureStoreMetrics;
+this.featureService = featureService;
}

public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> unwrappingSingleItemBulkResponse(
@@ -590,6 +598,11 @@ void executeBulk(
Executor executor,
AtomicArray<BulkItemResponse> responses
) {
+// Determine once, for the entire bulk operation, whether the cluster has the failure store feature
+final boolean clusterSupportsFailureStore = featureService.clusterHasFeature(
[Review comment] Member:
I assume even for a very large cluster, this check is cheap enough relative to a bulk request that it's fine to run on every bulk? I think it just loops through all the nodes in the cluster state.

[Reply] Member Author:
It is indeed a linear check over the list of nodes in the cluster. Unfortunately, there's not a great way to hoist this up further than a request-by-request basis without introducing some kind of timer element or observer interface. We need to be responsive to changes in the cluster in a timely manner, and refactoring things to build off a cluster state listener seems like more complexity than it's worth at the moment. I'd be hard-pressed to optimize this further without any indication that it's in a bad place.

+clusterService.state(),
+DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
+);
new BulkOperation(
task,
threadPool,
@@ -604,7 +617,8 @@ void executeBulk(
startTimeNanos,
listener,
failureStoreMetrics,
-dataStreamFailureStoreSettings
+dataStreamFailureStoreSettings,
+clusterSupportsFailureStore
).run();
}

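To make the cost discussion above concrete, the check is linear in the number of nodes; a simplified sketch follows (not the actual FeatureService implementation; the per-node lookup is passed in as an assumed function):

import java.util.Set;
import java.util.function.Function;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;

// Sketch only: clusterHasFeature is effectively O(number of nodes), because the
// feature counts as present only when every node advertises it.
final class FeatureCheckSketch {
    static boolean allNodesHaveFeature(ClusterState state, String featureId, Function<DiscoveryNode, Set<String>> nodeFeatures) {
        for (DiscoveryNode node : state.nodes()) {
            if (nodeFeatures.apply(node).contains(featureId) == false) {
                return false; // a single node without the feature disables it cluster-wide
            }
        }
        return true;
    }
}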
@@ -38,6 +38,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
+import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
@@ -76,6 +77,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
private static final Logger LOGGER = LogManager.getLogger(DataStream.class);

public static final boolean FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store").isEnabled();
+public static final NodeFeature DATA_STREAM_FAILURE_STORE_FEATURE = new NodeFeature("data_stream.failure_store");
public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0;
public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0;
public static final TransportVersion ADD_DATA_STREAM_OPTIONS_VERSION = TransportVersions.V_8_16_0;
34 changes: 32 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -67,6 +67,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.env.Environment;
+import org.elasticsearch.features.FeatureService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.grok.MatcherWatchdog;
import org.elasticsearch.index.IndexSettings;
@@ -135,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ClusterState state;
private final ProjectResolver projectResolver;
+private final FeatureService featureService;

private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
@@ -221,7 +223,8 @@ public IngestService(
Client client,
MatcherWatchdog matcherWatchdog,
FailureStoreMetrics failureStoreMetrics,
-ProjectResolver projectResolver
+ProjectResolver projectResolver,
+FeatureService featureService
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
@@ -244,6 +247,7 @@ public IngestService(
this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR);
this.failureStoreMetrics = failureStoreMetrics;
this.projectResolver = projectResolver;
+this.featureService = featureService;
}

/**
@@ -261,6 +265,7 @@ public IngestService(
this.state = ingestService.state;
this.failureStoreMetrics = ingestService.failureStoreMetrics;
this.projectResolver = ingestService.projectResolver;
+this.featureService = ingestService.featureService;
}

private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -838,6 +843,9 @@ public void executeBulkRequest(
) {
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";

+// Adapt the resolver so failure-store logic only engages once the node feature is present on all nodes
+final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);

executor.execute(new AbstractRunnable() {

@Override
Expand Down Expand Up @@ -920,7 +928,7 @@ public void onFailure(Exception e) {
}
);

executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
assert actionRequest.index() != null;

i++;
@@ -930,6 +938,28 @@ public void onFailure(Exception e) {
});
}

+/**
+ * Adapts the failure store resolver function so that, if the failure store node feature is not present on every node,
+ * ingest reverts to the old behavior.
+ * @param resolveFailureStore Function that reports whether failures for an index should be redirected to the failure store.
+ * @return An adapted function that mutes the original when the cluster does not have the node feature universally applied.
+ */
+private Function<String, Boolean> wrapResolverWithFeatureCheck(Function<String, Boolean> resolveFailureStore) {
+final boolean clusterHasFailureStoreFeature = featureService.clusterHasFeature(
+clusterService.state(),
+DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
+);
+return (indexName) -> {
+if (clusterHasFailureStoreFeature) {
+return resolveFailureStore.apply(indexName);
+} else {
+// The cluster is not yet fully updated with the required node feature, so force the result
+// to null to maintain the old logic until all nodes are updated
+return null;
+}
+};
+}

/**
* Returns the pipelines of the request, and updates the request so that it no longer references
* any pipelines (both the default and final pipeline are set to the noop pipeline).
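Finally, a standalone sketch of the wrapper's observable behavior (the real method is private to IngestService and reads FeatureService state; the resolver and index name below are hypothetical):

import java.util.function.Function;

// Standalone rendition of wrapResolverWithFeatureCheck's contract.
final class WrapperBehaviorSketch {
    static Function<String, Boolean> wrap(Function<String, Boolean> resolveFailureStore, boolean clusterHasFeature) {
        return indexName -> clusterHasFeature ? resolveFailureStore.apply(indexName) : null;
    }

    public static void main(String[] args) {
        Function<String, Boolean> resolver = indexName -> Boolean.TRUE; // hypothetical: "redirect failures for this index"
        // Fully upgraded cluster: the wrapper is transparent.
        assert Boolean.TRUE.equals(wrap(resolver, true).apply("logs-app-default"));
        // Mixed-version cluster: the result is muted to null, i.e. the old ingest behavior.
        assert wrap(resolver, false).apply("logs-app-default") == null;
    }
}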