From 9d9b6af14d5c3f9ff4cdcc72b3f488e2d2c563c8 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 3 Jan 2025 14:51:08 +0200 Subject: [PATCH 01/51] unmute tests --- muted-tests.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index ec47ec92bbac6..3e104e8a5b0bf 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -163,12 +163,6 @@ tests: - class: org.elasticsearch.xpack.ml.integration.RegressionIT method: testTwoJobsWithSameRandomizeSeedUseSameTrainingSet issue: https://github.com/elastic/elasticsearch/issues/117805 -- class: org.elasticsearch.upgrades.QueryBuilderBWCIT - method: testQueryBuilderBWC {cluster=UPGRADED} - issue: https://github.com/elastic/elasticsearch/issues/116990 -- class: org.elasticsearch.xpack.restart.QueryBuilderBWCIT - method: testQueryBuilderBWC {p0=UPGRADED} - issue: https://github.com/elastic/elasticsearch/issues/116989 - class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2UnavailableRemotesIT method: testEsqlRcs2UnavailableRemoteScenarios issue: https://github.com/elastic/elasticsearch/issues/117419 From 02ddf8205360b818ba18beba96dc457a6843e70e Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 3 Jan 2025 14:53:03 +0200 Subject: [PATCH 02/51] revert --- muted-tests.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index 3e104e8a5b0bf..ec47ec92bbac6 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -163,6 +163,12 @@ tests: - class: org.elasticsearch.xpack.ml.integration.RegressionIT method: testTwoJobsWithSameRandomizeSeedUseSameTrainingSet issue: https://github.com/elastic/elasticsearch/issues/117805 +- class: org.elasticsearch.upgrades.QueryBuilderBWCIT + method: testQueryBuilderBWC {cluster=UPGRADED} + issue: https://github.com/elastic/elasticsearch/issues/116990 +- class: org.elasticsearch.xpack.restart.QueryBuilderBWCIT + method: testQueryBuilderBWC {p0=UPGRADED} + issue: https://github.com/elastic/elasticsearch/issues/116989 - class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2UnavailableRemotesIT method: testEsqlRcs2UnavailableRemoteScenarios issue: https://github.com/elastic/elasticsearch/issues/117419 From 2404c818757a7dc9c9b3ac479192169445e3215f Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Mon, 10 Feb 2025 17:00:27 +0100 Subject: [PATCH 03/51] WIP --- ...utionTimeTrackingEsThreadPoolExecutor.java | 24 +++-- ...eTrackingPerIndexEsThreadPoolExecutor.java | 96 +++++++++++++++++++ .../elasticsearch/search/SearchService.java | 24 +++-- 3 files changed, 127 insertions(+), 17 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index bcef86f00b2a4..d4a57cbd40359 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -25,7 +25,7 @@ /** * An extension to thread pool executor, which tracks statistics for the task execution time. */ -public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { +public class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; @@ -115,15 +115,23 @@ protected void afterExecute(Runnable r, Throwable t) { + failedOrRejected; if (taskExecutionNanos != -1) { // taskExecutionNanos may be -1 if the task threw an exception - executionEWMA.addValue(taskExecutionNanos); - totalExecutionTime.add(taskExecutionNanos); + trackExecutionTime(r, taskExecutionNanos); } } finally { - // if trackOngoingTasks is false -> ongoingTasks must be empty - assert trackOngoingTasks || ongoingTasks.isEmpty(); - if (trackOngoingTasks) { - ongoingTasks.remove(r); - } + removeTrackedTask(r); + } + } + + protected void trackExecutionTime(Runnable r, long taskTime) { + executionEWMA.addValue(taskTime); + totalExecutionTime.add(taskTime); + } + + protected void removeTrackedTask(Runnable r) { + // if trackOngoingTasks is false -> ongoingTasks must be empty + assert trackOngoingTasks || ongoingTasks.isEmpty(); + if (trackOngoingTasks) { + ongoingTasks.remove(r); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java new file mode 100644 index 0000000000000..b3b041a2be578 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util.concurrent; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; + +// TODO MP add java doc +public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { + private final ConcurrentHashMap indexExecutionTime; + private final Map runnableToIndexName; + // TODO MP do we need also EWMA per-index or per-project? + + TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + Function runnableWrapper, + ThreadFactory threadFactory, + RejectedExecutionHandler handler, + ThreadContext contextHolder, + EsExecutors.TaskTrackingConfig trackingConfig + ) { + super( + name, + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + runnableWrapper, + threadFactory, + handler, + contextHolder, + trackingConfig + ); + indexExecutionTime = new ConcurrentHashMap<>(); + runnableToIndexName = new HashMap<>(); + } + + public long getSearchLoadPerIndex(String indexName) { + // TODO MP check for null maybe we don't need getOrDefault + return indexExecutionTime.getOrDefault(indexName, new LongAdder()).sum(); + } + + public long getLoadEMWAPerIndex(String indexName) { + // TODO MP do we need to report load EMWA per index? + throw new UnsupportedOperationException("Not supported yet"); + } + + public long getSearchLoadPerProject() { + // TODO MP we probably need to report sl per project? + throw new UnsupportedOperationException("Not supported yet"); + } + + public long getLoadEMWAPerProject() { + // TODO MP we probably need to report load EMWA per project? + throw new UnsupportedOperationException("Not supported yet"); + } + + public void registerIndexNameForRunnable(String indexName, Runnable r) { + runnableToIndexName.put(r, indexName); + } + + private void trackExecutionTimePerIndex(Runnable r, long timeSpentExecuting) { + // TODO MP do we need a LongAdder here? + String indexName = runnableToIndexName.get(r); + if (indexName != null) { + indexExecutionTime.putIfAbsent(indexName, new LongAdder()); + indexExecutionTime.get(indexName).add(timeSpentExecuting); + } + } + + @Override + protected void trackExecutionTime(Runnable r, long taskTime) { + trackExecutionTimePerIndex(r, taskTime); + super.trackExecutionTime(r, taskTime); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index efa27b2f3448c..b1e935fb033ee 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; @@ -620,7 +621,7 @@ private void ensureAfterSeqNoRefreshed( final Executor executor = getExecutor(shard); try { if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) { - runAsync(executor, executable, listener); + runAsync(executor, executable, listener, shard); return; } if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) { @@ -696,7 +697,7 @@ private void searchReady() { if (timeoutTask != null) { timeoutTask.cancel(); } - runAsync(executor, executable, listener); + runAsync(executor, executable, listener, shard); } } }); @@ -719,9 +720,14 @@ private IndexShard getShard(ShardSearchRequest request) { private static void runAsync( Executor executor, CheckedSupplier executable, - ActionListener listener + ActionListener listener, + IndexShard shard ) { - executor.execute(ActionRunnable.supplyAndDecRef(listener, executable)); + Runnable r = ActionRunnable.supplyAndDecRef(listener, executable); + if (executor instanceof TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexExecutor) { + perIndexExecutor.registerIndexNameForRunnable(shard.shardId().getIndexName(), r); + } + executor.execute(r); } /** @@ -798,7 +804,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard // we handle the failure in the failure listener below throw e; } - }, wrapFailureListener(listener, readerContext, markAsUsed)); + }, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard()); } private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) { @@ -850,7 +856,7 @@ public void executeQueryPhase( // we handle the failure in the failure listener below throw e; } - }, wrapFailureListener(listener, readerContext, markAsUsed)); + }, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard()); } /** @@ -899,7 +905,7 @@ public void executeQueryPhase( // we handle the failure in the failure listener below throw e; } - }, wrapFailureListener(l, readerContext, markAsUsed)); + }, wrapFailureListener(l, readerContext, markAsUsed), readerContext.indexShard()); })); } @@ -950,7 +956,7 @@ public void executeFetchPhase( // we handle the failure in the failure listener below throw e; } - }, wrapFailureListener(listener, readerContext, markAsUsed)); + }, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard()); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { @@ -987,7 +993,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A // we handle the failure in the failure listener below throw e; } - }, wrapFailureListener(l, readerContext, markAsUsed)); + }, wrapFailureListener(l, readerContext, markAsUsed), readerContext.indexShard()); })); } From 37dec8b5da3aaacbcf3a23f6b176d99a2c215197 Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:07:21 +0100 Subject: [PATCH 04/51] Dimi's work :D --- .../common/util/concurrent/EsExecutors.java | 2 +- ...utionTimeTrackingEsThreadPoolExecutor.java | 2 +- ...eTrackingPerIndexEsThreadPoolExecutor.java | 28 +++++++++++++------ 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index b45b348376fdf..985a52a828c0d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -181,7 +181,7 @@ public static EsThreadPoolExecutor newFixed( rejectedExecutionHandler = new EsAbortPolicy(); } if (config.trackExecutionTime()) { - return new TaskExecutionTimeTrackingEsThreadPoolExecutor( + return new TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( name, size, size, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index d4a57cbd40359..30826898b3dd5 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -115,7 +115,7 @@ protected void afterExecute(Runnable r, Throwable t) { + failedOrRejected; if (taskExecutionNanos != -1) { // taskExecutionNanos may be -1 if the task threw an exception - trackExecutionTime(r, taskExecutionNanos); + trackExecutionTime(timedRunnable.unwrap(), taskExecutionNanos); } } finally { removeTrackedTask(r); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index b3b041a2be578..fa5453d97e818 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -9,8 +9,6 @@ package org.elasticsearch.common.util.concurrent; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; @@ -22,7 +20,7 @@ // TODO MP add java doc public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { private final ConcurrentHashMap indexExecutionTime; - private final Map runnableToIndexName; + private final ConcurrentHashMap runnableToIndexName; // TODO MP do we need also EWMA per-index or per-project? TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( @@ -52,12 +50,16 @@ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskE trackingConfig ); indexExecutionTime = new ConcurrentHashMap<>(); - runnableToIndexName = new HashMap<>(); + runnableToIndexName = new ConcurrentHashMap<>(); } public long getSearchLoadPerIndex(String indexName) { // TODO MP check for null maybe we don't need getOrDefault - return indexExecutionTime.getOrDefault(indexName, new LongAdder()).sum(); + // return indexExecutionTime.getOrDefault(indexName, new LongAdder()).sum(); + + // Avoid creating LongAdder objects if indexName not present + LongAdder adder = indexExecutionTime.get(indexName); + return (adder != null) ? adder.sum() : 0; } public long getLoadEMWAPerIndex(String indexName) { @@ -83,14 +85,22 @@ private void trackExecutionTimePerIndex(Runnable r, long timeSpentExecuting) { // TODO MP do we need a LongAdder here? String indexName = runnableToIndexName.get(r); if (indexName != null) { - indexExecutionTime.putIfAbsent(indexName, new LongAdder()); - indexExecutionTime.get(indexName).add(timeSpentExecuting); + // Use of computeIfAbsent to avoid creating LongAdder objects if indexName not present + // This is preferable when working with Immutable objects + // Avoid possible race conditions + indexExecutionTime.computeIfAbsent(indexName, n -> new LongAdder()).add(timeSpentExecuting); } } @Override protected void trackExecutionTime(Runnable r, long taskTime) { - trackExecutionTimePerIndex(r, taskTime); - super.trackExecutionTime(r, taskTime); + try { + trackExecutionTimePerIndex(r, taskTime); + super.trackExecutionTime(r, taskTime); + } finally { + // Unregister the runnable from the map to avoid memory leaks + // assert runnableToIndexName.isEmpty(); + runnableToIndexName.remove(r); + } } } From 2ce21d8fec4342f4a8994e92c59a95367555e654 Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:36:11 +0100 Subject: [PATCH 05/51] small changes and java docs --- ...utionTimeTrackingEsThreadPoolExecutor.java | 2 + ...eTrackingPerIndexEsThreadPoolExecutor.java | 76 ++++++++++--------- .../search/query/QueryPhase.java | 2 +- 3 files changed, 43 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 30826898b3dd5..b073c5504c264 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -33,6 +33,7 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolE private final boolean trackOngoingTasks; // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); + protected final TaskTrackingConfig trackingConfig; TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, @@ -49,6 +50,7 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolE ) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder); this.runnableWrapper = runnableWrapper; + this.trackingConfig = trackingConfig; this.executionEWMA = new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0); this.trackOngoingTasks = trackingConfig.trackOngoingTasks(); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index fa5453d97e818..280b38465d2e5 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -9,6 +9,9 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; +import org.elasticsearch.core.Tuple; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; @@ -17,11 +20,13 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; -// TODO MP add java doc +/** + * A specialized thread pool executor that tracks the execution time of tasks per index. + * This executor provides detailed metrics on task execution times per index, which can be useful for performance monitoring and debugging. + */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { - private final ConcurrentHashMap indexExecutionTime; + private final ConcurrentHashMap> indexExecutionTime; private final ConcurrentHashMap runnableToIndexName; - // TODO MP do we need also EWMA per-index or per-project? TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( String name, @@ -53,53 +58,52 @@ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskE runnableToIndexName = new ConcurrentHashMap<>(); } + /** + * Gets the total execution time for tasks associated with a specific index. + * + * @param indexName the name of the index + * @return the total execution time for the index + */ public long getSearchLoadPerIndex(String indexName) { - // TODO MP check for null maybe we don't need getOrDefault - // return indexExecutionTime.getOrDefault(indexName, new LongAdder()).sum(); - - // Avoid creating LongAdder objects if indexName not present - LongAdder adder = indexExecutionTime.get(indexName); - return (adder != null) ? adder.sum() : 0; - } - - public long getLoadEMWAPerIndex(String indexName) { - // TODO MP do we need to report load EMWA per index? - throw new UnsupportedOperationException("Not supported yet"); + Tuple t = indexExecutionTime.get(indexName); + return (t != null) ? t.v1().sum() : 0; } - public long getSearchLoadPerProject() { - // TODO MP we probably need to report sl per project? - throw new UnsupportedOperationException("Not supported yet"); - } - - public long getLoadEMWAPerProject() { - // TODO MP we probably need to report load EMWA per project? - throw new UnsupportedOperationException("Not supported yet"); + /** + * Gets the exponentially weighted moving average (EWMA) of the execution time for tasks associated with a specific index name. + * + * @param indexName the name of the index + * @return the EWMA of the execution time for the index + */ + public double getLoadEMWAPerIndex(String indexName) { + Tuple t = indexExecutionTime.get(indexName); + return (t != null) ? t.v2().getAverage() : 0; } + /** + * Registers an index name for a given runnable task. + * + * @param indexName the name of the index + * @param r the runnable task + */ public void registerIndexNameForRunnable(String indexName, Runnable r) { runnableToIndexName.put(r, indexName); } - private void trackExecutionTimePerIndex(Runnable r, long timeSpentExecuting) { - // TODO MP do we need a LongAdder here? - String indexName = runnableToIndexName.get(r); - if (indexName != null) { - // Use of computeIfAbsent to avoid creating LongAdder objects if indexName not present - // This is preferable when working with Immutable objects - // Avoid possible race conditions - indexExecutionTime.computeIfAbsent(indexName, n -> new LongAdder()).add(timeSpentExecuting); - } - } - @Override protected void trackExecutionTime(Runnable r, long taskTime) { try { - trackExecutionTimePerIndex(r, taskTime); + String indexName = runnableToIndexName.get(r); + if (indexName != null) { + Tuple t = indexExecutionTime.computeIfAbsent( + indexName, + k -> new Tuple<>(new LongAdder(), new ExponentiallyWeightedMovingAverage(trackingConfig.getEwmaAlpha(), 0)) + ); + t.v1().add(taskTime); + t.v2().addValue(taskTime); + } super.trackExecutionTime(r, taskTime); } finally { - // Unregister the runnable from the map to avoid memory leaks - // assert runnableToIndexName.isEmpty(); runnableToIndexName.remove(r); } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 3036a295d459a..6bf0b77414841 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -227,7 +227,7 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); } ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor + assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor // TODO MP check this || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { From 28c9677f9098807b586e38ec8e892bdde3736bf0 Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Tue, 11 Feb 2025 16:02:31 +0100 Subject: [PATCH 06/51] add method to stop traking index execution --- ...utionTimeTrackingPerIndexEsThreadPoolExecutor.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index 280b38465d2e5..1c3ab54aefbe5 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -9,6 +9,8 @@ package org.elasticsearch.common.util.concurrent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.core.Tuple; @@ -25,6 +27,7 @@ * This executor provides detailed metrics on task execution times per index, which can be useful for performance monitoring and debugging. */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { + private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class); private final ConcurrentHashMap> indexExecutionTime; private final ConcurrentHashMap runnableToIndexName; @@ -90,6 +93,14 @@ public void registerIndexNameForRunnable(String indexName, Runnable r) { runnableToIndexName.put(r, indexName); } + public void stopTrackingIndex(String indexName) { + try { + indexExecutionTime.remove(indexName); + } catch (NullPointerException e) { + logger.debug("Trying to stop tracking index [{}] that was never tracked", indexName); + } + } + @Override protected void trackExecutionTime(Runnable r, long taskTime) { try { From 4851a68571a4fa95b2fe5781cf4bf241ffcca082 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Tue, 11 Feb 2025 17:10:12 +0200 Subject: [PATCH 07/51] update --- .../TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index 1c3ab54aefbe5..c771dc6fa45a4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -24,7 +24,8 @@ /** * A specialized thread pool executor that tracks the execution time of tasks per index. - * This executor provides detailed metrics on task execution times per index, which can be useful for performance monitoring and debugging. + * This executor provides detailed metrics on task execution times per index, which can be useful for performance monitoring and debugging + * */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class); From 6858dfcf35f38fba8e61cc33d23dda037d9f4c88 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 11 Feb 2025 15:17:48 +0000 Subject: [PATCH 08/51] [CI] Auto commit changes from spotless --- .../TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index c771dc6fa45a4..b0c297af7094e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -25,7 +25,7 @@ /** * A specialized thread pool executor that tracks the execution time of tasks per index. * This executor provides detailed metrics on task execution times per index, which can be useful for performance monitoring and debugging - * + * */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class); From fa3a2fd6fb219f260191d1ae0257c1865afcde55 Mon Sep 17 00:00:00 2001 From: Matteo Piergiovanni <134913285+piergm@users.noreply.github.com> Date: Tue, 11 Feb 2025 16:36:44 +0100 Subject: [PATCH 09/51] Update docs/changelog/122262.yaml --- docs/changelog/122262.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/122262.yaml diff --git a/docs/changelog/122262.yaml b/docs/changelog/122262.yaml new file mode 100644 index 0000000000000..4a5a1f7286bf3 --- /dev/null +++ b/docs/changelog/122262.yaml @@ -0,0 +1,5 @@ +pr: 122262 +summary: WIP measure search load per index +area: Search +type: feature +issues: [] From 088c1e57f90418d12be5c7e764718fcd5dcc8ed3 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 12 Feb 2025 09:40:16 +0200 Subject: [PATCH 10/51] update --- .../TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index b0c297af7094e..d2bad6170de74 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -25,7 +25,6 @@ /** * A specialized thread pool executor that tracks the execution time of tasks per index. * This executor provides detailed metrics on task execution times per index, which can be useful for performance monitoring and debugging - * */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class); From 9cddba5ae8607d61ecf8db0e1004d5d134b2f05c Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 12 Feb 2025 10:56:35 +0200 Subject: [PATCH 11/51] Add smoke test for new ThreadPoolExecutor --- ...kingPerIndexEsThreadPoolExecutorTests.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java new file mode 100644 index 0000000000000..dc141381e82e6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA; +import static org.hamcrest.Matchers.equalTo; + +public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests extends ESTestCase { + + String INDEX_NAME = "index"; + + public void testExecutionPerIndexStatistics() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + + TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( + "test-threadpool", + 1, + 1, + 1000, + TimeUnit.MILLISECONDS, + ConcurrentCollections.newBlockingQueue(), + TimedRunnable::new, + EsExecutors.daemonThreadFactory("queuetest"), + new EsAbortPolicy(), + context, + new EsExecutors.TaskTrackingConfig(randomBoolean(), DEFAULT_EWMA_ALPHA) + ); + executor.prestartAllCoreThreads(); + + assertThat((long) executor.getLoadEMWAPerIndex(INDEX_NAME), equalTo(0L)); + assertThat(executor.getSearchLoadPerIndex(INDEX_NAME), equalTo(0L)); + + executeTask(executor, 1); + assertBusy(() -> { + assertTrue((long) executor.getLoadEMWAPerIndex(INDEX_NAME) > 0); + assertTrue(executor.getSearchLoadPerIndex(INDEX_NAME) > 0); + }); + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + /** Execute a blank task {@code times} times for the executor */ + private void executeTask(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor, int times) { + for (int i = 0; i < times; i++) { + Runnable runnable = () -> {}; + executor.registerIndexNameForRunnable(INDEX_NAME, runnable); + executor.execute(runnable); + } + } +} From c1f0b3d0a93d45aa607722a58dd4abdfdc0e253a Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 12 Feb 2025 11:01:41 +0200 Subject: [PATCH 12/51] Rework executor shutdown method --- ...meTrackingPerIndexEsThreadPoolExecutorTests.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java index dc141381e82e6..3941b3e1c4bc6 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA; @@ -48,8 +49,7 @@ public void testExecutionPerIndexStatistics() throws Exception { assertTrue(executor.getSearchLoadPerIndex(INDEX_NAME) > 0); }); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); + shutdownExecutor(executor); } /** Execute a blank task {@code times} times for the executor */ @@ -60,4 +60,13 @@ private void executeTask(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor e executor.execute(runnable); } } + + private void shutdownExecutor(EsThreadPoolExecutor executor) { + executor.shutdown(); + try { + if (executor.awaitTermination(5, TimeUnit.SECONDS) == false) executor.shutdownNow(); + } catch(InterruptedException e) { + executor.shutdownNow(); + } + } } From e50af6ffa7c8eaff3199d8b741708b20544efc25 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 12 Feb 2025 11:08:17 +0200 Subject: [PATCH 13/51] Delete unusesd import --- ...skExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java index 3941b3e1c4bc6..05ffec65be98d 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA; From 23ad8c62b28d7fe54e814a7b40860eceefb1fdb4 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 12 Feb 2025 09:15:12 +0000 Subject: [PATCH 14/51] [CI] Auto commit changes from spotless --- ...cutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java index 05ffec65be98d..6fc3d0e67dafb 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests.java @@ -17,7 +17,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT_EWMA_ALPHA; import static org.hamcrest.Matchers.equalTo; -public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests extends ESTestCase { +public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutorTests extends ESTestCase { String INDEX_NAME = "index"; @@ -48,7 +48,7 @@ public void testExecutionPerIndexStatistics() throws Exception { assertTrue(executor.getSearchLoadPerIndex(INDEX_NAME) > 0); }); - shutdownExecutor(executor); + shutdownExecutor(executor); } /** Execute a blank task {@code times} times for the executor */ @@ -64,7 +64,7 @@ private void shutdownExecutor(EsThreadPoolExecutor executor) { executor.shutdown(); try { if (executor.awaitTermination(5, TimeUnit.SECONDS) == false) executor.shutdownNow(); - } catch(InterruptedException e) { + } catch (InterruptedException e) { executor.shutdownNow(); } } From 6a83b074075012e347af8873c83bf05feb44f534 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 12 Feb 2025 17:27:56 +0200 Subject: [PATCH 15/51] Add some metric logs --- ...SearchIndexTimeTrackingCleanupService.java | 30 ++++++++++++++++++ ...eTrackingPerIndexEsThreadPoolExecutor.java | 7 +++-- .../elasticsearch/node/NodeConstruction.java | 31 +++++++++++++++++++ 3 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java new file mode 100644 index 0000000000000..d51f5eb40f73e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor; +import org.elasticsearch.index.Index; + +public class SearchIndexTimeTrackingCleanupService implements ClusterStateListener { + private TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor; + + public SearchIndexTimeTrackingCleanupService(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor) { + this.executor = executor; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + for (Index index : event.indicesDeleted()) { + executor.stopTrackingIndex(index.getName()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index d2bad6170de74..7745c572e36bd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -11,8 +11,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -28,8 +31,8 @@ */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class); - private final ConcurrentHashMap> indexExecutionTime; - private final ConcurrentHashMap runnableToIndexName; + public final ConcurrentHashMap> indexExecutionTime; + public final ConcurrentHashMap runnableToIndexName; TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( String name, diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index da568be30e0df..8a03aece91fdc 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.MasterHistoryService; +import org.elasticsearch.cluster.coordination.SearchIndexTimeTrackingCleanupService; import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService; import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; @@ -79,6 +80,8 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.SuppressForbidden; @@ -235,6 +238,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -700,6 +705,12 @@ private void construct( ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager); clusterService.addStateApplier(scriptService); + // TODO DR - this is a bit of a hack to get the cluster service into the plugins + var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); + TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexEsThreadPoolExecutor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) executor; + searchLoadMetricsReporter(perIndexEsThreadPoolExecutor); + clusterService.addListener(new SearchIndexTimeTrackingCleanupService(perIndexEsThreadPoolExecutor)); + modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry()); @@ -1253,6 +1264,26 @@ public Map searchFields() { postInjection(clusterModule, actionModule, clusterService, transportService, featureService); } + private void searchLoadMetricsReporter(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor) { + Timer timer = new Timer(); + TimerTask task = new TimerTask() { + @Override + public void run() { + if(executor.indexExecutionTime.size() > 0) { + logger.info("Number of reported indices: {}", executor.indexExecutionTime.size()); + logger.info("Number of runnables: {}", executor.runnableToIndexName.size()); + executor.indexExecutionTime.forEach((index, tuple) -> { + logger.info("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage()); + }); + logger.info("Total task execution time: {}", executor.getTotalTaskExecutionTime()); + logger.info("----------------------------------------------------------------------------------"); + } + } + }; + + timer.scheduleAtFixedRate(task, 0, 4000); + } + /** * For each "component" (getter) c of a {@link Record}, * calls {@link org.elasticsearch.injection.Injector#addInstance(Object) Injector.addInstance} From 7af835ae09a5869da52f0be256f5b15547b594af Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 12 Feb 2025 15:34:42 +0000 Subject: [PATCH 16/51] [CI] Auto commit changes from spotless --- ...askExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java | 3 --- .../main/java/org/elasticsearch/node/NodeConstruction.java | 5 +++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index 7745c572e36bd..256343fb65a0e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -11,11 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; import org.elasticsearch.core.Tuple; -import org.elasticsearch.index.Index; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 8a03aece91fdc..6a160a59ca8b0 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -707,7 +707,8 @@ private void construct( // TODO DR - this is a bit of a hack to get the cluster service into the plugins var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); - TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexEsThreadPoolExecutor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) executor; + TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexEsThreadPoolExecutor = + (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) executor; searchLoadMetricsReporter(perIndexEsThreadPoolExecutor); clusterService.addListener(new SearchIndexTimeTrackingCleanupService(perIndexEsThreadPoolExecutor)); @@ -1269,7 +1270,7 @@ private void searchLoadMetricsReporter(TaskExecutionTimeTrackingPerIndexEsThread TimerTask task = new TimerTask() { @Override public void run() { - if(executor.indexExecutionTime.size() > 0) { + if (executor.indexExecutionTime.size() > 0) { logger.info("Number of reported indices: {}", executor.indexExecutionTime.size()); logger.info("Number of runnables: {}", executor.runnableToIndexName.size()); executor.indexExecutionTime.forEach((index, tuple) -> { From 9f0ec336da79ff1e9cea0838fce1d0d63772c22f Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 13 Feb 2025 09:50:17 +0200 Subject: [PATCH 17/51] update --- ...SearchIndexTimeTrackingCleanupService.java | 14 +++++++++++ ...eTrackingPerIndexEsThreadPoolExecutor.java | 23 ++++++++++++++++-- .../elasticsearch/node/NodeConstruction.java | 24 +++++++++---------- 3 files changed, 46 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java index d51f5eb40f73e..c05f8e794eebe 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/SearchIndexTimeTrackingCleanupService.java @@ -14,13 +14,27 @@ import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor; import org.elasticsearch.index.Index; +/** + * Service responsible for cleaning up task execution time tracking for deleted indices. + * Implements the ClusterStateListener interface to listen for cluster state changes. + */ public class SearchIndexTimeTrackingCleanupService implements ClusterStateListener { private TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor; + /** + * Constructs a new SearchIndexTimeTrackingCleanupService. + * + * @param executor the executor that tracks task execution times per index + */ public SearchIndexTimeTrackingCleanupService(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor) { this.executor = executor; } + /** + * Called when the cluster state changes. Stops tracking execution time for deleted indices. + * + * @param event the cluster changed event + */ @Override public void clusterChanged(ClusterChangedEvent event) { for (Index index : event.indicesDeleted()) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index 256343fb65a0e..8c5c65c36e45d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -28,8 +28,8 @@ */ public class TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor extends TaskExecutionTimeTrackingEsThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.class); - public final ConcurrentHashMap> indexExecutionTime; - public final ConcurrentHashMap runnableToIndexName; + private final ConcurrentHashMap> indexExecutionTime; + private final ConcurrentHashMap runnableToIndexName; TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor( String name, @@ -118,4 +118,23 @@ protected void trackExecutionTime(Runnable r, long taskTime) { runnableToIndexName.remove(r); } } + + //TODO Remove these methods once the NodeConstruction#searchLoadMetricsReporter is deleted + /** + * Gets the map of index execution times. + * + * @return the map of index execution times + */ + public ConcurrentHashMap> getIndexExecutionTime() { + return indexExecutionTime; + } + + /** + * Gets the map of runnable tasks to index names. + * + * @return the map of runnable tasks to index names + */ + public ConcurrentHashMap getRunnableToIndexName() { + return runnableToIndexName; + } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 6a160a59ca8b0..26b8617dd3655 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -705,12 +705,9 @@ private void construct( ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager); clusterService.addStateApplier(scriptService); - // TODO DR - this is a bit of a hack to get the cluster service into the plugins - var executor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); - TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexEsThreadPoolExecutor = - (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) executor; - searchLoadMetricsReporter(perIndexEsThreadPoolExecutor); - clusterService.addListener(new SearchIndexTimeTrackingCleanupService(perIndexEsThreadPoolExecutor)); + var executor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); + clusterService.addListener(new SearchIndexTimeTrackingCleanupService(executor)); + if(logger.isDebugEnabled()) searchLoadMetricsReporter(executor); modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); @@ -1270,18 +1267,19 @@ private void searchLoadMetricsReporter(TaskExecutionTimeTrackingPerIndexEsThread TimerTask task = new TimerTask() { @Override public void run() { - if (executor.indexExecutionTime.size() > 0) { - logger.info("Number of reported indices: {}", executor.indexExecutionTime.size()); - logger.info("Number of runnables: {}", executor.runnableToIndexName.size()); - executor.indexExecutionTime.forEach((index, tuple) -> { - logger.info("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage()); + if (executor.getIndexExecutionTime().size() > 0) { + logger.debug("Number of reported indices: {}", executor.getIndexExecutionTime().size()); + logger.debug("Number of runnables: {}", executor.getRunnableToIndexName().size()); + executor.getIndexExecutionTime().forEach((index, tuple) -> { + logger.debug("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage()); }); - logger.info("Total task execution time: {}", executor.getTotalTaskExecutionTime()); - logger.info("----------------------------------------------------------------------------------"); + logger.debug("Total task execution time: {}", executor.getTotalTaskExecutionTime()); + logger.debug("----------------------------------------------------------------------------------"); } } }; + timer.scheduleAtFixedRate(task, 0, 4000); } From 0d58081d1df75b3200d5bd2c98a817ee15ddb197 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 13 Feb 2025 07:59:14 +0000 Subject: [PATCH 18/51] [CI] Auto commit changes from spotless --- ...TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java | 2 +- .../main/java/org/elasticsearch/node/NodeConstruction.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index 8c5c65c36e45d..4a9742ba3d57c 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -119,7 +119,7 @@ protected void trackExecutionTime(Runnable r, long taskTime) { } } - //TODO Remove these methods once the NodeConstruction#searchLoadMetricsReporter is deleted + // TODO Remove these methods once the NodeConstruction#searchLoadMetricsReporter is deleted /** * Gets the map of index execution times. * diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 26b8617dd3655..a6634456ab786 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -80,7 +80,6 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.IOUtils; @@ -707,7 +706,7 @@ private void construct( var executor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); clusterService.addListener(new SearchIndexTimeTrackingCleanupService(executor)); - if(logger.isDebugEnabled()) searchLoadMetricsReporter(executor); + if (logger.isDebugEnabled()) searchLoadMetricsReporter(executor); modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); @@ -1279,7 +1278,6 @@ public void run() { } }; - timer.scheduleAtFixedRate(task, 0, 4000); } From dab0de9df3c4822492400a0144bf3582f04349f2 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 13 Feb 2025 10:29:15 +0200 Subject: [PATCH 19/51] revert logging level --- .../org/elasticsearch/node/NodeConstruction.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 26b8617dd3655..c74c3ec93eabe 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -707,7 +707,7 @@ private void construct( var executor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); clusterService.addListener(new SearchIndexTimeTrackingCleanupService(executor)); - if(logger.isDebugEnabled()) searchLoadMetricsReporter(executor); + searchLoadMetricsReporter(executor); modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); @@ -1268,13 +1268,13 @@ private void searchLoadMetricsReporter(TaskExecutionTimeTrackingPerIndexEsThread @Override public void run() { if (executor.getIndexExecutionTime().size() > 0) { - logger.debug("Number of reported indices: {}", executor.getIndexExecutionTime().size()); - logger.debug("Number of runnables: {}", executor.getRunnableToIndexName().size()); + logger.info("Number of reported indices: {}", executor.getIndexExecutionTime().size()); + logger.info("Number of runnables: {}", executor.getRunnableToIndexName().size()); executor.getIndexExecutionTime().forEach((index, tuple) -> { - logger.debug("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage()); + logger.info("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage()); }); - logger.debug("Total task execution time: {}", executor.getTotalTaskExecutionTime()); - logger.debug("----------------------------------------------------------------------------------"); + logger.info("Total task execution time: {}", executor.getTotalTaskExecutionTime()); + logger.info("----------------------------------------------------------------------------------"); } } }; From 15311a330535858cfef2f552dd0a9f62f7ca5eff Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Thu, 13 Feb 2025 10:34:22 +0200 Subject: [PATCH 20/51] update --- .../src/main/java/org/elasticsearch/node/NodeConstruction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 0064c3b98a44a..c529564822755 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -707,7 +707,7 @@ private void construct( var executor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); clusterService.addListener(new SearchIndexTimeTrackingCleanupService(executor)); - if(logger.isDebugEnabled()) searchLoadMetricsReporter(executor); + searchLoadMetricsReporter(executor); modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); From 51252766b88cb8acabd6a3afdbc5b9b3c39d3d38 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 13 Feb 2025 08:42:09 +0000 Subject: [PATCH 21/51] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/node/NodeConstruction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index c529564822755..d3fda6b4ffb83 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -80,7 +80,6 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.IOUtils; From f563dcd6232ea879e2bda2e8d828ad218351f3cb Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Mon, 3 Mar 2025 09:25:48 +0100 Subject: [PATCH 22/51] removes unnecessary debug output --- ...eTrackingPerIndexEsThreadPoolExecutor.java | 19 ----------------- .../elasticsearch/node/NodeConstruction.java | 21 ------------------- 2 files changed, 40 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java index 4a9742ba3d57c..d2bad6170de74 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor.java @@ -118,23 +118,4 @@ protected void trackExecutionTime(Runnable r, long taskTime) { runnableToIndexName.remove(r); } } - - // TODO Remove these methods once the NodeConstruction#searchLoadMetricsReporter is deleted - /** - * Gets the map of index execution times. - * - * @return the map of index execution times - */ - public ConcurrentHashMap> getIndexExecutionTime() { - return indexExecutionTime; - } - - /** - * Gets the map of runnable tasks to index names. - * - * @return the map of runnable tasks to index names - */ - public ConcurrentHashMap getRunnableToIndexName() { - return runnableToIndexName; - } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 51ac76de2d6ad..136076313f455 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -716,7 +716,6 @@ private void construct( var executor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); clusterService.addListener(new SearchIndexTimeTrackingCleanupService(executor)); - searchLoadMetricsReporter(executor); modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); @@ -1282,26 +1281,6 @@ public Map searchFields() { postInjection(clusterModule, actionModule, clusterService, transportService, featureService); } - private void searchLoadMetricsReporter(TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor executor) { - Timer timer = new Timer(); - TimerTask task = new TimerTask() { - @Override - public void run() { - if (executor.getIndexExecutionTime().size() > 0) { - logger.info("Number of reported indices: {}", executor.getIndexExecutionTime().size()); - logger.info("Number of runnables: {}", executor.getRunnableToIndexName().size()); - executor.getIndexExecutionTime().forEach((index, tuple) -> { - logger.info("Index: {}, Total execution time: {}, EWMA: {}", index, tuple.v1().sum(), tuple.v2().getAverage()); - }); - logger.info("Total task execution time: {}", executor.getTotalTaskExecutionTime()); - logger.info("----------------------------------------------------------------------------------"); - } - } - }; - - timer.scheduleAtFixedRate(task, 0, 4000); - } - /** * For each "component" (getter) c of a {@link Record}, * calls {@link org.elasticsearch.injection.Injector#addInstance(Object) Injector.addInstance} From a8a5efaa93ae15e0aa8380b60cc0d7f5c2cb24ef Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 3 Mar 2025 08:36:36 +0000 Subject: [PATCH 23/51] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/node/NodeConstruction.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 136076313f455..0530a629e110c 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -241,8 +241,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.UnaryOperator; From 3454904fec9b916a79fe03725bc50676d2816b33 Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Mon, 3 Mar 2025 10:40:53 +0100 Subject: [PATCH 24/51] iter --- docs/changelog/122262.yaml | 2 +- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 2 +- .../common/util/concurrent/EsExecutorsTests.java | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/changelog/122262.yaml b/docs/changelog/122262.yaml index 4a5a1f7286bf3..8639bb8a72090 100644 --- a/docs/changelog/122262.yaml +++ b/docs/changelog/122262.yaml @@ -1,5 +1,5 @@ pr: 122262 -summary: WIP measure search load per index +summary: Measure search load per index area: Search type: feature issues: [] diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index b073c5504c264..a1c169024d1f3 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -33,7 +33,7 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolE private final boolean trackOngoingTasks; // The set of currently running tasks and the timestamp of when they started execution in the Executor. private final Map ongoingTasks = new ConcurrentHashMap<>(); - protected final TaskTrackingConfig trackingConfig; + final TaskTrackingConfig trackingConfig; TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 2867c9e007937..e024dc5e88153 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -313,6 +313,8 @@ public String toString() { message, either(containsString("on EsThreadPoolExecutor[name = " + getName())).or( containsString("on TaskExecutionTimeTrackingEsThreadPoolExecutor[name = " + getName()) + ).or( + containsString("on TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor[name = " + getName()) ) ); assertThat(message, containsString("queue capacity = " + queue)); @@ -358,6 +360,8 @@ public String toString() { message, either(containsString("on EsThreadPoolExecutor[name = " + getName())).or( containsString("on TaskExecutionTimeTrackingEsThreadPoolExecutor[name = " + getName()) + ).or( + containsString("on TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor[name = " + getName()) ) ); assertThat(message, containsString("queue capacity = " + queue)); From 68c78f2c9c58e222b3455c5f943bd949951bdb87 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 3 Mar 2025 09:47:53 +0000 Subject: [PATCH 25/51] [CI] Auto commit changes from spotless --- .../common/util/concurrent/EsExecutorsTests.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index e024dc5e88153..d666453baf147 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -313,9 +313,7 @@ public String toString() { message, either(containsString("on EsThreadPoolExecutor[name = " + getName())).or( containsString("on TaskExecutionTimeTrackingEsThreadPoolExecutor[name = " + getName()) - ).or( - containsString("on TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor[name = " + getName()) - ) + ).or(containsString("on TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor[name = " + getName())) ); assertThat(message, containsString("queue capacity = " + queue)); assertThat(message, containsString("[Running")); @@ -360,9 +358,7 @@ public String toString() { message, either(containsString("on EsThreadPoolExecutor[name = " + getName())).or( containsString("on TaskExecutionTimeTrackingEsThreadPoolExecutor[name = " + getName()) - ).or( - containsString("on TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor[name = " + getName()) - ) + ).or(containsString("on TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor[name = " + getName())) ); assertThat(message, containsString("queue capacity = " + queue)); assertThat(message, containsString("[Terminated")); From d8477e9da4d73dce2abc7572d4f38d9f12fde815 Mon Sep 17 00:00:00 2001 From: piergm <134913285+piergm@users.noreply.github.com> Date: Mon, 3 Mar 2025 10:53:25 +0100 Subject: [PATCH 26/51] iter --- .../main/java/org/elasticsearch/node/NodeConstruction.java | 7 ++++--- .../java/org/elasticsearch/search/query/QueryPhase.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 0530a629e110c..056dd7805babc 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -712,9 +712,10 @@ private void construct( ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager); clusterService.addStateApplier(scriptService); - var executor = (TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); - clusterService.addListener(new SearchIndexTimeTrackingCleanupService(executor)); - + var executor = threadPool.executor(ThreadPool.Names.SEARCH); + if (executor instanceof TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexExecutor) { + clusterService.addListener(new SearchIndexTimeTrackingCleanupService(perIndexExecutor)); + } modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry()); diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 313f742dd3beb..5fcfb2b9766cd 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -226,7 +226,7 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); } ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor // TODO MP check this + assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { From 1e47dde1be2402b6fa0f34bbbb3febff8785196e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 3 Mar 2025 10:01:14 +0000 Subject: [PATCH 27/51] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/node/NodeConstruction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 056dd7805babc..5d000b3da6506 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -713,7 +713,7 @@ private void construct( clusterService.addStateApplier(scriptService); var executor = threadPool.executor(ThreadPool.Names.SEARCH); - if (executor instanceof TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexExecutor) { + if (executor instanceof TaskExecutionTimeTrackingPerIndexEsThreadPoolExecutor perIndexExecutor) { clusterService.addListener(new SearchIndexTimeTrackingCleanupService(perIndexExecutor)); } modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider); From 936a08df287df933b0e3f72e223ee511451de4a7 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 5 Mar 2025 15:33:06 +0200 Subject: [PATCH 28/51] Update code after review --- .../Debug_Elasticsearch__node_2_.xml | 4 + .../Debug_Elasticsearch__node_3_.xml | 4 + ...eTrackingPerIndexEsThreadPoolExecutor.java | 1 + ...hardSearchPerIndexTimeTrackingMetrics.java | 84 +++++++++++++++++++ .../elasticsearch/node/NodeConstruction.java | 4 +- .../elasticsearch/search/SearchService.java | 2 + 6 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchPerIndexTimeTrackingMetrics.java diff --git a/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml b/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml index 94bb079398ffd..76bb8f72eafc9 100644 --- a/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml +++ b/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml @@ -6,6 +6,10 @@