Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131236.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131236
summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline
processor within a default or final pipeline
area: Ingest Node
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
Expand Down Expand Up @@ -297,12 +301,23 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
@SuppressWarnings("unchecked")
private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(projectMetadata);
Map<String, PipelineConfiguration> pipelineConfigById = configurations.stream()
.collect(Collectors.toMap(PipelineConfiguration::getId, Function.identity()));
Map<String, Boolean> pipelineHasGeoProcessorById = new HashMap<>(); // used to keep track of pipelines we've checked before
Set<String> ids = new HashSet<>();
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (PipelineConfiguration configuration : configurations) {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
ids.add(configuration.getId());
String pipelineName = configuration.getId();
if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) {
if (hasAtLeastOneGeoipProcessor(
processors,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)) {
ids.add(pipelineName);
}
}
}
return Collections.unmodifiableSet(ids);
Expand All @@ -312,13 +327,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMe
* Check if a list of processor contains at least a geoip processor.
* @param processors List of processors.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
private static boolean hasAtLeastOneGeoipProcessor(
List<Map<String, Object>> processors,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
if (processors != null) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Map<String, Object> processor : processors) {
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
if (hasAtLeastOneGeoipProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)) {
return true;
}
}
Expand All @@ -330,10 +359,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
* Check if a processor config is a geoip processor or contains at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
private static boolean hasAtLeastOneGeoipProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
if (processor == null) {
return false;
}
Expand All @@ -352,27 +390,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
}
}

return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
|| isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
return isProcessorWithOnFailureGeoIpProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)
|| isForeachProcessorWithGeoipProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)
|| isPipelineProcessorWithGeoIpProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
);
}

/**
* Check if a processor config has an on_failure clause containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Object value : processor.values()) {
if (value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)) {
return true;
}
Expand All @@ -384,13 +446,85 @@ && hasAtLeastOneGeoipProcessor(
* Check if a processor is a foreach processor containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
private static boolean isForeachProcessorWithGeoipProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
return processorConfig != null
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
&& hasAtLeastOneGeoipProcessor(
(Map<String, Object>) processorConfig.get("processor"),
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
);
}

/**
* Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
* pipelineHasGeoProcessorById with a result for any pipelines it looks at.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor.
*/
@SuppressWarnings("unchecked")
private static boolean isPipelineProcessorWithGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("pipeline");
if (processorConfig != null) {
String pipelineName = (String) processorConfig.get("name");
if (pipelineName != null) {
if (pipelineHasGeoProcessorById.containsKey(pipelineName)) {
if (pipelineHasGeoProcessorById.get(pipelineName) == null) {
/*
* If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
* This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
* server startup time. Instead, we just log the problem and bail out as quickly as possible. It is possible that
* this could lead to a geo database not being downloaded for the pipeline, but it doesn't really matter since the
* pipeline was going to fail anyway.
*/
logger.warn("Detected that pipeline [{}] is called recursively.", pipelineName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ends up being too verbose in practice, so I think the logging has to go.

pipelineHasGeoProcessorById.put(pipelineName, false);
}
} else {
List<Map<String, Object>> childProcessors = Optional.ofNullable((String) processorConfig.get("name"))
.map(pipelineConfigById::get)
.map(PipelineConfiguration::getConfig)
.map(config -> (List<Map<String, Object>>) config.get(Pipeline.PROCESSORS_KEY))
.orElse(Collections.emptyList());
// We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
pipelineHasGeoProcessorById.put(pipelineName, null);
pipelineHasGeoProcessorById.put(
pipelineName,
hasAtLeastOneGeoipProcessor(
childProcessors,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)
);
}
return pipelineHasGeoProcessorById.get(pipelineName);
}
}
return false;
}

// starts GeoIP downloader task for a single project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,101 @@ public void testHasAtLeastOneGeoipProcessorWhenDownloadDatabaseOnPipelineCreatio

}

/*
* This tests that if a default or final pipeline has a pipeline processor that has a geoip processor that has
* download_database_on_pipeline_creation set to false, then we will correctly acknowledge that the pipeline has a geoip processor so
* that we download it appropriately.
*/
public void testHasAtLeastOneGeoipProcessorInPipelineProcessorWhenDownloadDatabaseOnPipelineCreationIsFalse() throws IOException {
String innerInnerPipelineJson = """
{
"processors":[""" + getGeoIpProcessor(false) + """
]
}
""";
String innerPipelineJson = """
{
"processors":[{"pipeline": {"name": "innerInnerPipeline"}}
]
}
""";
String outerPipelineJson = """
{
"processors":[{"pipeline": {"name": "innerPipeline"}}
]
}
""";
IngestMetadata ingestMetadata = new IngestMetadata(
Map.of(
"innerInnerPipeline",
new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON),
"innerPipeline",
new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON),
"outerPipeline",
new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON)
)
);
// The pipeline is not used in any index, expected to return false.
var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata);
assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));

// The pipeline is set as default pipeline in an index, expected to return true.
projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));

// The pipeline is set as final pipeline in an index, expected to return true.
projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
}

public void testHasAtLeastOneGeoipProcessorRecursion() throws IOException {
/*
* The pipeline in this test is invalid -- it has a cycle from outerPipeline -> innerPipeline -> innerInnerPipeline ->
* innerPipeline. Since this method is called at server startup, we want to make sure that we don't get a StackOverFlowError and
* that we don't throw any kind of validation exception (since that would be an unexpected change of behavior).
*/
String innerInnerPipelineJson = """
{
"processors":[""" + getGeoIpProcessor(false) + """
, {"pipeline": {"name": "innerPipeline"}}
]
}
""";
String innerPipelineJson = """
{
"processors":[{"pipeline": {"name": "innerInnerPipeline"}}
]
}
""";
String outerPipelineJson = """
{
"processors":[{"pipeline": {"name": "innerPipeline"}}
]
}
""";
IngestMetadata ingestMetadata = new IngestMetadata(
Map.of(
"innerInnerPipeline",
new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON),
"innerPipeline",
new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON),
"outerPipeline",
new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON)
)
);
// The pipeline is not used in any index, expected to return false.
var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata);
assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));

// The pipeline is set as default pipeline in an index, expected to return true.
projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));

// The pipeline is set as final pipeline in an index, expected to return true.
projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata));
}

public void testHasAtLeastOneGeoipProcessor() throws IOException {
var projectId = Metadata.DEFAULT_PROJECT_ID;
List<String> expectHitsInputs = getPipelinesWithGeoIpProcessors(true);
Expand Down
Loading