Skip to content

Commit bb87e67

Browse files
author
Pablo Alcantar Morales
authored
Refactoring pipeline's resolving code (#93821)
1 parent 50bccfb commit bb87e67

File tree

3 files changed

+233
-167
lines changed

3 files changed

+233
-167
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,8 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
229229
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
230230
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
231231
if (indexRequest != null) {
232-
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
233-
boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
234-
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
232+
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
233+
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
235234
}
236235

237236
if (actionRequest instanceof IndexRequest ir) {

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 134 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import java.util.Locale;
7474
import java.util.Map;
7575
import java.util.Objects;
76+
import java.util.Optional;
7677
import java.util.Set;
7778
import java.util.concurrent.CopyOnWriteArrayList;
7879
import java.util.function.BiConsumer;
@@ -194,120 +195,51 @@ private static Map<String, Processor.Factory> processorFactories(List<IngestPlug
194195
return Collections.unmodifiableMap(processorFactories);
195196
}
196197

197-
public static boolean resolvePipelines(
198+
/**
199+
* Resolves the potential pipelines (default and final) from the requests or templates associated to the index and then **mutates**
200+
* the {@link org.elasticsearch.action.index.IndexRequest} passed object with the pipeline information.
201+
*
202+
* Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node
203+
* to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline
204+
* comes as part of the request or resolved from this method. All this is made to later be able to reject the request in case the
205+
* pipeline was set by a required pipeline **and** the request also has a pipeline request too.
206+
*
207+
* @param originalRequest Original write request received.
208+
* @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update.
209+
* @param metadata Cluster metadata from where the pipeline information could be derived.
210+
*/
211+
public static void resolvePipelinesAndUpdateIndexRequest(
198212
final DocWriteRequest<?> originalRequest,
199213
final IndexRequest indexRequest,
200214
final Metadata metadata
201215
) {
202-
return resolvePipelines(originalRequest, indexRequest, metadata, System.currentTimeMillis());
216+
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis());
203217
}
204218

205-
public static boolean resolvePipelines(
219+
static void resolvePipelinesAndUpdateIndexRequest(
206220
final DocWriteRequest<?> originalRequest,
207221
final IndexRequest indexRequest,
208222
final Metadata metadata,
209223
final long epochMillis
210224
) {
211-
if (indexRequest.isPipelineResolved() == false) {
212-
final String requestPipeline = indexRequest.getPipeline();
213-
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
214-
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
215-
String defaultPipeline = null;
216-
String finalPipeline = null;
217-
IndexMetadata indexMetadata = null;
218-
// start to look for default or final pipelines via settings found in the index metadata
219-
if (originalRequest != null) {
220-
indexMetadata = metadata.indices()
221-
.get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis));
222-
}
223-
// check the alias for the index request (this is how normal index requests are modeled)
224-
if (indexMetadata == null && indexRequest.index() != null) {
225-
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index());
226-
if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) {
227-
indexMetadata = metadata.index(indexAbstraction.getWriteIndex());
228-
}
229-
}
230-
// check the alias for the action request (this is how upserts are modeled)
231-
if (indexMetadata == null && originalRequest != null && originalRequest.index() != null) {
232-
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(originalRequest.index());
233-
if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) {
234-
indexMetadata = metadata.index(indexAbstraction.getWriteIndex());
235-
}
236-
}
237-
if (indexMetadata != null) {
238-
final Settings indexSettings = indexMetadata.getSettings();
239-
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
240-
// find the default pipeline if one is defined from an existing index setting
241-
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
242-
indexRequest.setPipeline(defaultPipeline);
243-
}
244-
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
245-
// find the final pipeline if one is defined from an existing index setting
246-
finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings);
247-
indexRequest.setFinalPipeline(finalPipeline);
248-
}
249-
} else if (indexRequest.index() != null) {
250-
// the index does not exist yet (and this is a valid request), so match index
251-
// templates to look for pipelines in either a matching V2 template (which takes
252-
// precedence), or if a V2 template does not match, any V1 templates
253-
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false);
254-
if (v2Template != null) {
255-
Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template);
256-
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
257-
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
258-
}
259-
if (IndexSettings.FINAL_PIPELINE.exists(settings)) {
260-
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
261-
}
262-
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
263-
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME));
264-
} else {
265-
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
266-
metadata,
267-
indexRequest.index(),
268-
null
269-
);
270-
// order of templates are highest order first
271-
for (final IndexTemplateMetadata template : templates) {
272-
final Settings settings = template.settings();
273-
if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
274-
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
275-
// we can not break in case a lower-order template has a final pipeline that we need to collect
276-
}
277-
if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) {
278-
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
279-
// we can not break in case a lower-order template has a default pipeline that we need to collect
280-
}
281-
if (defaultPipeline != null && finalPipeline != null) {
282-
// we can break if we have already collected a default and final pipeline
283-
break;
284-
}
285-
}
286-
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
287-
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME));
288-
}
289-
}
225+
if (indexRequest.isPipelineResolved()) {
226+
return;
227+
}
290228

291-
if (requestPipeline != null) {
292-
indexRequest.setPipeline(requestPipeline);
293-
}
229+
String requestPipeline = indexRequest.getPipeline();
294230

295-
/*
296-
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
297-
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
298-
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
299-
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
300-
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
301-
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
302-
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
303-
* pipeline parameter too.
304-
*/
305-
indexRequest.isPipelineResolved(true);
306-
}
231+
Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) //
232+
.or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata))
233+
.orElse(Pipelines.NO_PIPELINES_DEFINED);
307234

308-
// return whether this index request has a pipeline
309-
return NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
310-
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
235+
// The pipeline coming as part of the request always has priority over the resolved one from metadata or templates
236+
if (requestPipeline != null) {
237+
indexRequest.setPipeline(requestPipeline);
238+
} else {
239+
indexRequest.setPipeline(pipelines.defaultPipeline);
240+
}
241+
indexRequest.setFinalPipeline(pipelines.finalPipeline);
242+
indexRequest.isPipelineResolved(true);
311243
}
312244

313245
public ClusterService getClusterService() {
@@ -855,7 +787,7 @@ private void executePipelines(
855787
return; // document failed!
856788
} else {
857789
indexRequest.isPipelineResolved(false);
858-
resolvePipelines(null, indexRequest, state.metadata());
790+
resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata());
859791
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
860792
newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
861793
newHasFinalPipeline = true;
@@ -1223,4 +1155,105 @@ static class PipelineHolder {
12231155
}
12241156
}
12251157

1158+
private static Optional<Pipelines> resolvePipelinesFromMetadata(
1159+
DocWriteRequest<?> originalRequest,
1160+
IndexRequest indexRequest,
1161+
Metadata metadata,
1162+
long epochMillis
1163+
) {
1164+
IndexMetadata indexMetadata = null;
1165+
// start to look for default or final pipelines via settings found in the cluster metadata
1166+
if (originalRequest != null) {
1167+
indexMetadata = metadata.indices()
1168+
.get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis));
1169+
}
1170+
// check the alias for the index request (this is how normal index requests are modeled)
1171+
if (indexMetadata == null && indexRequest.index() != null) {
1172+
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index());
1173+
if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) {
1174+
indexMetadata = metadata.index(indexAbstraction.getWriteIndex());
1175+
}
1176+
}
1177+
// check the alias for the action request (this is how upserts are modeled)
1178+
if (indexMetadata == null && originalRequest != null && originalRequest.index() != null) {
1179+
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(originalRequest.index());
1180+
if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) {
1181+
indexMetadata = metadata.index(indexAbstraction.getWriteIndex());
1182+
}
1183+
}
1184+
1185+
if (indexMetadata == null) {
1186+
return Optional.empty();
1187+
}
1188+
1189+
final Settings settings = indexMetadata.getSettings();
1190+
return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings)));
1191+
}
1192+
1193+
private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexRequest indexRequest, Metadata metadata) {
1194+
if (indexRequest.index() == null) {
1195+
return Optional.empty();
1196+
}
1197+
1198+
// the index does not exist yet (and this is a valid request), so match index
1199+
// templates to look for pipelines in either a matching V2 template (which takes
1200+
// precedence), or if a V2 template does not match, any V1 templates
1201+
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false);
1202+
if (v2Template != null) {
1203+
final Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template);
1204+
return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings)));
1205+
}
1206+
1207+
String defaultPipeline = null;
1208+
String finalPipeline = null;
1209+
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(metadata, indexRequest.index(), null);
1210+
// order of templates are the highest order first
1211+
for (final IndexTemplateMetadata template : templates) {
1212+
final Settings settings = template.settings();
1213+
1214+
// note: the exists/get trickiness here is because we explicitly *don't* want the default value
1215+
// of the settings -- a non-null value would terminate the search too soon
1216+
if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
1217+
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
1218+
// we can not break in case a lower-order template has a final pipeline that we need to collect
1219+
}
1220+
if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) {
1221+
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
1222+
// we can not break in case a lower-order template has a default pipeline that we need to collect
1223+
}
1224+
if (defaultPipeline != null && finalPipeline != null) {
1225+
// we can break if we have already collected a default and final pipeline
1226+
break;
1227+
}
1228+
}
1229+
1230+
// having exhausted the search, if nothing was found, then use the default noop pipeline names
1231+
defaultPipeline = Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME);
1232+
finalPipeline = Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME);
1233+
1234+
return Optional.of(new Pipelines(defaultPipeline, finalPipeline));
1235+
}
1236+
1237+
/**
1238+
* Checks whether an IndexRequest has at least one pipeline defined.
1239+
*
1240+
* This method assumes that the pipelines are beforehand resolved.
1241+
*/
1242+
public static boolean hasPipeline(IndexRequest indexRequest) {
1243+
assert indexRequest.isPipelineResolved();
1244+
assert indexRequest.getPipeline() != null;
1245+
assert indexRequest.getFinalPipeline() != null;
1246+
return NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
1247+
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
1248+
}
1249+
1250+
private record Pipelines(String defaultPipeline, String finalPipeline) {
1251+
1252+
private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME);
1253+
1254+
public Pipelines {
1255+
Objects.requireNonNull(defaultPipeline);
1256+
Objects.requireNonNull(finalPipeline);
1257+
}
1258+
}
12261259
}

0 commit comments

Comments
 (0)