Skip to content

Commit 9995147

Browse files
authored
IngestService code cleanups (#94593)
1 parent ea218dd commit 9995147

File tree

2 files changed

+59
-42
lines changed

2 files changed

+59
-42
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private static Map<String, Processor.Factory> processorFactories(List<IngestPlug
198198
/**
199199
* Resolves the potential pipelines (default and final) from the requests or templates associated to the index and then **mutates**
200200
* the {@link org.elasticsearch.action.index.IndexRequest} passed object with the pipeline information.
201-
*
201+
* <p>
202202
* Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node
203203
* to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline
204204
* comes as part of the request or resolved from this method. All this is made to later be able to reject the request in case the
@@ -476,10 +476,9 @@ Map<String, PipelineHolder> pipelines() {
476476
* 'on_failure', so we report metrics for the set processor, not an on_failure processor.
477477
*
478478
* @param compoundProcessor The compound processor to start walking the non-failure processors
479-
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
480-
* @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
479+
* @param processorMetrics The list to populate with {@link Processor} {@link IngestMetric} tuples.
481480
*/
482-
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
481+
private static void collectProcessorMetrics(
483482
CompoundProcessor compoundProcessor,
484483
List<Tuple<Processor, IngestMetric>> processorMetrics
485484
) {
@@ -505,12 +504,11 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
505504
} while (unwrapped);
506505

507506
if (processor instanceof CompoundProcessor cp) {
508-
getProcessorMetrics(cp, processorMetrics);
507+
collectProcessorMetrics(cp, processorMetrics);
509508
} else {
510509
processorMetrics.add(new Tuple<>(processor, metric));
511510
}
512511
}
513-
return processorMetrics;
514512
}
515513

516514
/**
@@ -840,7 +838,7 @@ public IngestStats stats() {
840838
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
841839
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
842840
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
843-
getProcessorMetrics(rootProcessor, processorMetrics);
841+
collectProcessorMetrics(rootProcessor, processorMetrics);
844842
processorMetrics.forEach(t -> {
845843
Processor processor = t.v1();
846844
IngestMetric processorMetric = t.v2();
@@ -1008,8 +1006,8 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
10081006
newPipeline.getMetrics().add(oldPipeline.getMetrics());
10091007
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
10101008
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
1011-
getProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
1012-
getProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
1009+
collectProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
1010+
collectProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
10131011
// Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
10141012
// the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
10151013
// consistent id's per processor and/or semantic equals for each processor will be needed.
@@ -1144,14 +1142,11 @@ public String getType() {
11441142
return new Pipeline(id, description, null, null, new CompoundProcessor(failureProcessor));
11451143
}
11461144

1147-
static class PipelineHolder {
1148-
1149-
final PipelineConfiguration configuration;
1150-
final Pipeline pipeline;
1145+
record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
11511146

1152-
PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
1153-
this.configuration = Objects.requireNonNull(configuration);
1154-
this.pipeline = Objects.requireNonNull(pipeline);
1147+
public PipelineHolder {
1148+
Objects.requireNonNull(configuration);
1149+
Objects.requireNonNull(pipeline);
11551150
}
11561151
}
11571152

@@ -1236,7 +1231,7 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexReque
12361231

12371232
/**
12381233
* Checks whether an IndexRequest has at least one pipeline defined.
1239-
*
1234+
* <p>
12401235
* This method assumes that the pipelines are beforehand resolved.
12411236
*/
12421237
public static boolean hasPipeline(IndexRequest indexRequest) {

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,12 @@ public void testUpdatePipelines() {
209209
.build();
210210
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
211211
assertThat(ingestService.pipelines().size(), is(1));
212-
assertThat(ingestService.pipelines().get("_id").pipeline.getId(), equalTo("_id"));
213-
assertThat(ingestService.pipelines().get("_id").pipeline.getDescription(), nullValue());
214-
assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().size(), equalTo(1));
215-
assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().get(0).getType(), equalTo("set"));
212+
213+
Pipeline p = ingestService.getPipeline("_id");
214+
assertThat(p.getId(), equalTo("_id"));
215+
assertThat(p.getDescription(), nullValue());
216+
assertThat(p.getProcessors().size(), equalTo(1));
217+
assertThat(p.getProcessors().get(0).getType(), equalTo("set"));
216218
}
217219

218220
public void testInnerUpdatePipelines() {
@@ -224,51 +226,71 @@ public void testInnerUpdatePipelines() {
224226

225227
ingestService.innerUpdatePipelines(ingestMetadata);
226228
assertThat(ingestService.pipelines().size(), is(1));
227-
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
228-
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
229+
{
230+
Pipeline p1 = ingestService.getPipeline("_id1");
231+
assertThat(p1.getId(), equalTo("_id1"));
232+
assertThat(p1.getProcessors().size(), equalTo(0));
233+
}
229234

230235
PipelineConfiguration pipeline2 = new PipelineConfiguration("_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON);
231236
ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id2", pipeline2));
232237

233238
ingestService.innerUpdatePipelines(ingestMetadata);
234239
assertThat(ingestService.pipelines().size(), is(2));
235-
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
236-
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
237-
assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
238-
assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
240+
{
241+
Pipeline p1 = ingestService.getPipeline("_id1");
242+
assertThat(p1.getId(), equalTo("_id1"));
243+
assertThat(p1.getProcessors().size(), equalTo(0));
244+
Pipeline p2 = ingestService.getPipeline("_id2");
245+
assertThat(p2.getId(), equalTo("_id2"));
246+
assertThat(p2.getProcessors().size(), equalTo(0));
247+
}
239248

240249
PipelineConfiguration pipeline3 = new PipelineConfiguration("_id3", new BytesArray("{\"processors\": []}"), XContentType.JSON);
241250
ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id2", pipeline2, "_id3", pipeline3));
242251

243252
ingestService.innerUpdatePipelines(ingestMetadata);
244253
assertThat(ingestService.pipelines().size(), is(3));
245-
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
246-
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
247-
assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
248-
assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
249-
assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
250-
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
254+
{
255+
Pipeline p1 = ingestService.getPipeline("_id1");
256+
assertThat(p1.getId(), equalTo("_id1"));
257+
assertThat(p1.getProcessors().size(), equalTo(0));
258+
Pipeline p2 = ingestService.getPipeline("_id2");
259+
assertThat(p2.getId(), equalTo("_id2"));
260+
assertThat(p2.getProcessors().size(), equalTo(0));
261+
Pipeline p3 = ingestService.getPipeline("_id3");
262+
assertThat(p3.getId(), equalTo("_id3"));
263+
assertThat(p3.getProcessors().size(), equalTo(0));
264+
}
251265

252266
ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id3", pipeline3));
253267

254268
ingestService.innerUpdatePipelines(ingestMetadata);
255269
assertThat(ingestService.pipelines().size(), is(2));
256-
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
257-
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
258-
assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
259-
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
270+
{
271+
Pipeline p1 = ingestService.getPipeline("_id1");
272+
assertThat(p1.getId(), equalTo("_id1"));
273+
assertThat(p1.getProcessors().size(), equalTo(0));
274+
Pipeline p3 = ingestService.getPipeline("_id3");
275+
assertThat(p3.getId(), equalTo("_id3"));
276+
assertThat(p3.getProcessors().size(), equalTo(0));
277+
}
260278

261279
pipeline3 = new PipelineConfiguration("_id3", new BytesArray("""
262280
{"processors": [{"set" : {"field": "_field", "value": "_value"}}]}"""), XContentType.JSON);
263281
ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id3", pipeline3));
264282

265283
ingestService.innerUpdatePipelines(ingestMetadata);
266284
assertThat(ingestService.pipelines().size(), is(2));
267-
assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
268-
assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
269-
assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
270-
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(1));
271-
assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().get(0).getType(), equalTo("set"));
285+
{
286+
Pipeline p1 = ingestService.getPipeline("_id1");
287+
assertThat(p1.getId(), equalTo("_id1"));
288+
assertThat(p1.getProcessors().size(), equalTo(0));
289+
Pipeline p3 = ingestService.getPipeline("_id3");
290+
assertThat(p3.getId(), equalTo("_id3"));
291+
assertThat(p3.getProcessors().size(), equalTo(1));
292+
assertThat(p3.getProcessors().get(0).getType(), equalTo("set"));
293+
}
272294

273295
// Perform an update with no changes:
274296
Map<String, IngestService.PipelineHolder> pipelines = ingestService.pipelines();

0 commit comments

Comments
 (0)