From 7322e1c35b40e001e32dc90dea29b1216dfe2d83 Mon Sep 17 00:00:00 2001
From: Dianna Hohensee
Date: Tue, 5 Aug 2025 18:27:04 -0700
Subject: [PATCH] Add second max queue latency stat to ClusterInfo

The TransportNodeUsageStatsForThreadPoolsAction now reports the maximum of
two queue latency measurements: the max latency of any task currently
sitting in the write thread pool queue AND the previously collected max
queue latency of any task dequeued since the last call. This covers the
possibility that queue times can rise sharply before ever being reflected
in task execution: imagine all the write threads are stalled.

Adds additional IT testing to exercise both forms of queue latency, a
follow-up for ES-12316.
---
 .../cluster/ClusterInfoServiceIT.java         | 162 +++++++++++++++++-
 .../cluster/service/ClusterServiceIT.java     |  28 +--
 ...ortNodeUsageStatsForThreadPoolsAction.java |   5 +-
 .../common/util/concurrent/EsExecutors.java   |   4 +
 ...utionTimeTrackingEsThreadPoolExecutor.java |   9 +-
 .../elasticsearch/threadpool/ThreadPool.java  |  17 ++
 .../elasticsearch/test/ESIntegTestCase.java   |  64 +++++++
 7 files changed, 252 insertions(+), 37 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
index 6fd3133686b64..0ee30ca3b260f 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
@@ -346,6 +347,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
                 WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
                 WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
             )
+            // Manually control cluster info refreshes
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
             .build();
         var masterName = internalCluster().startMasterOnlyNode(settings);
         var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +372,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
             }
         );

-        // Do some writes to create some write thread pool activity.
-        final String indexName = randomIdentifier();
-        for (int i = 0; i < randomIntBetween(1, 1000); i++) {
-            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
-        }
+        // Generate some writes to get some non-zero write thread pool stats.
+        doALotOfDataNodeWrites();

         // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
         final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +387,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {

         final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
         logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
-        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
+        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
         var dataNodeId = getNodeId(dataNodeName);
         var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
         assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +400,154 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
         assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
         assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
     }
+
+    /**
+     * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latency:
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. The latter looks at currently queued tasks,
+     * and the former tracks the queue latency of tasks when they are taken off the queue to start execution.
+     */
+    public void testMaxQueueLatenciesInClusterInfo() throws Exception {
+        var settings = Settings.builder()
+            .put(
+                WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+            )
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
+            .build();
+        var masterName = internalCluster().startMasterOnlyNode(settings);
+        var dataNodeName = internalCluster().startDataOnlyNode(settings);
+        ensureStableCluster(2);
+        assertEquals(internalCluster().getMasterName(), masterName);
+        assertNotEquals(internalCluster().getMasterName(), dataNodeName);
+        logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
+
+        // Block indexing on the data node by submitting as many write thread pool tasks as there are write threads.
+        var barrier = blockDataNodeIndexing(dataNodeName);
+        try {
+            // Queue an arbitrary number of tasks (strictly, only a single task is needed to occupy the queue).
+            int randomInt = randomIntBetween(1, 5);
+            Thread[] threadsToJoin = new Thread[randomInt];
+            for (int i = 0; i < randomInt; ++i) {
+                threadsToJoin[i] = startParallelSingleWrite();
+            }
+
+            // Reach into the data node's write thread pool to check that tasks have reached the queue.
+            var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+            var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
+            assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
+            var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
+            assertBusy(
+                // Wait for the parallel threads' writes to get queued in the write thread pool.
+                () -> assertThat(
+                    "Write thread pool dump: " + trackingWriteExecutor,
+                    trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
+                    greaterThan(0L)
+                )
+            );
+
+            // Ensure that some amount of time has passed on the thread pool.
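+            // Waiting on both the wall clock and the thread pools' relative-time clock (see waitForTimeToElapse) guarantees that
+            // the queued tasks' measured latency reaches queuedElapsedMillis before the stats are collected below.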
+            long queuedElapsedMillis = 100;
+            ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis);
+
+            // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
+            final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+                InternalClusterInfoService.class,
+                internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+            );
+            final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+
+            // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue called from the
+            // TransportNodeUsageStatsForThreadPoolsAction that the ClusterInfoService refresh initiated should have returned a max
+            // queue latency >= queuedElapsedMillis.
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
+            }
+
+            // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is
+            // reset by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the
+            // new queue latencies will be present in the next call. There should be nothing in the queue now to peek at (or if there
+            // is some ambient node activity, it will have a relatively short queue time), so the max queue latency reported by
+            // TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset and not
+            // #peekMaxQueueLatencyInQueue.
+            barrier.await();
+            for (int i = 0; i < randomInt; ++i) {
+                threadsToJoin[i].join();
+            }
+            assertBusy(
+                // Wait for any other tasks that might have been queued.
+                // NB: cannot assert that the queue is immediately empty because of other ambient node activity and potentially a
+                // single write thread available due to a limited number of processors available on test machines.
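+                // assertBusy therefore retries until the queue has fully drained and the peeked latency drops back to zero.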
+                () -> assertThat(
+                    "Write thread pool dump: " + trackingWriteExecutor,
+                    trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
+                    equalTo(0L)
+                )
+            );
+            final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
+                    .getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
+            }
+        } finally {
+            // Ensure that the write threads are released: resetting the barrier breaks it, so any tasks still waiting on it are
+            // released with a BrokenBarrierException.
+            logger.info("---> Ensuring release of the barrier on write thread pool tasks");
+            barrier.reset();
+        }
+
+        // NB: cannot check for a max queue latency of 0 with no further write tasks submitted. This is because the test machine may
+        // be running as few as a single write thread, and queuing due to ambient node activity is a possibility.
+    }
+
+    /**
+     * Do some writes to create some write thread pool activity.
+     */
+    private void doALotOfDataNodeWrites() {
+        final String indexName = randomIdentifier();
+        final int randomInt = randomIntBetween(500, 1000);
+        for (int i = 0; i < randomInt; i++) {
+            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
+        }
+    }
+
+    /**
+     * Starts a single index request on a parallel thread and returns the thread so {@link Thread#join()} can be called eventually.
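+     * Each thread performs one {@code index} call against a randomly named index (see {@link #doSingleWrite}) and then exits.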
+ */ + private Thread startParallelSingleWrite() { + Thread running = new Thread(() -> doSingleWrite()); + running.start(); + return running; + } + + private void doSingleWrite() { + final String indexName = randomIdentifier(); + final int randomId = randomIntBetween(500, 1000); + index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar")); + } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index af35dc1a6731c..d6e7b2712d5aa 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -18,7 +18,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.HashSet; @@ -27,7 +26,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -410,7 +408,7 @@ public void onFailure(Exception e) { }); } - waitForTimeToElapse(); + ESIntegTestCase.waitForTimeToElapse(100); pendingClusterTasks = clusterService.getMasterService().pendingTasks(); assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); @@ -433,28 +431,4 @@ public void onFailure(Exception e) { block2.countDown(); } } - - private static void waitForTimeToElapse() throws InterruptedException { - final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false) - .map(ClusterService::threadPool) - .toArray(ThreadPool[]::new); - final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray(); - - final var startNanoTime = System.nanoTime(); - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) { - // noinspection BusyWait - Thread.sleep(100); - } - - outer: do { - for (int i = 0; i < threadPools.length; i++) { - if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) { - // noinspection BusyWait - Thread.sleep(100); - continue outer; - } - } - return; - } while (true); - } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 29bc8efbbb192..11a71b790f16d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( (float) trackingForWriteExecutor.pollUtilization( TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION ), - trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset() + Math.max( + trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(), + trackingForWriteExecutor.peekMaxQueueLatencyInQueue() + ) 
         );

         Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> perThreadPool = new HashMap<>();
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index c39ce209bf875..bc61e14781859 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -9,6 +9,8 @@

 package org.elasticsearch.common.util.concurrent;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -37,6 +39,8 @@
  */
 public class EsExecutors {

+    private static final Logger logger = LogManager.getLogger(EsExecutors.class);
+
     // although the available processors may technically change, for node sizing we use the number available at launch
     private static final int MAX_NUM_PROCESSORS = Runtime.getRuntime().availableProcessors();

diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
index 20f480af172bd..7518d5423b36f 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -172,10 +172,13 @@ public long peekMaxQueueLatencyInQueue() {
         if (queue.isEmpty()) {
             return 0;
         }
-        assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
-        var linkedTransferQueue = (LinkedTransferQueue<Runnable>) queue;
+        assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
+            : "Not the type of queue expected: " + queue.getClass();
+        var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
+            ? (LinkedTransferQueue<Runnable>) queue
+            : (SizeBlockingQueue<Runnable>) queue;

-        var task = linkedTransferQueue.peek();
+        var task = linkedTransferOrSizeBlockingQueue.peek();
         assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
         var wrappedTask = ((WrappedRunnable) task).unwrap();
         assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 58ac4635b2a4e..2a8c42c95cd0e 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             return builder;
         }

+        @Override
+        public String toString() {
+            return "Info[name="
+                + name
+                + ",type="
+                + type
+                + ",min="
+                + min
+                + ",max="
+                + max
+                + ",keepAlive="
+                + keepAlive
+                + ",queueSize="
+                + queueSize
+                + "]";
+        }
+
     }

     /**
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 5c2f9646f03b3..33e8da80ff26c 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -163,6 +163,7 @@
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
@@ -204,17 +205,21 @@
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;

 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -2915,4 +2920,63 @@ protected static void deletePipeline(String id) {
             )
         );
     }
+
+    public static void waitForTimeToElapse(long elapsedMillis) throws InterruptedException {
+        final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false)
+            .map(ClusterService::threadPool)
+            .toArray(ThreadPool[]::new);
+        final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray();
+
+        final var startNanoTime = System.nanoTime();
+        while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= elapsedMillis) {
+            // noinspection BusyWait
+            Thread.sleep(elapsedMillis);
+        }
+
+        outer: do {
+            for (int i = 0; i < threadPools.length; i++) {
+                if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) {
+                    // noinspection BusyWait
+                    Thread.sleep(elapsedMillis);
+                    continue outer;
+                }
+            }
+            return;
+        } while (true);
+    }
+
+    /**
+     * Submits as many tasks to the given data node's write thread pool as there are write threads. These tasks will wait on the
+     * barrier that is returned, which waits for total-write-threads + 1 callers. The caller can release the tasks by calling
+     * {@code barrier.await()}, or abort them with {@code barrier.reset()}, which breaks the barrier and releases the waiting tasks
+     * with a {@link BrokenBarrierException}.
+     */
+    public CyclicBarrier blockDataNodeIndexing(String dataNodeName) {
+        // Block the executor workers to simulate long-running write tasks
+        var threadpool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+        var executor = threadpool.executor(ThreadPool.Names.WRITE);
+        final var executorInfo = threadpool.info(ThreadPool.Names.WRITE);
+        final var executorThreads = executorInfo.getMax();
+        var barrier = new CyclicBarrier(executorThreads + 1);
+        for (int i = 0; i < executorThreads; i++) {
+            executor.execute(() -> longAwait(barrier));
+        }
+        logger.info(
+            "---> Submitted ["
+                + executorThreads
+                + "] tasks to the write thread pool that will wait on a barrier until released. Write thread pool info: "
+                + executorInfo
+        );
+        return barrier;
+    }
+
+    private static void longAwait(CyclicBarrier barrier) {
+        try {
+            barrier.await(30, TimeUnit.SECONDS);
+        } catch (BrokenBarrierException | TimeoutException e) {
+            throw new AssertionError(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError(e);
+        }
+    }
 }
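
Usage sketch for the new test hooks (not part of the patch): the hypothetical test below shows how blockDataNodeIndexing(...)
and waitForTimeToElapse(...) are meant to compose so that a queue latency becomes observable before stats are collected. The
test name, index name, and the inline assertion comment are illustrative assumptions, not code from this change.

    public void testQueueLatencyWhileWritesAreStalled() throws Exception {
        final String dataNodeName = internalCluster().startDataOnlyNode();
        // Occupy every write thread so that any further write can only sit in the queue.
        final CyclicBarrier barrier = blockDataNodeIndexing(dataNodeName);
        final Thread writer = new Thread(() -> index("test-index", "1", Collections.singletonMap("foo", "bar")));
        writer.start(); // this write queues behind the blocked workers
        try {
            // Let a measurable queue latency accumulate before collecting stats.
            waitForTimeToElapse(100);
            // A ClusterInfo refresh at this point should report maxThreadPoolQueueLatencyMillis() >= 100 for this
            // node, sourced from peekMaxQueueLatencyInQueue() because the write is still sitting in the queue.
        } finally {
            barrier.await(); // trip the barrier to release the blocked write threads, letting the queued write run
            writer.join();
        }
    }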