Skip to content

Commit c9e5770

Browse files
authored
Optimize downloader task executor (#115355)
1 parent 0ab79db commit c9e5770

File tree

2 files changed

+40
-30
lines changed

2 files changed

+40
-30
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@
4343
import org.elasticsearch.threadpool.ThreadPool;
4444
import org.elasticsearch.transport.RemoteTransportException;
4545

46+
import java.util.Collections;
47+
import java.util.HashSet;
4648
import java.util.List;
4749
import java.util.Map;
4850
import java.util.Objects;
4951
import java.util.Set;
5052
import java.util.concurrent.atomic.AtomicBoolean;
5153
import java.util.concurrent.atomic.AtomicReference;
52-
import java.util.stream.Collectors;
5354

5455
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
5556
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
@@ -238,14 +239,11 @@ public void clusterChanged(ClusterChangedEvent event) {
238239
}
239240

240241
static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
241-
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
242+
if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
242243
return true;
243244
}
244245

245-
Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
246-
.map(PipelineConfiguration::getId)
247-
.collect(Collectors.toSet());
248-
246+
final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
249247
if (checkReferencedPipelines.isEmpty()) {
250248
return false;
251249
}
@@ -258,22 +256,24 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
258256
}
259257

260258
/**
261-
* Retrieve list of pipelines that have at least one geoip processor.
259+
* Retrieve the set of pipeline ids that have at least one geoip processor.
262260
* @param clusterState Cluster state.
263261
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
264262
* matching the param.
265-
* @return A list of {@link PipelineConfiguration} matching criteria.
263+
* @return A set of pipeline ids matching criteria.
266264
*/
267265
@SuppressWarnings("unchecked")
268-
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
269-
ClusterState clusterState,
270-
boolean downloadDatabaseOnPipelineCreation
271-
) {
272-
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
273-
return pipelineDefinitions.stream().filter(pipelineConfig -> {
274-
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
275-
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
276-
}).toList();
266+
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
267+
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
268+
Set<String> ids = new HashSet<>();
269+
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
270+
for (PipelineConfiguration configuration : configurations) {
271+
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
272+
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
273+
ids.add(configuration.getId());
274+
}
275+
}
276+
return Collections.unmodifiableSet(ids);
277277
}
278278

279279
/**
@@ -283,7 +283,15 @@ private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProces
283283
* @return true if a geoip processor is found in the processor list.
284284
*/
285285
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
286-
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
286+
if (processors != null) {
287+
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
288+
for (Map<String, Object> processor : processors) {
289+
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
290+
return true;
291+
}
292+
}
293+
}
294+
return false;
287295
}
288296

289297
/**
@@ -317,7 +325,7 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
317325
}
318326

319327
/**
320-
* Check if a processor config is has an on_failure clause containing at least a geoip processor.
328+
* Check if a processor config has an on_failure clause containing at least a geoip processor.
321329
* @param processor Processor config.
322330
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
323331
* @return true if a geoip processor is found in the processor list.
@@ -327,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor(
327335
Map<String, Object> processor,
328336
boolean downloadDatabaseOnPipelineCreation
329337
) {
330-
return processor != null
331-
&& processor.values()
332-
.stream()
333-
.anyMatch(
334-
value -> value instanceof Map
335-
&& hasAtLeastOneGeoipProcessor(
336-
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
337-
downloadDatabaseOnPipelineCreation
338-
)
339-
);
338+
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
339+
for (Object value : processor.values()) {
340+
if (value instanceof Map
341+
&& hasAtLeastOneGeoipProcessor(
342+
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
343+
downloadDatabaseOnPipelineCreation
344+
)) {
345+
return true;
346+
}
347+
}
348+
return false;
340349
}
341350

342351
/**

server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.ElasticsearchException;
1313
import org.elasticsearch.ElasticsearchParseException;
1414
import org.elasticsearch.ExceptionsHelper;
15+
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.common.util.Maps;
1718
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -239,7 +240,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St
239240
processorType,
240241
processorTag,
241242
propertyName,
242-
"property isn't a boolean, but of type [" + value.getClass().getName() + "]"
243+
Strings.format("property isn't a boolean, but of type [%s]", value.getClass().getName())
243244
);
244245
}
245246

0 commit comments

Comments
 (0)