From eefc57f7425466e93e6eba7c6ac1dd8556bbdaac Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 10:21:58 -0700 Subject: [PATCH 01/10] Add second max queue latency stat to ClusterInfo The TransportNodeUsageStatsForThreadPoolsAction now takes the max latency of any task currently queued in the write thread pool queue AND the previously collected max queue latency of any task dequeued since the last call. This covers the possibility that queue times can rise greatly before being reflected in execution: imagine all the write threads are stalled. Adds additional IT testing to exercise both forms of queue latency, a followup for ES-12316. --- .../cluster/ClusterInfoServiceIT.java | 162 +++++++++++++++++- .../cluster/service/ClusterServiceIT.java | 28 +-- ...ortNodeUsageStatsForThreadPoolsAction.java | 5 +- .../common/util/concurrent/EsExecutors.java | 4 + ...utionTimeTrackingEsThreadPoolExecutor.java | 9 +- .../elasticsearch/threadpool/ThreadPool.java | 17 ++ .../elasticsearch/test/ESIntegTestCase.java | 64 +++++++ 7 files changed, 252 insertions(+), 37 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 6fd3133686b64..0ee30ca3b260f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -346,6 +347,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED ) + // Manually control cluster info refreshes + .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m") .build(); var masterName = internalCluster().startMasterOnlyNode(settings); var dataNodeName = internalCluster().startDataOnlyNode(settings); @@ -369,11 +372,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { } ); - // Do some writes to create some write thread pool activity. - final String indexName = randomIdentifier(); - for (int i = 0; i < randomIntBetween(1, 1000); i++) { - index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar")); - } + // Generate some writes to get some non-zero write thread pool stats. + doALotOfDataNodeWrites(); // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes. 
final InternalClusterInfoService masterClusterInfoService = asInstanceOf( @@ -387,7 +387,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); - assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg + assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected var dataNodeId = getNodeId(dataNodeName); var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); assertNotNull(nodeUsageStatsForThreadPool); @@ -400,4 +400,154 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f)); assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } + + /** + * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latency: + * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and + * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. The latter looks at currently queued tasks, and + * the former tracks the queue latency of tasks when they are taken off of the queue to start execution. + */ + public void testMaxQueueLatenciesInClusterInfo() throws Exception { + var settings = Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m") + .build(); + var masterName = internalCluster().startMasterOnlyNode(settings); + var dataNodeName = internalCluster().startDataOnlyNode(settings); + ensureStableCluster(2); + assertEquals(internalCluster().getMasterName(), masterName); + assertNotEquals(internalCluster().getMasterName(), dataNodeName); + logger.info("---> master node: " + masterName + ", data node: " + dataNodeName); + + // Block indexing on the data node by submitting write thread pool tasks to equal the number of write threads. + var barrier = blockDataNodeIndexing(dataNodeName); + try { + // Arbitrary number of tasks to queue greater than one (only strictly need a single task to occupy the queue). + int randomInt = randomIntBetween(1, 5); + Thread[] threadsToJoin = new Thread[randomInt]; + for (int i = 0; i < randomInt; ++i) { + threadsToJoin[i] = startParallelSingleWrite(); + } + + // Reach into the data node's write thread pool to check that tasks have reached the queue. + var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName); + var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE); + assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor; + var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor; + assertBusy( + // Wait for the parallel threads' writes to get queued in the write thread pool. + () -> assertThat( + "Write thread pool dump: " + trackingWriteExecutor, + trackingWriteExecutor.peekMaxQueueLatencyInQueue(), + greaterThan(0L) + ) + ); + + // Ensure that some amount of time has passed on the thread pool. 
+ long queuedElapsedMillis = 100; + ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis); + + // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes. + final InternalClusterInfoService masterClusterInfoService = asInstanceOf( + InternalClusterInfoService.class, + internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class) + ); + final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); + + // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue called from the + // TransportNodeUsageStatsForThreadPoolsAction that ClusterInfoService refresh initiated should have returned a max queue + // latency >= queuedElapsedMillis. + { + final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); + logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); + assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected + var dataNodeId = getNodeId(dataNodeName); + var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); + assertNotNull(nodeUsageStatsForThreadPool); + logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool); + + assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId()); + var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis)); + } + + // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset + // by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new queue + // latencies will be present in the next call. There should be nothing in the queue now to peek at (or if there is some ambient + // node activity, it will have a relatively short queue time), so the result of the max queue latency result in + // TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset and not + // #peekMaxQueueLatencyInQueue. + barrier.await(); + for (int i = 0; i < randomInt; ++i) { + threadsToJoin[i].join(); + } + assertBusy( + // Wait for any other tasks that might have been queued. + // NB: cannot assert that the queue is immediately empty because of other ambient node activity and potentially a single + // write thread available due to a limited number of processors available on test machines. 
+ () -> assertThat( + "Write thread pool dump: " + trackingWriteExecutor, + trackingWriteExecutor.peekMaxQueueLatencyInQueue(), + equalTo(0L) + ) + ); + final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); + { + final Map usageStatsForThreadPools = nextClusterInfo + .getNodeUsageStatsForThreadPools(); + logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); + assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected + var dataNodeId = getNodeId(dataNodeName); + var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); + assertNotNull(nodeUsageStatsForThreadPool); + logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool); + + assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId()); + var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis)); + } + } finally { + // Ensure that the write threads have been released by signalling an interrupt on any callers waiting on the barrier. + logger.info("---> Ensuring release of the barrier on write thread pool tasks"); + barrier.reset(); + } + + // NB: cannot check for a max queue latency of 0 with no further write tasks submitted. This is because the test machine may be + // running as few as a single write thread, and queuing due to ambient node activity is a possibility. + } + + /** + * Do some writes to create some write thread pool activity. + */ + private void doALotOfDataNodeWrites() { + final String indexName = randomIdentifier(); + final int randomInt = randomIntBetween(500, 1000); + for (int i = 0; i < randomInt; i++) { + index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar")); + } + } + + /** + * Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually. 
+ */ + private Thread startParallelSingleWrite() { + Thread running = new Thread(() -> doSingleWrite()); + running.start(); + return running; + } + + private void doSingleWrite() { + final String indexName = randomIdentifier(); + final int randomId = randomIntBetween(500, 1000); + index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar")); + } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index af35dc1a6731c..d6e7b2712d5aa 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -18,7 +18,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.HashSet; @@ -27,7 +26,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -410,7 +408,7 @@ public void onFailure(Exception e) { }); } - waitForTimeToElapse(); + ESIntegTestCase.waitForTimeToElapse(100); pendingClusterTasks = clusterService.getMasterService().pendingTasks(); assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); @@ -433,28 +431,4 @@ public void onFailure(Exception e) { block2.countDown(); } } - - private static void waitForTimeToElapse() throws InterruptedException { - final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false) - .map(ClusterService::threadPool) - .toArray(ThreadPool[]::new); - final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray(); - - final var startNanoTime = System.nanoTime(); - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) { - // noinspection BusyWait - Thread.sleep(100); - } - - outer: do { - for (int i = 0; i < threadPools.length; i++) { - if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) { - // noinspection BusyWait - Thread.sleep(100); - continue outer; - } - } - return; - } while (true); - } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 29bc8efbbb192..11a71b790f16d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( (float) trackingForWriteExecutor.pollUtilization( TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION ), - trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset() + Math.max( + trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(), + trackingForWriteExecutor.peekMaxQueueLatencyInQueue() + ) 
); Map perThreadPool = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index c39ce209bf875..bc61e14781859 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -9,6 +9,8 @@ package org.elasticsearch.common.util.concurrent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -37,6 +39,8 @@ */ public class EsExecutors { + private static final Logger logger = LogManager.getLogger(EsExecutors.class); + // although the available processors may technically change, for node sizing we use the number available at launch private static final int MAX_NUM_PROCESSORS = Runtime.getRuntime().availableProcessors(); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 20f480af172bd..7518d5423b36f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -172,10 +172,13 @@ public long peekMaxQueueLatencyInQueue() { if (queue.isEmpty()) { return 0; } - assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass(); - var linkedTransferQueue = (LinkedTransferQueue) queue; + assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue + : "Not the type of queue expected: " + queue.getClass(); + var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue + ? 
(LinkedTransferQueue) queue + : (SizeBlockingQueue) queue; - var task = linkedTransferQueue.peek(); + var task = linkedTransferOrSizeBlockingQueue.peek(); assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass(); var wrappedTask = ((WrappedRunnable) task).unwrap(); assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass(); diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 58ac4635b2a4e..2a8c42c95cd0e 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + @Override + public String toString() { + return "Info[name=" + + name + + ",type=" + + type + + ",min=" + + min + + ",max=" + + max + + ",keepAlive=" + + keepAlive + + ",queueSize=" + + queueSize + + "]"; + } + } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 5c2f9646f03b3..33e8da80ff26c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -163,6 +163,7 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; @@ -204,17 +205,21 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; @@ -2915,4 +2920,63 @@ protected static void deletePipeline(String id) { ) ); } + + public static void waitForTimeToElapse(long elapsedMillis) throws InterruptedException { + final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false) + .map(ClusterService::threadPool) + .toArray(ThreadPool[]::new); + final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray(); + + final var startNanoTime = System.nanoTime(); + while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= elapsedMillis) { + // noinspection BusyWait + Thread.sleep(elapsedMillis); + } + + outer: do { + for (int i = 0; i < threadPools.length; i++) { + if 
(threadPools[i].relativeTimeInMillis() <= startTimes[i]) { + // noinspection BusyWait + Thread.sleep(elapsedMillis); + continue outer; + } + } + return; + } while (true); + } + + /** + * Submits as many tasks to the given data node's write thread pool as there are write threads. These tasks will wait on the barrier + * that is returned, which waits for total-write-threads + 1 callers. The caller can release the tasks by calling + * {@code barrier.await()} or interrupt them with {@code barrier.reset()}. + */ + public CyclicBarrier blockDataNodeIndexing(String dataNodeName) { + // Block the executor workers to simulate long-running write tasks + var threadpool = internalCluster().getInstance(ThreadPool.class, dataNodeName); + var executor = threadpool.executor(ThreadPool.Names.WRITE); + final var executorInfo = threadpool.info(ThreadPool.Names.WRITE); + final var executorThreads = executorInfo.getMax(); + var barrier = new CyclicBarrier(executorThreads + 1); + for (int i = 0; i < executorThreads; i++) { + executor.execute(() -> longAwait(barrier)); + } + logger.info( + "---> Submitted [" + + executorThreads + + "] tasks to the write thread pool that will wait on a barrier until released. Write thread pool info: " + + executorInfo + ); + return barrier; + } + + private static void longAwait(CyclicBarrier barrier) { + try { + barrier.await(30, TimeUnit.SECONDS); + } catch (BrokenBarrierException | TimeoutException e) { + throw new AssertionError(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + } } From 10504369d43dcea2c4562f395d404ea2edf7aaf1 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 10:24:20 -0700 Subject: [PATCH 02/10] Update docs/changelog/132675.yaml --- docs/changelog/132675.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/132675.yaml diff --git a/docs/changelog/132675.yaml b/docs/changelog/132675.yaml new file mode 100644 index 0000000000000..a451a27334be7 --- /dev/null +++ b/docs/changelog/132675.yaml @@ -0,0 +1,5 @@ +pr: 132675 +summary: Add second max queue latency stat to `ClusterInfo` +area: Allocation +type: enhancement +issues: [] From d4631d25e93d362eea72ab9293b1069627ef83ab Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 10:58:03 -0700 Subject: [PATCH 03/10] tidying up, some self review --- .../cluster/ClusterInfoServiceIT.java | 25 +++++++++++-------- .../common/util/concurrent/EsExecutors.java | 4 --- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 0ee30ca3b260f..f0e23c02f12a5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -402,7 +402,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { } /** - * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latency: + * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies: * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. 
The latter looks at currently queued tasks, and * the former tracks the queue latency of tasks when they are taken off of the queue to start execution. @@ -422,10 +422,11 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { assertNotEquals(internalCluster().getMasterName(), dataNodeName); logger.info("---> master node: " + masterName + ", data node: " + dataNodeName); - // Block indexing on the data node by submitting write thread pool tasks to equal the number of write threads. + // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads. var barrier = blockDataNodeIndexing(dataNodeName); try { - // Arbitrary number of tasks to queue greater than one (only strictly need a single task to occupy the queue). + // Arbitrary number of tasks, which will queue because all the write threads are occupied already, greater than one: only + // strictly need a single task to occupy the queue. int randomInt = randomIntBetween(1, 5); Thread[] threadsToJoin = new Thread[randomInt]; for (int i = 0; i < randomInt; ++i) { @@ -447,23 +448,23 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { ); // Ensure that some amount of time has passed on the thread pool. - long queuedElapsedMillis = 100; + long queuedElapsedMillis = 1; ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis); - // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes. + // Force a refresh of the ClusterInfo state to collect fresh info from the data node. final InternalClusterInfoService masterClusterInfoService = asInstanceOf( InternalClusterInfoService.class, internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class) ); final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); - // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue called from the - // TransportNodeUsageStatsForThreadPoolsAction that ClusterInfoService refresh initiated should have returned a max queue + // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue, which is called from the + // TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should have returned a max queue // latency >= queuedElapsedMillis. { final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); - assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected + assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data node should be collected var dataNodeId = getNodeId(dataNodeName); var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); assertNotNull(nodeUsageStatsForThreadPool); @@ -489,8 +490,9 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { } assertBusy( // Wait for any other tasks that might have been queued. - // NB: cannot assert that the queue is immediately empty because of other ambient node activity and potentially a single - // write thread available due to a limited number of processors available on test machines. + // NB: cannot assert here that the queue is immediately empty because there could be other ambient node activity. 
This test + // could be running data nodes with a single write pool thread due to a limited number of processors available on test + // machines, and it's reasonable to queue a little in that situation. () -> assertThat( "Write thread pool dump: " + trackingWriteExecutor, trackingWriteExecutor.peekMaxQueueLatencyInQueue(), @@ -516,7 +518,8 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis)); } } finally { - // Ensure that the write threads have been released by signalling an interrupt on any callers waiting on the barrier. + // Ensure that the write threads have been released by signalling an interrupt on any callers waiting on the barrier. If the + // callers have already all been successfully released, then there will be nothing left to interrupt. logger.info("---> Ensuring release of the barrier on write thread pool tasks"); barrier.reset(); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index bc61e14781859..c39ce209bf875 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -9,8 +9,6 @@ package org.elasticsearch.common.util.concurrent; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -39,8 +37,6 @@ */ public class EsExecutors { - private static final Logger logger = LogManager.getLogger(EsExecutors.class); - // although the available processors may technically change, for node sizing we use the number available at launch private static final int MAX_NUM_PROCESSORS = Runtime.getRuntime().availableProcessors(); From a213160daf5eadcc4264adc4d72f6f67e730b4ec Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 10:59:51 -0700 Subject: [PATCH 04/10] add comment --- .../java/org/elasticsearch/cluster/ClusterInfoServiceIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index f0e23c02f12a5..2061702920d62 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -413,6 +413,7 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED ) + // Manually control cluster info refreshes .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m") .build(); var masterName = internalCluster().startMasterOnlyNode(settings); From ee2159a6def8e113c66d27c4672dcd781d027991 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 16:44:51 -0700 Subject: [PATCH 05/10] undo waitForTimeToElapse, ensure non-zero instead; rename counter variable --- .../cluster/ClusterInfoServiceIT.java | 16 ++++------- .../cluster/service/ClusterServiceIT.java | 28 ++++++++++++++++++- .../elasticsearch/test/ESIntegTestCase.java | 24 
---------------- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 2061702920d62..f1d9dba0454b5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -428,9 +428,9 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { try { // Arbitrary number of tasks, which will queue because all the write threads are occupied already, greater than one: only // strictly need a single task to occupy the queue. - int randomInt = randomIntBetween(1, 5); - Thread[] threadsToJoin = new Thread[randomInt]; - for (int i = 0; i < randomInt; ++i) { + int numberOfTasks = randomIntBetween(1, 5); + Thread[] threadsToJoin = new Thread[numberOfTasks]; + for (int i = 0; i < numberOfTasks; ++i) { threadsToJoin[i] = startParallelSingleWrite(); } @@ -448,10 +448,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { ) ); - // Ensure that some amount of time has passed on the thread pool. - long queuedElapsedMillis = 1; - ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis); - // Force a refresh of the ClusterInfo state to collect fresh info from the data node. final InternalClusterInfoService masterClusterInfoService = asInstanceOf( InternalClusterInfoService.class, @@ -476,7 +472,7 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats); assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0)); assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f)); - assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThan(0L)); } // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset @@ -486,7 +482,7 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { // TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset and not // #peekMaxQueueLatencyInQueue. barrier.await(); - for (int i = 0; i < randomInt; ++i) { + for (int i = 0; i < numberOfTasks; ++i) { threadsToJoin[i].join(); } assertBusy( @@ -516,7 +512,7 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats); assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0)); assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f)); - assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); } } finally { // Ensure that the write threads have been released by signalling an interrupt on any callers waiting on the barrier. 
If the diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index d6e7b2712d5aa..af35dc1a6731c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.HashSet; @@ -26,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -408,7 +410,7 @@ public void onFailure(Exception e) { }); } - ESIntegTestCase.waitForTimeToElapse(100); + waitForTimeToElapse(); pendingClusterTasks = clusterService.getMasterService().pendingTasks(); assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); @@ -431,4 +433,28 @@ public void onFailure(Exception e) { block2.countDown(); } } + + private static void waitForTimeToElapse() throws InterruptedException { + final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false) + .map(ClusterService::threadPool) + .toArray(ThreadPool[]::new); + final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray(); + + final var startNanoTime = System.nanoTime(); + while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) { + // noinspection BusyWait + Thread.sleep(100); + } + + outer: do { + for (int i = 0; i < threadPools.length; i++) { + if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) { + // noinspection BusyWait + Thread.sleep(100); + continue outer; + } + } + return; + } while (true); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 33e8da80ff26c..ccf01793db8e3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -2921,30 +2921,6 @@ protected static void deletePipeline(String id) { ); } - public static void waitForTimeToElapse(long elapsedMillis) throws InterruptedException { - final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false) - .map(ClusterService::threadPool) - .toArray(ThreadPool[]::new); - final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray(); - - final var startNanoTime = System.nanoTime(); - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= elapsedMillis) { - // noinspection BusyWait - Thread.sleep(elapsedMillis); - } - - outer: do { - for (int i = 0; i < threadPools.length; i++) { - if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) { - // noinspection BusyWait - Thread.sleep(elapsedMillis); - continue outer; - } - } - return; - } while (true); - } - /** * Submits as many tasks to the given data node's write 
thread pool as there are write threads. These tasks will wait on the barrier * that is returned, which waits for total-write-threads + 1 callers. The caller can release the tasks by calling From e8491a26c26c2386dd75c8279a57ca1ebbdd35b3 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 16:56:34 -0700 Subject: [PATCH 06/10] fix peekMaxQueueLatencyInQueue() code to check for null --- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 7518d5423b36f..ef30ae8533649 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -168,10 +168,8 @@ public long peekMaxQueueLatencyInQueue() { if (trackMaxQueueLatency == false) { return 0; } + var queue = getQueue(); - if (queue.isEmpty()) { - return 0; - } assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue : "Not the type of queue expected: " + queue.getClass(); var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue @@ -179,6 +177,11 @@ public long peekMaxQueueLatencyInQueue() { : (SizeBlockingQueue) queue; var task = linkedTransferOrSizeBlockingQueue.peek(); + if (task == null) { + // There's nothing in the queue right now. + return 0; + } + assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass(); var wrappedTask = ((WrappedRunnable) task).unwrap(); assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass(); From b6f4a2b98a2ccd7f8bfed1f57074362058370b29 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 14 Aug 2025 00:15:31 +0000 Subject: [PATCH 07/10] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/test/ESIntegTestCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index ccf01793db8e3..7b36165a4f5a3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -219,7 +219,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; From a23c71610e55418d3b112d1f0dea42a108c1a52b Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 17:26:27 -0700 Subject: [PATCH 08/10] fix: peekMaxQueueLatencyInQueue() was returning nanos when millis are wanted --- .../elasticsearch/cluster/ClusterInfoServiceIT.java | 7 ++++--- .../TransportNodeUsageStatsForThreadPoolsAction.java | 2 +- .../TaskExecutionTimeTrackingEsThreadPoolExecutor.java | 7 ++++--- ...ExecutionTimeTrackingEsThreadPoolExecutorTests.java | 10 +++++----- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git 
a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index f1d9dba0454b5..1bd0bdb76e002 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -404,7 +404,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { /** * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies: * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and - * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. The latter looks at currently queued tasks, and + * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The latter looks at currently queued tasks, and * the former tracks the queue latency of tasks when they are taken off of the queue to start execution. */ public void testMaxQueueLatenciesInClusterInfo() throws Exception { @@ -443,7 +443,7 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { // Wait for the parallel threads' writes to get queued in the write thread pool. () -> assertThat( "Write thread pool dump: " + trackingWriteExecutor, - trackingWriteExecutor.peekMaxQueueLatencyInQueue(), + trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), greaterThan(0L) ) ); @@ -492,10 +492,11 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { // machines, and it's reasonable to queue a little in that situation. () -> assertThat( "Write thread pool dump: " + trackingWriteExecutor, - trackingWriteExecutor.peekMaxQueueLatencyInQueue(), + trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), equalTo(0L) ) ); + final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); { final Map usageStatsForThreadPools = nextClusterInfo diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java index 11a71b790f16d..93a5c6f7dad88 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java @@ -106,7 +106,7 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation( ), Math.max( trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(), - trackingForWriteExecutor.peekMaxQueueLatencyInQueue() + trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis() ) ); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index ef30ae8533649..e37f411789fd0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -151,7 +151,8 @@ public int getCurrentQueueSize() { * Returns the max queue latency seen since the last time that 
this method was called. Every call will reset the max seen back to zero. * Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max * latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use - * {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned. + * {@link #peekMaxQueueLatencyInQueueMillis}. If there have been no tasks in the queue since the last call, then zero latency is + * returned. */ public long getMaxQueueLatencyMillisSinceLastPollAndReset() { if (trackMaxQueueLatency == false) { @@ -164,7 +165,7 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() { * Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the * queue and calculates how long it has been there. Returns zero if there is no queue. */ - public long peekMaxQueueLatencyInQueue() { + public long peekMaxQueueLatencyInQueueMillis() { if (trackMaxQueueLatency == false) { return 0; } @@ -186,7 +187,7 @@ public long peekMaxQueueLatencyInQueue() { var wrappedTask = ((WrappedRunnable) task).unwrap(); assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass(); var timedTask = (TimedRunnable) wrappedTask; - return timedTask.getTimeSinceCreationNanos(); + return TimeUnit.NANOSECONDS.toMillis(timedTask.getTimeSinceCreationNanos()); } /** diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index b4b33d1265bcb..257f2d44c03ce 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -95,7 +95,7 @@ public void testExecutionEWMACalculation() throws Exception { /** * Verifies that we can peek at the task in front of the task queue to fetch the duration that the oldest task has been queued. - * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}. + * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis}. */ public void testFrontOfQueueLatency() throws Exception { ThreadContext context = new ThreadContext(Settings.EMPTY); @@ -135,7 +135,7 @@ public void testFrontOfQueueLatency() throws Exception { logger.info("--> executor: {}", executor); // Check that the peeking at a non-existence queue returns zero. - assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue()); + assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueueMillis()); // Submit two tasks, into the thread pool with a single worker thread. The second one will be queued (because the pool only has // one thread) and can be peeked at. 
@@ -143,10 +143,10 @@ public void testFrontOfQueueLatency() throws Exception { executor.execute(() -> {}); waitForTimeToElapse(); - var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue(); + var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueueMillis(); assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L)); waitForTimeToElapse(); - var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue(); + var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueueMillis(); assertThat( "Expected a second peek to report a longer duration", updatedFrontOfQueueDuration, @@ -156,7 +156,7 @@ public void testFrontOfQueueLatency() throws Exception { // Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty. safeAwait(barrier); safeAwait(barrier); - assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue()); + assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueueMillis()); } finally { ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); } From 4241df06abfb2bb25d3857a1bdb814bbeecda123 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 18:11:20 -0700 Subject: [PATCH 09/10] remove stray concurrent GlobalCheckpointSyncAction.Request task on the write thread pool: eliminate replicas requiring sync --- .../cluster/ClusterInfoServiceIT.java | 71 ++++++++++++------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 1bd0bdb76e002..958bfd8756dac 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -60,6 +60,8 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableSet; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.CoreMatchers.equalTo; @@ -404,8 +406,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { /** * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies: * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and - * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The latter looks at currently queued tasks, and - * the former tracks the queue latency of tasks when they are taken off of the queue to start execution. + * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The latter looks at currently queued tasks, + * and the former tracks the queue latency of tasks when they are taken off of the queue to start execution. */ public void testMaxQueueLatenciesInClusterInfo() throws Exception { var settings = Settings.builder() @@ -430,8 +432,14 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { // strictly need a single task to occupy the queue. 
int numberOfTasks = randomIntBetween(1, 5); Thread[] threadsToJoin = new Thread[numberOfTasks]; + String indexName = randomIdentifier(); + createIndex( + indexName, + // NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool. + Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)).put(SETTING_NUMBER_OF_REPLICAS, 0).build() + ); for (int i = 0; i < numberOfTasks; ++i) { - threadsToJoin[i] = startParallelSingleWrite(); + threadsToJoin[i] = startParallelSingleWrite(indexName); } // Reach into the data node's write thread pool to check that tasks have reached the queue. @@ -456,8 +464,8 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue, which is called from the - // TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should have returned a max queue - // latency >= queuedElapsedMillis. + // TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should return a max queue + // latency > 0; { final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); @@ -477,24 +485,17 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset // by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new queue - // latencies will be present in the next call. There should be nothing in the queue now to peek at (or if there is some ambient - // node activity, it will have a relatively short queue time), so the result of the max queue latency result in - // TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset and not - // #peekMaxQueueLatencyInQueue. + // latencies will be present in the next call. There will be nothing in the queue to peek at now, so the result of the max + // queue latency result in TransportNodeUsageStatsForThreadPoolsAction will reflect + // #getMaxQueueLatencyMillisSinceLastPollAndReset and not #peekMaxQueueLatencyInQueue. barrier.await(); for (int i = 0; i < numberOfTasks; ++i) { threadsToJoin[i].join(); } - assertBusy( - // Wait for any other tasks that might have been queued. - // NB: cannot assert here that the queue is immediately empty because there could be other ambient node activity. This test - // could be running data nodes with a single write pool thread due to a limited number of processors available on test - // machines, and it's reasonable to queue a little in that situation. - () -> assertThat( - "Write thread pool dump: " + trackingWriteExecutor, - trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), - equalTo(0L) - ) + assertThat( + "Unexpectedly found a task queued for the write thread pool. 
Write thread pool dump: " + trackingWriteExecutor, + trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), + equalTo(0L) ); final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); @@ -522,8 +523,29 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { barrier.reset(); } - // NB: cannot check for a max queue latency of 0 with no further write tasks submitted. This is because the test machine may be - // running as few as a single write thread, and queuing due to ambient node activity is a possibility. + // Now that there's nothing in the queue, and no activity since the last ClusterInfo refresh, the max latency returned should be + // zero. Verify this. + final InternalClusterInfoService masterClusterInfoService = asInstanceOf( + InternalClusterInfoService.class, + internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class) + ); + final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService); + { + final Map usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools(); + logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools); + assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected + var dataNodeId = getNodeId(dataNodeName); + var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId); + assertNotNull(nodeUsageStatsForThreadPool); + logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool); + + assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId()); + var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), equalTo(0f)); + assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L)); + } } /** @@ -540,14 +562,13 @@ private void doALotOfDataNodeWrites() { /** * Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually. 
*/ - private Thread startParallelSingleWrite() { - Thread running = new Thread(() -> doSingleWrite()); + private Thread startParallelSingleWrite(String indexName) { + Thread running = new Thread(() -> doSingleWrite(indexName)); running.start(); return running; } - private void doSingleWrite() { - final String indexName = randomIdentifier(); + private void doSingleWrite(String indexName) { final int randomId = randomIntBetween(500, 1000); index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar")); } From 58eedcdcdb84885d2b3cbc5fa37a38c845c06ceb Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 14 Aug 2025 08:22:04 -0700 Subject: [PATCH 10/10] Fix test after *peek() method was changed to millis from nanos; test must wait for 1ms, not 1ns, to see a change in queue latency --- .../TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index 257f2d44c03ce..408050b01453d 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -463,8 +463,8 @@ long getQueueTimeNanos() { */ private static void waitForTimeToElapse() throws InterruptedException { final var startNanoTime = System.nanoTime(); - while ((System.nanoTime() - startNanoTime) < 1) { - Thread.sleep(Duration.ofNanos(1)); + while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) < 1) { + Thread.sleep(Duration.ofMillis(1)); } } }
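
The change that threads through this series reduces to a single reporting rule: the write-pool queue latency that TransportNodeUsageStatsForThreadPoolsAction sends to ClusterInfo is the larger of (a) the max queue latency of tasks dequeued since the last poll, which resets on every poll, and (b) the current age of the task sitting at the head of the queue, so a stalled pool is visible even before any task is dequeued. The sketch below is a minimal illustration of that rule only; the method names mirror the patch, but QueueLatencySketch and its fields are illustrative stand-ins, not the production TaskExecutionTimeTrackingEsThreadPoolExecutor.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for the two queue-latency measurements the patch combines.
class QueueLatencySketch {

    // Max queue latency (millis) observed as tasks were dequeued; reset on every poll.
    private final AtomicLong maxDequeuedLatencyMillis = new AtomicLong();
    // Enqueue timestamp (nanos) of the task currently at the head of the queue, or -1 when empty.
    // A real executor would maintain this as tasks are enqueued and dequeued.
    private volatile long headOfQueueEnqueueNanos = -1;

    // Called by a worker when it takes a task off the queue, recording how long it sat there.
    void onTaskDequeued(long timeInQueueNanos) {
        maxDequeuedLatencyMillis.accumulateAndGet(TimeUnit.NANOSECONDS.toMillis(timeInQueueNanos), Math::max);
    }

    // Measurement (a): max dequeue latency since the last poll, reset on read.
    long getMaxQueueLatencyMillisSinceLastPollAndReset() {
        return maxDequeuedLatencyMillis.getAndSet(0);
    }

    // Measurement (b): age of the task still waiting at the head of the queue; zero if the queue is empty.
    long peekMaxQueueLatencyInQueueMillis() {
        long enqueuedNanos = headOfQueueEnqueueNanos;
        return enqueuedNanos < 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueuedNanos);
    }

    // What the node-level stats action reports: whichever measurement is currently larger.
    long reportedQueueLatencyMillis() {
        return Math.max(getMaxQueueLatencyMillisSinceLastPollAndReset(), peekMaxQueueLatencyInQueueMillis());
    }
}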