Skip to content

Commit ab2b6b9

Browse files
joegallo and masseyke authored
Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline (#131236) (#131649) (#131653)
Co-authored-by: Keith Massey <[email protected]>
1 parent ebdfe25 commit ab2b6b9

File tree

3 files changed

+264
-14
lines changed

3 files changed

+264
-14
lines changed

docs/changelog/131236.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131236
2+
summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline
3+
processor within a default or final pipeline
4+
area: Ingest Node
5+
type: bug
6+
issues: []

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 151 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.elasticsearch.cluster.ClusterState;
2222
import org.elasticsearch.cluster.ClusterStateListener;
2323
import org.elasticsearch.cluster.metadata.IndexAbstraction;
24+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.cluster.service.ClusterService;
2627
import org.elasticsearch.common.settings.Setting;
2728
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.util.Maps;
2830
import org.elasticsearch.core.TimeValue;
2931
import org.elasticsearch.gateway.GatewayService;
3032
import org.elasticsearch.index.Index;
@@ -247,11 +249,14 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
247249
return false;
248250
}
249251

250-
return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> {
252+
for (IndexMetadata indexMetadata : clusterState.getMetadata().indices().values()) {
251253
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
252254
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
253-
return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
254-
});
255+
if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) {
256+
return true;
257+
}
258+
}
259+
return false;
255260
}
256261

257262
/**
@@ -264,12 +269,26 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
264269
@SuppressWarnings("unchecked")
265270
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
266271
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
272+
Map<String, PipelineConfiguration> pipelineConfigById = Maps.newHashMapWithExpectedSize(configurations.size());
273+
for (PipelineConfiguration configuration : configurations) {
274+
pipelineConfigById.put(configuration.getId(), configuration);
275+
}
276+
// this map is used to keep track of pipelines that have already been checked
277+
Map<String, Boolean> pipelineHasGeoProcessorById = Maps.newHashMapWithExpectedSize(configurations.size());
267278
Set<String> ids = new HashSet<>();
268279
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
269280
for (PipelineConfiguration configuration : configurations) {
270281
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
271-
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
272-
ids.add(configuration.getId());
282+
String pipelineName = configuration.getId();
283+
if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) {
284+
if (hasAtLeastOneGeoipProcessor(
285+
processors,
286+
downloadDatabaseOnPipelineCreation,
287+
pipelineConfigById,
288+
pipelineHasGeoProcessorById
289+
)) {
290+
ids.add(pipelineName);
291+
}
273292
}
274293
}
275294
return Collections.unmodifiableSet(ids);
@@ -279,13 +298,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState
279298
* Check if a list of processors contains at least one geoip processor.
280299
* @param processors List of processors.
281300
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
301+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
302+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
303+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
304+
* out (null).
282305
* @return true if a geoip processor is found in the processor list.
283306
*/
284-
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
307+
private static boolean hasAtLeastOneGeoipProcessor(
308+
List<Map<String, Object>> processors,
309+
boolean downloadDatabaseOnPipelineCreation,
310+
Map<String, PipelineConfiguration> pipelineConfigById,
311+
Map<String, Boolean> pipelineHasGeoProcessorById
312+
) {
285313
if (processors != null) {
286314
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
287315
for (Map<String, Object> processor : processors) {
288-
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
316+
if (hasAtLeastOneGeoipProcessor(
317+
processor,
318+
downloadDatabaseOnPipelineCreation,
319+
pipelineConfigById,
320+
pipelineHasGeoProcessorById
321+
)) {
289322
return true;
290323
}
291324
}
@@ -297,10 +330,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
297330
* Check if a processor config is a geoip processor or contains at least a geoip processor.
298331
* @param processor Processor config.
299332
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
333+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
334+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
335+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
336+
* out (null).
300337
* @return true if a geoip processor is found in the processor list.
301338
*/
302339
@SuppressWarnings("unchecked")
303-
private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
340+
private static boolean hasAtLeastOneGeoipProcessor(
341+
Map<String, Object> processor,
342+
boolean downloadDatabaseOnPipelineCreation,
343+
Map<String, PipelineConfiguration> pipelineConfigById,
344+
Map<String, Boolean> pipelineHasGeoProcessorById
345+
) {
304346
if (processor == null) {
305347
return false;
306348
}
@@ -319,27 +361,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
319361
}
320362
}
321363

322-
return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
323-
|| isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
364+
return isProcessorWithOnFailureGeoIpProcessor(
365+
processor,
366+
downloadDatabaseOnPipelineCreation,
367+
pipelineConfigById,
368+
pipelineHasGeoProcessorById
369+
)
370+
|| isForeachProcessorWithGeoipProcessor(
371+
processor,
372+
downloadDatabaseOnPipelineCreation,
373+
pipelineConfigById,
374+
pipelineHasGeoProcessorById
375+
)
376+
|| isPipelineProcessorWithGeoIpProcessor(
377+
processor,
378+
downloadDatabaseOnPipelineCreation,
379+
pipelineConfigById,
380+
pipelineHasGeoProcessorById
381+
);
324382
}
325383

326384
/**
327385
* Check if a processor config has an on_failure clause containing at least one geoip processor.
328386
* @param processor Processor config.
329387
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
388+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
389+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
390+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
391+
* out (null).
330392
* @return true if a geoip processor is found in the processor list.
331393
*/
332394
@SuppressWarnings("unchecked")
333395
private static boolean isProcessorWithOnFailureGeoIpProcessor(
334396
Map<String, Object> processor,
335-
boolean downloadDatabaseOnPipelineCreation
397+
boolean downloadDatabaseOnPipelineCreation,
398+
Map<String, PipelineConfiguration> pipelineConfigById,
399+
Map<String, Boolean> pipelineHasGeoProcessorById
336400
) {
337401
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
338402
for (Object value : processor.values()) {
339403
if (value instanceof Map
340404
&& hasAtLeastOneGeoipProcessor(
341405
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
342-
downloadDatabaseOnPipelineCreation
406+
downloadDatabaseOnPipelineCreation,
407+
pipelineConfigById,
408+
pipelineHasGeoProcessorById
343409
)) {
344410
return true;
345411
}
@@ -351,13 +417,84 @@ && hasAtLeastOneGeoipProcessor(
351417
* Check if a processor is a foreach processor containing at least one geoip processor.
352418
* @param processor Processor config.
353419
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
420+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
421+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
422+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
423+
* out (null).
354424
* @return true if a geoip processor is found in the processor list.
355425
*/
356426
@SuppressWarnings("unchecked")
357-
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
427+
private static boolean isForeachProcessorWithGeoipProcessor(
428+
Map<String, Object> processor,
429+
boolean downloadDatabaseOnPipelineCreation,
430+
Map<String, PipelineConfiguration> pipelineConfigById,
431+
Map<String, Boolean> pipelineHasGeoProcessorById
432+
) {
358433
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
359434
return processorConfig != null
360-
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
435+
&& hasAtLeastOneGeoipProcessor(
436+
(Map<String, Object>) processorConfig.get("processor"),
437+
downloadDatabaseOnPipelineCreation,
438+
pipelineConfigById,
439+
pipelineHasGeoProcessorById
440+
);
441+
}
442+
443+
/**
444+
* Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
445+
* pipelineHasGeoProcessorById with a result for any pipelines it looks at.
446+
* @param processor Processor config.
447+
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
448+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
449+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
450+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
451+
* out (null).
452+
* @return true if this processor is a pipeline processor and the pipeline it references contains a geoip processor.
453+
*/
454+
@SuppressWarnings("unchecked")
455+
private static boolean isPipelineProcessorWithGeoIpProcessor(
456+
Map<String, Object> processor,
457+
boolean downloadDatabaseOnPipelineCreation,
458+
Map<String, PipelineConfiguration> pipelineConfigById,
459+
Map<String, Boolean> pipelineHasGeoProcessorById
460+
) {
461+
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("pipeline");
462+
if (processorConfig != null) {
463+
String pipelineName = (String) processorConfig.get("name");
464+
if (pipelineName != null) {
465+
if (pipelineHasGeoProcessorById.containsKey(pipelineName)) {
466+
if (pipelineHasGeoProcessorById.get(pipelineName) == null) {
467+
/*
468+
* If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
469+
* This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
470+
* server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a
471+
* geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to
472+
* fail anyway.
473+
*/
474+
pipelineHasGeoProcessorById.put(pipelineName, false);
475+
}
476+
} else {
477+
List<Map<String, Object>> childProcessors = null;
478+
PipelineConfiguration config = pipelineConfigById.get(pipelineName);
479+
if (config != null) {
480+
childProcessors = (List<Map<String, Object>>) config.getConfig().get(Pipeline.PROCESSORS_KEY);
481+
}
482+
// We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
483+
pipelineHasGeoProcessorById.put(pipelineName, null);
484+
pipelineHasGeoProcessorById.put(
485+
pipelineName,
486+
hasAtLeastOneGeoipProcessor(
487+
childProcessors,
488+
downloadDatabaseOnPipelineCreation,
489+
pipelineConfigById,
490+
pipelineHasGeoProcessorById
491+
)
492+
);
493+
}
494+
return pipelineHasGeoProcessorById.get(pipelineName);
495+
}
496+
}
497+
return false;
361498
}
362499

363500
private static final TimeValue MASTER_TIMEOUT = TimeValue.MAX_VALUE;

0 commit comments

Comments
 (0)