Skip to content

Commit 34e944e

Browse files
committed
Use of failure store is now wrapped in cluster feature check
1 parent 3569947 commit 34e944e

File tree

4 files changed

+30
-9
lines changed

4 files changed

+30
-9
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2727
import org.elasticsearch.cluster.metadata.ComponentTemplate;
2828
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
29+
import org.elasticsearch.cluster.metadata.DataStream;
2930
import org.elasticsearch.cluster.metadata.ProjectId;
3031
import org.elasticsearch.cluster.metadata.ProjectMetadata;
3132
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -36,6 +37,7 @@
3637
import org.elasticsearch.core.Assertions;
3738
import org.elasticsearch.core.Releasable;
3839
import org.elasticsearch.core.TimeValue;
40+
import org.elasticsearch.features.FeatureService;
3941
import org.elasticsearch.index.IndexingPressure;
4042
import org.elasticsearch.indices.SystemIndices;
4143
import org.elasticsearch.ingest.IngestService;
@@ -70,6 +72,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
7072
protected final Executor coordinationExecutor;
7173
protected final Executor systemCoordinationExecutor;
7274
private final ActionType<BulkResponse> bulkAction;
75+
protected final FeatureService featureService;
7376

7477
public TransportAbstractBulkAction(
7578
ActionType<BulkResponse> action,
@@ -82,7 +85,8 @@ public TransportAbstractBulkAction(
8285
IndexingPressure indexingPressure,
8386
SystemIndices systemIndices,
8487
ProjectResolver projectResolver,
85-
LongSupplier relativeTimeNanosProvider
88+
LongSupplier relativeTimeNanosProvider,
89+
FeatureService featureService
8690
) {
8791
super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
8892
this.threadPool = threadPool;
@@ -94,6 +98,7 @@ public TransportAbstractBulkAction(
9498
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
9599
this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
96100
this.ingestForwarder = new IngestActionForwarder(transportService);
101+
this.featureService = featureService;
97102
clusterService.addStateApplier(this.ingestForwarder);
98103
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
99104
this.bulkAction = action;
@@ -416,11 +421,19 @@ private void applyPipelinesAndDoInternalExecute(
416421
+ "] stream instead"
417422
);
418423
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
419-
if (Boolean.TRUE.equals(failureStoreEnabled)) {
420-
bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
424+
425+
if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) {
426+
if (Boolean.TRUE.equals(failureStoreEnabled)) {
427+
bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
428+
} else if (Boolean.FALSE.equals(failureStoreEnabled)) {
429+
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
430+
} else {
431+
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
432+
}
421433
} else {
422434
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
423435
}
436+
424437
break;
425438
}
426439
}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
8686
private final OriginSettingClient rolloverClient;
8787
private final FailureStoreMetrics failureStoreMetrics;
8888
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
89-
private final FeatureService featureService;
9089

9190
@Inject
9291
public TransportBulkAction(
@@ -187,15 +186,15 @@ public TransportBulkAction(
187186
indexingPressure,
188187
systemIndices,
189188
projectResolver,
190-
relativeTimeProvider
189+
relativeTimeProvider,
190+
featureService
191191
);
192192
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
193193
Objects.requireNonNull(relativeTimeProvider);
194194
this.client = client;
195195
this.indexNameExpressionResolver = indexNameExpressionResolver;
196196
this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
197197
this.failureStoreMetrics = failureStoreMetrics;
198-
this.featureService = featureService;
199198
}
200199

201200
public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> unwrappingSingleItemBulkResponse(

server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.xcontent.XContentHelper;
3737
import org.elasticsearch.core.Nullable;
3838
import org.elasticsearch.core.Tuple;
39+
import org.elasticsearch.features.FeatureService;
3940
import org.elasticsearch.features.NodeFeature;
4041
import org.elasticsearch.index.IndexSettingProvider;
4142
import org.elasticsearch.index.IndexSettingProviders;
@@ -102,7 +103,8 @@ public TransportSimulateBulkAction(
102103
ProjectResolver projectResolver,
103104
IndicesService indicesService,
104105
NamedXContentRegistry xContentRegistry,
105-
IndexSettingProviders indexSettingProviders
106+
IndexSettingProviders indexSettingProviders,
107+
FeatureService featureService
106108
) {
107109
super(
108110
SimulateBulkAction.INSTANCE,
@@ -115,7 +117,8 @@ public TransportSimulateBulkAction(
115117
indexingPressure,
116118
systemIndices,
117119
projectResolver,
118-
threadPool::relativeTimeInNanos
120+
threadPool::relativeTimeInNanos,
121+
featureService
119122
);
120123
this.indicesService = indicesService;
121124
this.xContentRegistry = xContentRegistry;

server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.support.ActionFilters;
1818
import org.elasticsearch.cluster.ClusterState;
1919
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
20+
import org.elasticsearch.cluster.metadata.DataStream;
2021
import org.elasticsearch.cluster.metadata.IndexMetadata;
2122
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
2223
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.common.bytes.BytesReference;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.common.xcontent.XContentHelper;
34+
import org.elasticsearch.features.FeatureService;
3335
import org.elasticsearch.index.IndexSettingProviders;
3436
import org.elasticsearch.index.IndexVersion;
3537
import org.elasticsearch.index.IndexVersions;
@@ -79,6 +81,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
7981
private ClusterService clusterService;
8082
private TestThreadPool threadPool;
8183
private IndicesService indicesService;
84+
private FeatureService mockFeatureService;
8285

8386
private TestTransportSimulateBulkAction bulkAction;
8487

@@ -96,7 +99,8 @@ class TestTransportSimulateBulkAction extends TransportSimulateBulkAction {
9699
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
97100
indicesService,
98101
NamedXContentRegistry.EMPTY,
99-
new IndexSettingProviders(Set.of())
102+
new IndexSettingProviders(Set.of()),
103+
mockFeatureService
100104
);
101105
}
102106
}
@@ -126,6 +130,8 @@ public void setUp() throws Exception {
126130
transportService.acceptIncomingRequests();
127131
indicesService = mock(IndicesService.class);
128132
bulkAction = new TestTransportSimulateBulkAction();
133+
mockFeatureService = mock(FeatureService.class);
134+
when(mockFeatureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)).thenReturn(false);
129135
}
130136

131137
@After

0 commit comments

Comments
 (0)