2121import org .elasticsearch .cluster .ClusterState ;
2222import org .elasticsearch .cluster .ClusterStateListener ;
2323import org .elasticsearch .cluster .metadata .IndexAbstraction ;
24+ import org .elasticsearch .cluster .metadata .IndexMetadata ;
2425import org .elasticsearch .cluster .node .DiscoveryNode ;
2526import org .elasticsearch .cluster .service .ClusterService ;
2627import org .elasticsearch .common .settings .Setting ;
2728import org .elasticsearch .common .settings .Settings ;
29+ import org .elasticsearch .common .util .Maps ;
2830import org .elasticsearch .core .TimeValue ;
2931import org .elasticsearch .core .UpdateForV9 ;
3032import org .elasticsearch .gateway .GatewayService ;
@@ -248,11 +250,14 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
248250 return false ;
249251 }
250252
251- return clusterState .getMetadata ().indices ().values (). stream (). anyMatch ( indexMetadata -> {
253+ for ( IndexMetadata indexMetadata : clusterState .getMetadata ().indices ().values ()) {
252254 String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetadata .getSettings ());
253255 String finalPipeline = IndexSettings .FINAL_PIPELINE .get (indexMetadata .getSettings ());
254- return checkReferencedPipelines .contains (defaultPipeline ) || checkReferencedPipelines .contains (finalPipeline );
255- });
256+ if (checkReferencedPipelines .contains (defaultPipeline ) || checkReferencedPipelines .contains (finalPipeline )) {
257+ return true ;
258+ }
259+ }
260+ return false ;
256261 }
257262
258263 /**
@@ -265,12 +270,26 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
265270 @ SuppressWarnings ("unchecked" )
266271 private static Set <String > pipelinesWithGeoIpProcessor (ClusterState clusterState , boolean downloadDatabaseOnPipelineCreation ) {
267272 List <PipelineConfiguration > configurations = IngestService .getPipelines (clusterState );
273+ Map <String , PipelineConfiguration > pipelineConfigById = Maps .newHashMapWithExpectedSize (configurations .size ());
274+ for (PipelineConfiguration configuration : configurations ) {
275+ pipelineConfigById .put (configuration .getId (), configuration );
276+ }
277+ // this map is used to keep track of pipelines that have already been checked
278+ Map <String , Boolean > pipelineHasGeoProcessorById = Maps .newHashMapWithExpectedSize (configurations .size ());
268279 Set <String > ids = new HashSet <>();
269280 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
270281 for (PipelineConfiguration configuration : configurations ) {
271282 List <Map <String , Object >> processors = (List <Map <String , Object >>) configuration .getConfig ().get (Pipeline .PROCESSORS_KEY );
272- if (hasAtLeastOneGeoipProcessor (processors , downloadDatabaseOnPipelineCreation )) {
273- ids .add (configuration .getId ());
283+ String pipelineName = configuration .getId ();
284+ if (pipelineHasGeoProcessorById .containsKey (pipelineName ) == false ) {
285+ if (hasAtLeastOneGeoipProcessor (
286+ processors ,
287+ downloadDatabaseOnPipelineCreation ,
288+ pipelineConfigById ,
289+ pipelineHasGeoProcessorById
290+ )) {
291+ ids .add (pipelineName );
292+ }
274293 }
275294 }
276295 return Collections .unmodifiableSet (ids );
@@ -280,13 +299,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState
280299 * Check if a list of processor contains at least a geoip processor.
281300 * @param processors List of processors.
282301 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
302+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
303+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
304+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
305+ * out (null).
283306 * @return true if a geoip processor is found in the processor list.
284307 */
285- private static boolean hasAtLeastOneGeoipProcessor (List <Map <String , Object >> processors , boolean downloadDatabaseOnPipelineCreation ) {
308+ private static boolean hasAtLeastOneGeoipProcessor (
309+ List <Map <String , Object >> processors ,
310+ boolean downloadDatabaseOnPipelineCreation ,
311+ Map <String , PipelineConfiguration > pipelineConfigById ,
312+ Map <String , Boolean > pipelineHasGeoProcessorById
313+ ) {
286314 if (processors != null ) {
287315 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
288316 for (Map <String , Object > processor : processors ) {
289- if (hasAtLeastOneGeoipProcessor (processor , downloadDatabaseOnPipelineCreation )) {
317+ if (hasAtLeastOneGeoipProcessor (
318+ processor ,
319+ downloadDatabaseOnPipelineCreation ,
320+ pipelineConfigById ,
321+ pipelineHasGeoProcessorById
322+ )) {
290323 return true ;
291324 }
292325 }
@@ -298,10 +331,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
298331 * Check if a processor config is a geoip processor or contains at least a geoip processor.
299332 * @param processor Processor config.
300333 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
334+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
335+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
336+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
337+ * out (null).
301338 * @return true if a geoip processor is found in the processor list.
302339 */
303340 @ SuppressWarnings ("unchecked" )
304- private static boolean hasAtLeastOneGeoipProcessor (Map <String , Object > processor , boolean downloadDatabaseOnPipelineCreation ) {
341+ private static boolean hasAtLeastOneGeoipProcessor (
342+ Map <String , Object > processor ,
343+ boolean downloadDatabaseOnPipelineCreation ,
344+ Map <String , PipelineConfiguration > pipelineConfigById ,
345+ Map <String , Boolean > pipelineHasGeoProcessorById
346+ ) {
305347 if (processor == null ) {
306348 return false ;
307349 }
@@ -320,27 +362,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
320362 }
321363 }
322364
323- return isProcessorWithOnFailureGeoIpProcessor (processor , downloadDatabaseOnPipelineCreation )
324- || isForeachProcessorWithGeoipProcessor (processor , downloadDatabaseOnPipelineCreation );
365+ return isProcessorWithOnFailureGeoIpProcessor (
366+ processor ,
367+ downloadDatabaseOnPipelineCreation ,
368+ pipelineConfigById ,
369+ pipelineHasGeoProcessorById
370+ )
371+ || isForeachProcessorWithGeoipProcessor (
372+ processor ,
373+ downloadDatabaseOnPipelineCreation ,
374+ pipelineConfigById ,
375+ pipelineHasGeoProcessorById
376+ )
377+ || isPipelineProcessorWithGeoIpProcessor (
378+ processor ,
379+ downloadDatabaseOnPipelineCreation ,
380+ pipelineConfigById ,
381+ pipelineHasGeoProcessorById
382+ );
325383 }
326384
327385 /**
328386 * Check if a processor config has an on_failure clause containing at least a geoip processor.
329387 * @param processor Processor config.
330388 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
389+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
390+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
391+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
392+ * out (null).
331393 * @return true if a geoip processor is found in the processor list.
332394 */
333395 @ SuppressWarnings ("unchecked" )
334396 private static boolean isProcessorWithOnFailureGeoIpProcessor (
335397 Map <String , Object > processor ,
336- boolean downloadDatabaseOnPipelineCreation
398+ boolean downloadDatabaseOnPipelineCreation ,
399+ Map <String , PipelineConfiguration > pipelineConfigById ,
400+ Map <String , Boolean > pipelineHasGeoProcessorById
337401 ) {
338402 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
339403 for (Object value : processor .values ()) {
340404 if (value instanceof Map
341405 && hasAtLeastOneGeoipProcessor (
342406 ((Map <String , List <Map <String , Object >>>) value ).get ("on_failure" ),
343- downloadDatabaseOnPipelineCreation
407+ downloadDatabaseOnPipelineCreation ,
408+ pipelineConfigById ,
409+ pipelineHasGeoProcessorById
344410 )) {
345411 return true ;
346412 }
@@ -352,13 +418,84 @@ && hasAtLeastOneGeoipProcessor(
352418 * Check if a processor is a foreach processor containing at least a geoip processor.
353419 * @param processor Processor config.
354420 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
421+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
422+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
423+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
424+ * out (null).
355425 * @return true if a geoip processor is found in the processor list.
356426 */
357427 @ SuppressWarnings ("unchecked" )
358- private static boolean isForeachProcessorWithGeoipProcessor (Map <String , Object > processor , boolean downloadDatabaseOnPipelineCreation ) {
428+ private static boolean isForeachProcessorWithGeoipProcessor (
429+ Map <String , Object > processor ,
430+ boolean downloadDatabaseOnPipelineCreation ,
431+ Map <String , PipelineConfiguration > pipelineConfigById ,
432+ Map <String , Boolean > pipelineHasGeoProcessorById
433+ ) {
359434 final Map <String , Object > processorConfig = (Map <String , Object >) processor .get ("foreach" );
360435 return processorConfig != null
361- && hasAtLeastOneGeoipProcessor ((Map <String , Object >) processorConfig .get ("processor" ), downloadDatabaseOnPipelineCreation );
436+ && hasAtLeastOneGeoipProcessor (
437+ (Map <String , Object >) processorConfig .get ("processor" ),
438+ downloadDatabaseOnPipelineCreation ,
439+ pipelineConfigById ,
440+ pipelineHasGeoProcessorById
441+ );
442+ }
443+
444+ /**
445+ * Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
446+ * pipelineHasGeoProcessorById with a result for any pipelines it looks at.
447+ * @param processor Processor config.
448+ * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
449+ * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
450+ * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
451+ * (true), does not reference a geoip processor (false), or we are currently trying to figure that
452+ * out (null).
453+ * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor.
454+ */
455+ @ SuppressWarnings ("unchecked" )
456+ private static boolean isPipelineProcessorWithGeoIpProcessor (
457+ Map <String , Object > processor ,
458+ boolean downloadDatabaseOnPipelineCreation ,
459+ Map <String , PipelineConfiguration > pipelineConfigById ,
460+ Map <String , Boolean > pipelineHasGeoProcessorById
461+ ) {
462+ final Map <String , Object > processorConfig = (Map <String , Object >) processor .get ("pipeline" );
463+ if (processorConfig != null ) {
464+ String pipelineName = (String ) processorConfig .get ("name" );
465+ if (pipelineName != null ) {
466+ if (pipelineHasGeoProcessorById .containsKey (pipelineName )) {
467+ if (pipelineHasGeoProcessorById .get (pipelineName ) == null ) {
468+ /*
469+ * If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
470+ * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
471+ * server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a
472+ * geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to
473+ * fail anyway.
474+ */
475+ pipelineHasGeoProcessorById .put (pipelineName , false );
476+ }
477+ } else {
478+ List <Map <String , Object >> childProcessors = null ;
479+ PipelineConfiguration config = pipelineConfigById .get (pipelineName );
480+ if (config != null ) {
481+ childProcessors = (List <Map <String , Object >>) config .getConfig ().get (Pipeline .PROCESSORS_KEY );
482+ }
483+ // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
484+ pipelineHasGeoProcessorById .put (pipelineName , null );
485+ pipelineHasGeoProcessorById .put (
486+ pipelineName ,
487+ hasAtLeastOneGeoipProcessor (
488+ childProcessors ,
489+ downloadDatabaseOnPipelineCreation ,
490+ pipelineConfigById ,
491+ pipelineHasGeoProcessorById
492+ )
493+ );
494+ }
495+ return pipelineHasGeoProcessorById .get (pipelineName );
496+ }
497+ }
498+ return false ;
362499 }
363500
364501 @ UpdateForV9 // use MINUS_ONE once that means no timeout
0 commit comments