2121import org .elasticsearch .cluster .ClusterChangedEvent ;
2222import org .elasticsearch .cluster .ClusterStateListener ;
2323import org .elasticsearch .cluster .metadata .IndexAbstraction ;
24+ import org .elasticsearch .cluster .metadata .IndexMetadata ;
2425import org .elasticsearch .cluster .metadata .ProjectId ;
2526import org .elasticsearch .cluster .metadata .ProjectMetadata ;
2627import org .elasticsearch .cluster .node .DiscoveryNode ;
4748import org .elasticsearch .transport .RemoteTransportException ;
4849
4950import java .util .Collections ;
51+ import java .util .HashMap ;
5052import java .util .HashSet ;
5153import java .util .List ;
5254import java .util .Map ;
@@ -280,11 +282,14 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
280282 return false ;
281283 }
282284
283- return projectMetadata .indices ().values (). stream (). anyMatch ( indexMetadata -> {
285+ for ( IndexMetadata indexMetadata : projectMetadata .indices ().values ()) {
284286 String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetadata .getSettings ());
285287 String finalPipeline = IndexSettings .FINAL_PIPELINE .get (indexMetadata .getSettings ());
286- return checkReferencedPipelines .contains (defaultPipeline ) || checkReferencedPipelines .contains (finalPipeline );
287- });
288+ if (checkReferencedPipelines .contains (defaultPipeline ) || checkReferencedPipelines .contains (finalPipeline )) {
289+ return true ;
290+ }
291+ }
292+ return false ;
288293 }
289294
290295 /**
@@ -297,12 +302,26 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
297302 @ SuppressWarnings ("unchecked" )
298303 private static Set <String > pipelinesWithGeoIpProcessor (ProjectMetadata projectMetadata , boolean downloadDatabaseOnPipelineCreation ) {
299304 List <PipelineConfiguration > configurations = IngestService .getPipelines (projectMetadata );
305+ Map <String , PipelineConfiguration > pipelineConfigById = HashMap .newHashMap (configurations .size ());
306+ for (PipelineConfiguration configuration : configurations ) {
307+ pipelineConfigById .put (configuration .getId (), configuration );
308+ }
309+ // this map is used to keep track of pipelines that have already been checked
310+ Map <String , Boolean > pipelineHasGeoProcessorById = HashMap .newHashMap (configurations .size ());
300311 Set <String > ids = new HashSet <>();
301312 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
302313 for (PipelineConfiguration configuration : configurations ) {
303314 List <Map <String , Object >> processors = (List <Map <String , Object >>) configuration .getConfig ().get (Pipeline .PROCESSORS_KEY );
304- if (hasAtLeastOneGeoipProcessor (processors , downloadDatabaseOnPipelineCreation )) {
305- ids .add (configuration .getId ());
315+ String pipelineName = configuration .getId ();
316+ if (pipelineHasGeoProcessorById .containsKey (pipelineName ) == false ) {
317+ if (hasAtLeastOneGeoipProcessor (
318+ processors ,
319+ downloadDatabaseOnPipelineCreation ,
320+ pipelineConfigById ,
321+ pipelineHasGeoProcessorById
322+ )) {
323+ ids .add (pipelineName );
324+ }
306325 }
307326 }
308327 return Collections .unmodifiableSet (ids );
@@ -312,13 +331,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMe
     * Check if a list of processors contains at least one geoip processor.
313332 * @param processors List of processors.
314333 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
334+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
335+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
336+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
337+ * out (null).
315338 * @return true if a geoip processor is found in the processor list.
316339 */
317- private static boolean hasAtLeastOneGeoipProcessor (List <Map <String , Object >> processors , boolean downloadDatabaseOnPipelineCreation ) {
340+ private static boolean hasAtLeastOneGeoipProcessor (
341+ List <Map <String , Object >> processors ,
342+ boolean downloadDatabaseOnPipelineCreation ,
343+ Map <String , PipelineConfiguration > pipelineConfigById ,
344+ Map <String , Boolean > pipelineHasGeoProcessorById
345+ ) {
318346 if (processors != null ) {
319347 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
320348 for (Map <String , Object > processor : processors ) {
321- if (hasAtLeastOneGeoipProcessor (processor , downloadDatabaseOnPipelineCreation )) {
349+ if (hasAtLeastOneGeoipProcessor (
350+ processor ,
351+ downloadDatabaseOnPipelineCreation ,
352+ pipelineConfigById ,
353+ pipelineHasGeoProcessorById
354+ )) {
322355 return true ;
323356 }
324357 }
@@ -330,10 +363,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
330363 * Check if a processor config is a geoip processor or contains at least a geoip processor.
331364 * @param processor Processor config.
332365 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
366+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
367+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
368+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
369+ * out (null).
333370 * @return true if a geoip processor is found in the processor list.
334371 */
335372 @ SuppressWarnings ("unchecked" )
336- private static boolean hasAtLeastOneGeoipProcessor (Map <String , Object > processor , boolean downloadDatabaseOnPipelineCreation ) {
373+ private static boolean hasAtLeastOneGeoipProcessor (
374+ Map <String , Object > processor ,
375+ boolean downloadDatabaseOnPipelineCreation ,
376+ Map <String , PipelineConfiguration > pipelineConfigById ,
377+ Map <String , Boolean > pipelineHasGeoProcessorById
378+ ) {
337379 if (processor == null ) {
338380 return false ;
339381 }
@@ -352,27 +394,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
352394 }
353395 }
354396
355- return isProcessorWithOnFailureGeoIpProcessor (processor , downloadDatabaseOnPipelineCreation )
356- || isForeachProcessorWithGeoipProcessor (processor , downloadDatabaseOnPipelineCreation );
397+ return isProcessorWithOnFailureGeoIpProcessor (
398+ processor ,
399+ downloadDatabaseOnPipelineCreation ,
400+ pipelineConfigById ,
401+ pipelineHasGeoProcessorById
402+ )
403+ || isForeachProcessorWithGeoipProcessor (
404+ processor ,
405+ downloadDatabaseOnPipelineCreation ,
406+ pipelineConfigById ,
407+ pipelineHasGeoProcessorById
408+ )
409+ || isPipelineProcessorWithGeoIpProcessor (
410+ processor ,
411+ downloadDatabaseOnPipelineCreation ,
412+ pipelineConfigById ,
413+ pipelineHasGeoProcessorById
414+ );
357415 }
358416
359417 /**
360418 * Check if a processor config has an on_failure clause containing at least a geoip processor.
361419 * @param processor Processor config.
362420 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
421+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
422+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
423+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
424+ * out (null).
363425 * @return true if a geoip processor is found in the processor list.
364426 */
365427 @ SuppressWarnings ("unchecked" )
366428 private static boolean isProcessorWithOnFailureGeoIpProcessor (
367429 Map <String , Object > processor ,
368- boolean downloadDatabaseOnPipelineCreation
430+ boolean downloadDatabaseOnPipelineCreation ,
431+ Map <String , PipelineConfiguration > pipelineConfigById ,
432+ Map <String , Boolean > pipelineHasGeoProcessorById
369433 ) {
370434 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
371435 for (Object value : processor .values ()) {
372436 if (value instanceof Map
373437 && hasAtLeastOneGeoipProcessor (
374438 ((Map <String , List <Map <String , Object >>>) value ).get ("on_failure" ),
375- downloadDatabaseOnPipelineCreation
439+ downloadDatabaseOnPipelineCreation ,
440+ pipelineConfigById ,
441+ pipelineHasGeoProcessorById
376442 )) {
377443 return true ;
378444 }
@@ -384,13 +450,84 @@ && hasAtLeastOneGeoipProcessor(
384450 * Check if a processor is a foreach processor containing at least a geoip processor.
385451 * @param processor Processor config.
386452 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
453+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
454+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
455+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
456+ * out (null).
387457 * @return true if a geoip processor is found in the processor list.
388458 */
389459 @ SuppressWarnings ("unchecked" )
390- private static boolean isForeachProcessorWithGeoipProcessor (Map <String , Object > processor , boolean downloadDatabaseOnPipelineCreation ) {
460+ private static boolean isForeachProcessorWithGeoipProcessor (
461+ Map <String , Object > processor ,
462+ boolean downloadDatabaseOnPipelineCreation ,
463+ Map <String , PipelineConfiguration > pipelineConfigById ,
464+ Map <String , Boolean > pipelineHasGeoProcessorById
465+ ) {
391466 final Map <String , Object > processorConfig = (Map <String , Object >) processor .get ("foreach" );
392467 return processorConfig != null
393- && hasAtLeastOneGeoipProcessor ((Map <String , Object >) processorConfig .get ("processor" ), downloadDatabaseOnPipelineCreation );
468+ && hasAtLeastOneGeoipProcessor (
469+ (Map <String , Object >) processorConfig .get ("processor" ),
470+ downloadDatabaseOnPipelineCreation ,
471+ pipelineConfigById ,
472+ pipelineHasGeoProcessorById
473+ );
474+ }
475+
476+ /**
477+ * Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
478+ * pipelineHasGeoProcessorById with a result for any pipelines it looks at.
479+ * @param processor Processor config.
480+ * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
481+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
482+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
483+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
484+ * out (null).
485+ * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor.
486+ */
487+ @ SuppressWarnings ("unchecked" )
488+ private static boolean isPipelineProcessorWithGeoIpProcessor (
489+ Map <String , Object > processor ,
490+ boolean downloadDatabaseOnPipelineCreation ,
491+ Map <String , PipelineConfiguration > pipelineConfigById ,
492+ Map <String , Boolean > pipelineHasGeoProcessorById
493+ ) {
494+ final Map <String , Object > processorConfig = (Map <String , Object >) processor .get ("pipeline" );
495+ if (processorConfig != null ) {
496+ String pipelineName = (String ) processorConfig .get ("name" );
497+ if (pipelineName != null ) {
498+ if (pipelineHasGeoProcessorById .containsKey (pipelineName )) {
499+ if (pipelineHasGeoProcessorById .get (pipelineName ) == null ) {
500+ /*
501+ * If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
502+ * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
503+ * server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a
504+ * geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to
505+ * fail anyway.
506+ */
507+ pipelineHasGeoProcessorById .put (pipelineName , false );
508+ }
509+ } else {
510+ List <Map <String , Object >> childProcessors = null ;
511+ PipelineConfiguration config = pipelineConfigById .get (pipelineName );
512+ if (config != null ) {
513+ childProcessors = (List <Map <String , Object >>) config .getConfig ().get (Pipeline .PROCESSORS_KEY );
514+ }
515+ // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
516+ pipelineHasGeoProcessorById .put (pipelineName , null );
517+ pipelineHasGeoProcessorById .put (
518+ pipelineName ,
519+ hasAtLeastOneGeoipProcessor (
520+ childProcessors ,
521+ downloadDatabaseOnPipelineCreation ,
522+ pipelineConfigById ,
523+ pipelineHasGeoProcessorById
524+ )
525+ );
526+ }
527+ return pipelineHasGeoProcessorById .get (pipelineName );
528+ }
529+ }
530+ return false ;
394531 }
395532
396533 // starts GeoIP downloader task for a single project
0 commit comments