Skip to content

Commit 0aff24c

Browse files
committed
Refactor the node feature checks to apply one time before iterating documents.
1 parent e318e81 commit 0aff24c

File tree

9 files changed

+125
-31
lines changed

9 files changed

+125
-31
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.util.concurrent.TimeUnit;
7070
import java.util.function.BiConsumer;
7171
import java.util.function.Consumer;
72-
import java.util.function.Function;
7372
import java.util.function.LongSupplier;
7473

7574
import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN;
@@ -104,7 +103,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
104103
private final Map<ShardId, Exception> shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
105104
private final FailureStoreMetrics failureStoreMetrics;
106105
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
107-
private final Function<ClusterState, Boolean> failureStoreFeatureEnabled;
106+
private final boolean clusterHasFailureStoreFeature;
108107

109108
BulkOperation(
110109
Task task,
@@ -121,7 +120,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
121120
ActionListener<BulkResponse> listener,
122121
FailureStoreMetrics failureStoreMetrics,
123122
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
124-
Function<ClusterState, Boolean> failureStoreFeatureEnabled
123+
boolean clusterHasFailureStoreFeature
125124
) {
126125
this(
127126
task,
@@ -140,7 +139,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
140139
new FailureStoreDocumentConverter(),
141140
failureStoreMetrics,
142141
dataStreamFailureStoreSettings,
143-
failureStoreFeatureEnabled
142+
clusterHasFailureStoreFeature
144143
);
145144
}
146145

@@ -161,7 +160,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
161160
FailureStoreDocumentConverter failureStoreDocumentConverter,
162161
FailureStoreMetrics failureStoreMetrics,
163162
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
164-
Function<ClusterState, Boolean> failureStoreFeatureEnabled
163+
boolean clusterHasFailureStoreFeature
165164
) {
166165
super(listener);
167166
this.task = task;
@@ -182,7 +181,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
182181
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
183182
this.failureStoreMetrics = failureStoreMetrics;
184183
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
185-
this.failureStoreFeatureEnabled = failureStoreFeatureEnabled;
184+
this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
186185
}
187186

188187
@Override
@@ -562,7 +561,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
562561
DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, projectMetadata);
563562
// If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
564563
// it has the failure store enabled.
565-
if (failureStoreCandidate != null && failureStoreFeatureEnabled.apply(clusterService.state())) {
564+
if (failureStoreCandidate != null && clusterHasFailureStoreFeature) {
566565
// Do not redirect documents to a failure store that were already headed to one.
567566
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
568567
if (isFailureStoreRequest == false

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,11 @@ void executeBulk(
598598
Executor executor,
599599
AtomicArray<BulkItemResponse> responses
600600
) {
601+
// Determine if we have the feature enabled once for entire bulk operation
602+
final boolean clusterSupportsFailureStore = featureService.clusterHasFeature(
603+
clusterService.state(),
604+
DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
605+
);
601606
new BulkOperation(
602607
task,
603608
threadPool,
@@ -613,7 +618,7 @@ void executeBulk(
613618
listener,
614619
failureStoreMetrics,
615620
dataStreamFailureStoreSettings,
616-
(clusterState) -> featureService.clusterHasFeature(clusterState, DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)
621+
clusterSupportsFailureStore
617622
).run();
618623
}
619624

@@ -626,16 +631,10 @@ Boolean resolveFailureInternal(String indexName, ProjectMetadata projectMetadata
626631
return null;
627632
}
628633
var resolution = resolveFailureStoreFromMetadata(indexName, projectMetadata, epochMillis);
629-
if (resolution == null) {
630-
resolution = resolveFailureStoreFromTemplate(indexName, projectMetadata, epochMillis);
631-
}
632-
if (resolution == null || featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) {
634+
if (resolution != null) {
633635
return resolution;
634-
} else {
635-
// If we get a non-null result but the cluster is not yet fully updated with failure stores,
636-
// force the result false to keep from redirecting until all nodes are updated
637-
return false;
638636
}
637+
return resolveFailureStoreFromTemplate(indexName, projectMetadata, epochMillis);
639638
}
640639

641640
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.core.Tuple;
6868
import org.elasticsearch.core.UpdateForV10;
6969
import org.elasticsearch.env.Environment;
70+
import org.elasticsearch.features.FeatureService;
7071
import org.elasticsearch.gateway.GatewayService;
7172
import org.elasticsearch.grok.MatcherWatchdog;
7273
import org.elasticsearch.index.IndexSettings;
@@ -135,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
135136
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
136137
private volatile ClusterState state;
137138
private final ProjectResolver projectResolver;
139+
private final FeatureService featureService;
138140

139141
private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
140142
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
@@ -221,7 +223,8 @@ public IngestService(
221223
Client client,
222224
MatcherWatchdog matcherWatchdog,
223225
FailureStoreMetrics failureStoreMetrics,
224-
ProjectResolver projectResolver
226+
ProjectResolver projectResolver,
227+
FeatureService featureService
225228
) {
226229
this.clusterService = clusterService;
227230
this.scriptService = scriptService;
@@ -244,6 +247,7 @@ public IngestService(
244247
this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR);
245248
this.failureStoreMetrics = failureStoreMetrics;
246249
this.projectResolver = projectResolver;
250+
this.featureService = featureService;
247251
}
248252

249253
/**
@@ -261,6 +265,7 @@ public IngestService(
261265
this.state = ingestService.state;
262266
this.failureStoreMetrics = ingestService.failureStoreMetrics;
263267
this.projectResolver = ingestService.projectResolver;
268+
this.featureService = ingestService.featureService;
264269
}
265270

266271
private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -838,6 +843,9 @@ public void executeBulkRequest(
838843
) {
839844
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
840845

846+
// Adapt handler to ensure node features during ingest logic
847+
final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);
848+
841849
executor.execute(new AbstractRunnable() {
842850

843851
@Override
@@ -920,7 +928,7 @@ public void onFailure(Exception e) {
920928
}
921929
);
922930

923-
executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
931+
executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
924932
assert actionRequest.index() != null;
925933

926934
i++;
@@ -930,6 +938,29 @@ public void onFailure(Exception e) {
930938
});
931939
}
932940

941+
/**
942+
* Adapts failure store resolver function so that if the failure store node feature is not present on every node it reverts to the
943+
* old ingest behavior.
944+
* @param resolveFailureStore Function that surfaces if failures for an index should be redirected to failure store.
945+
* @return An adapted function that mutes the original if the cluster does not have the node feature universally applied.
946+
*/
947+
private Function<String, Boolean> wrapResolverWithFeatureCheck(Function<String, Boolean> resolveFailureStore) {
948+
final boolean clusterHasFailureStoreFeature = featureService.clusterHasFeature(
949+
clusterService.state(),
950+
DataStream.DATA_STREAM_FAILURE_STORE_FEATURE
951+
);
952+
return (indexName) -> {
953+
Boolean resolution = resolveFailureStore.apply(indexName);
954+
if (clusterHasFailureStoreFeature || resolution == null) {
955+
return resolution;
956+
} else {
957+
// If we get a non-null result but the cluster is not yet fully updated with required node features,
958+
// force the result false to maintain old logic until all nodes are updated
959+
return false;
960+
}
961+
};
962+
}
963+
933964
/**
934965
* Returns the pipelines of the request, and updates the request so that it no longer references
935966
* any pipelines (both the default and final pipeline are set to the noop pipeline).

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,8 @@ private void construct(
689689

690690
modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
691691

692+
FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
693+
692694
FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry());
693695
final IngestService ingestService = new IngestService(
694696
clusterService,
@@ -700,7 +702,8 @@ private void construct(
700702
client,
701703
IngestService.createGrokThreadWatchdog(environment, threadPool),
702704
failureStoreMetrics,
703-
projectResolver
705+
projectResolver,
706+
featureService
704707
);
705708

706709
SystemIndices systemIndices = createSystemIndices(settings);
@@ -784,8 +787,6 @@ private void construct(
784787

785788
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
786789

787-
FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
788-
789790
if (DiscoveryNode.isMasterNode(settings)) {
790791
clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client, projectResolver));
791792
}

server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1222,7 +1222,7 @@ private BulkOperation newBulkOperation(
12221222
failureStoreDocumentConverter,
12231223
FailureStoreMetrics.NOOP,
12241224
dataStreamFailureStoreSettings,
1225-
(clusterState) -> true // Feature enabled
1225+
true
12261226
);
12271227
}
12281228

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import org.elasticsearch.core.Strings;
5757
import org.elasticsearch.core.TimeValue;
5858
import org.elasticsearch.core.Tuple;
59+
import org.elasticsearch.features.FeatureService;
60+
import org.elasticsearch.features.NodeFeature;
5961
import org.elasticsearch.index.Index;
6062
import org.elasticsearch.index.IndexSettings;
6163
import org.elasticsearch.index.IndexVersion;
@@ -160,7 +162,13 @@ public void testIngestPlugin() {
160162
client,
161163
null,
162164
FailureStoreMetrics.NOOP,
163-
TestProjectResolvers.alwaysThrow()
165+
TestProjectResolvers.alwaysThrow(),
166+
new FeatureService(List.of()) {
167+
@Override
168+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
169+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
170+
}
171+
}
164172
);
165173
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
166174
assertTrue(factories.containsKey("foo"));
@@ -181,7 +189,13 @@ public void testIngestPluginDuplicate() {
181189
client,
182190
null,
183191
FailureStoreMetrics.NOOP,
184-
TestProjectResolvers.alwaysThrow()
192+
TestProjectResolvers.alwaysThrow(),
193+
new FeatureService(List.of()) {
194+
@Override
195+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
196+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
197+
}
198+
}
185199
)
186200
);
187201
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
@@ -199,7 +213,13 @@ public void testExecuteIndexPipelineDoesNotExist() {
199213
client,
200214
null,
201215
FailureStoreMetrics.NOOP,
202-
TestProjectResolvers.alwaysThrow()
216+
TestProjectResolvers.alwaysThrow(),
217+
new FeatureService(List.of()) {
218+
@Override
219+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
220+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
221+
}
222+
}
203223
);
204224
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
205225
.source(Map.of())
@@ -2440,7 +2460,13 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
24402460
client,
24412461
null,
24422462
FailureStoreMetrics.NOOP,
2443-
TestProjectResolvers.alwaysThrow()
2463+
TestProjectResolvers.alwaysThrow(),
2464+
new FeatureService(List.of()) {
2465+
@Override
2466+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
2467+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
2468+
}
2469+
}
24442470
);
24452471
ingestService.addIngestClusterStateListener(ingestClusterStateListener);
24462472

@@ -2929,7 +2955,13 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
29292955
client,
29302956
null,
29312957
FailureStoreMetrics.NOOP,
2932-
TestProjectResolvers.alwaysThrow()
2958+
TestProjectResolvers.alwaysThrow(),
2959+
new FeatureService(List.of()) {
2960+
@Override
2961+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
2962+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
2963+
}
2964+
}
29332965
);
29342966
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
29352967

@@ -3263,7 +3295,13 @@ public Map<String, Processor.Factory> getProcessors(final Processor.Parameters p
32633295
client,
32643296
null,
32653297
FailureStoreMetrics.NOOP,
3266-
TestProjectResolvers.alwaysThrow()
3298+
TestProjectResolvers.alwaysThrow(),
3299+
new FeatureService(List.of()) {
3300+
@Override
3301+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
3302+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
3303+
}
3304+
}
32673305
);
32683306
if (randomBoolean()) {
32693307
/*

server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
import org.elasticsearch.action.bulk.FailureStoreMetrics;
1313
import org.elasticsearch.action.bulk.SimulateBulkRequest;
1414
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.metadata.DataStream;
1517
import org.elasticsearch.cluster.metadata.ProjectId;
1618
import org.elasticsearch.cluster.project.TestProjectResolvers;
1719
import org.elasticsearch.cluster.service.ClusterService;
1820
import org.elasticsearch.common.bytes.BytesArray;
1921
import org.elasticsearch.common.util.concurrent.EsExecutors;
22+
import org.elasticsearch.features.FeatureService;
23+
import org.elasticsearch.features.NodeFeature;
2024
import org.elasticsearch.plugins.IngestPlugin;
2125
import org.elasticsearch.test.ESTestCase;
2226
import org.elasticsearch.threadpool.ThreadPool;
@@ -135,7 +139,13 @@ public Map<String, Processor.Factory> getProcessors(final Processor.Parameters p
135139
client,
136140
null,
137141
FailureStoreMetrics.NOOP,
138-
TestProjectResolvers.singleProject(projectId)
142+
TestProjectResolvers.singleProject(projectId),
143+
new FeatureService(List.of()) {
144+
@Override
145+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
146+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
147+
}
148+
}
139149
);
140150
}
141151
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2404,7 +2404,13 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
24042404
client,
24052405
null,
24062406
FailureStoreMetrics.NOOP,
2407-
TestProjectResolvers.alwaysThrow()
2407+
TestProjectResolvers.alwaysThrow(),
2408+
new FeatureService(List.of()) {
2409+
@Override
2410+
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
2411+
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
2412+
}
2413+
}
24082414
),
24092415
client,
24102416
actionFilters,

0 commit comments

Comments
 (0)