diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
index 6fd3133686b64..0ee30ca3b260f 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
@@ -346,6 +347,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
                 WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
                 WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
             )
+            // Manually control cluster info refreshes
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
             .build();
         var masterName = internalCluster().startMasterOnlyNode(settings);
         var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +372,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
             }
         );
 
-        // Do some writes to create some write thread pool activity.
-        final String indexName = randomIdentifier();
-        for (int i = 0; i < randomIntBetween(1, 1000); i++) {
-            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
-        }
+        // Generate some writes to get some non-zero write thread pool stats.
+        doALotOfDataNodeWrites();
 
         // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
         final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +387,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
 
         final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
         logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
-        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
+        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
         var dataNodeId = getNodeId(dataNodeName);
         var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
         assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +400,154 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
         assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
         assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
     }
+
+    /**
+     * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latency:
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. The latter looks at currently queued tasks, and
+     * the former tracks the queue latency of tasks when they are taken off of the queue to start execution.
+     */
+    public void testMaxQueueLatenciesInClusterInfo() throws Exception {
+        var settings = Settings.builder()
+            .put(
+                WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+            )
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
+            .build();
+        var masterName = internalCluster().startMasterOnlyNode(settings);
+        var dataNodeName = internalCluster().startDataOnlyNode(settings);
+        ensureStableCluster(2);
+        assertEquals(internalCluster().getMasterName(), masterName);
+        assertNotEquals(internalCluster().getMasterName(), dataNodeName);
+        logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
+
+        // Block indexing on the data node by submitting as many write thread pool tasks as there are write threads.
+        var barrier = blockDataNodeIndexing(dataNodeName);
+        try {
+            // Arbitrary number of tasks to queue greater than one (only strictly need a single task to occupy the queue).
+            int randomInt = randomIntBetween(1, 5);
+            Thread[] threadsToJoin = new Thread[randomInt];
+            for (int i = 0; i < randomInt; ++i) {
+                threadsToJoin[i] = startParallelSingleWrite();
+            }
+
+            // Reach into the data node's write thread pool to check that tasks have reached the queue.
+            var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+            var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
+            assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
+            var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
+            assertBusy(
+                // Wait for the parallel threads' writes to get queued in the write thread pool.
+                () -> assertThat(
+                    "Write thread pool dump: " + trackingWriteExecutor,
+                    trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
+                    greaterThan(0L)
+                )
+            );
+
+            // Ensure that some amount of time passes while the tasks are sitting in the queue.
+            long queuedElapsedMillis = 100;
+            ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis);
+
+            // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
+            final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+                InternalClusterInfoService.class,
+                internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+            );
+            final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+
+            // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue called from the
+            // TransportNodeUsageStatsForThreadPoolsAction that the ClusterInfoService refresh initiated should have returned a max queue
+            // latency >= queuedElapsedMillis.
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
+            }
+
+            // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset
+            // by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new queue
+            // latencies will be present in the next call. There should be nothing in the queue now to peek at (or if there is some ambient
+            // node activity, it will have a relatively short queue time), so the max queue latency result in
+            // TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset and not
+            // #peekMaxQueueLatencyInQueue.
+            barrier.await();
+            for (int i = 0; i < randomInt; ++i) {
+                threadsToJoin[i].join();
+            }
+            assertBusy(
+                // Wait for any other tasks that might have been queued.
+                // NB: cannot assert that the queue is immediately empty because of other ambient node activity and potentially a single
+                // write thread available due to a limited number of processors available on test machines.
+                () -> assertThat(
+                    "Write thread pool dump: " + trackingWriteExecutor,
+                    trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
+                    equalTo(0L)
+                )
+            );
+            final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
+                    .getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
+            }
+        } finally {
+            // Ensure that the write threads have been released by breaking the barrier for any tasks still waiting on it.
+            logger.info("---> Ensuring release of the barrier on write thread pool tasks");
+            barrier.reset();
+        }
+
+        // NB: cannot check for a max queue latency of 0 with no further write tasks submitted. This is because the test machine may be
+        // running as few as a single write thread, and queuing due to ambient node activity is a possibility.
+    }
+
+    /**
+     * Do some writes to create some write thread pool activity.
+     */
+    private void doALotOfDataNodeWrites() {
+        final String indexName = randomIdentifier();
+        final int randomInt = randomIntBetween(500, 1000);
+        for (int i = 0; i < randomInt; i++) {
+            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
+        }
+    }
+
+    /**
+     * Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually.
+     */
+    private Thread startParallelSingleWrite() {
+        Thread running = new Thread(() -> doSingleWrite());
+        running.start();
+        return running;
+    }
+
+    private void doSingleWrite() {
+        final String indexName = randomIdentifier();
+        final int randomId = randomIntBetween(500, 1000);
+        index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar"));
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
index af35dc1a6731c..d6e7b2712d5aa 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java
@@ -18,7 +18,6 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -27,7 +26,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.StreamSupport;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -410,7 +408,7 @@ public void onFailure(Exception e) {
             });
         }
 
-        waitForTimeToElapse();
+        ESIntegTestCase.waitForTimeToElapse(100);
 
         pendingClusterTasks = clusterService.getMasterService().pendingTasks();
         assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
@@ -433,28 +431,4 @@ public void onFailure(Exception e) {
             block2.countDown();
         }
     }
-
-    private static void waitForTimeToElapse() throws InterruptedException {
-        final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false)
-            .map(ClusterService::threadPool)
-            .toArray(ThreadPool[]::new);
-        final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray();
-
-        final var startNanoTime = System.nanoTime();
-        while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) {
-            // noinspection BusyWait
-            Thread.sleep(100);
-        }
-
-        outer: do {
-            for (int i = 0; i < threadPools.length; i++) {
-                if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) {
-                    // noinspection BusyWait
-                    Thread.sleep(100);
-                    continue outer;
-                }
-            }
-            return;
-        } while (true);
-    }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
index 29bc8efbbb192..11a71b790f16d 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
@@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
             (float) trackingForWriteExecutor.pollUtilization(
                 TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
             ),
-            trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
+            Math.max(
+                trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
+                trackingForWriteExecutor.peekMaxQueueLatencyInQueue()
+            )
         );
 
         Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> perThreadPool = new HashMap<>();
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index c39ce209bf875..bc61e14781859 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -9,6 +9,8 @@
 
 package org.elasticsearch.common.util.concurrent;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -37,6 +39,8 @@
  */
 public class EsExecutors {
 
+    private static final Logger logger = LogManager.getLogger(EsExecutors.class);
+
     // although the available processors may technically change, for node sizing we use the number available at launch
     private static final int MAX_NUM_PROCESSORS = Runtime.getRuntime().availableProcessors();
 
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
index 20f480af172bd..7518d5423b36f 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -172,10 +172,13 @@ public long peekMaxQueueLatencyInQueue() {
         if (queue.isEmpty()) {
             return 0;
         }
-        assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
-        var linkedTransferQueue = (LinkedTransferQueue) queue;
+        assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
+            : "Not the type of queue expected: " + queue.getClass();
+        var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
+            ? (LinkedTransferQueue) queue
+            : (SizeBlockingQueue) queue;
 
-        var task = linkedTransferQueue.peek();
+        var task = linkedTransferOrSizeBlockingQueue.peek();
         assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
         var wrappedTask = ((WrappedRunnable) task).unwrap();
         assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 58ac4635b2a4e..2a8c42c95cd0e 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             return builder;
         }
 
+        @Override
+        public String toString() {
+            return "Info[name="
+                + name
+                + ",type="
+                + type
+                + ",min="
+                + min
+                + ",max="
+                + max
+                + ",keepAlive="
+                + keepAlive
+                + ",queueSize="
+                + queueSize
+                + "]";
+        }
+
     }
 
     /**
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 5c2f9646f03b3..33e8da80ff26c 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -163,6 +163,7 @@
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
@@ -204,17 +205,21 @@
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -2915,4 +2920,63 @@ protected static void deletePipeline(String id) {
             )
         );
     }
+
+    public static void waitForTimeToElapse(long elapsedMillis) throws InterruptedException {
+        final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false)
+            .map(ClusterService::threadPool)
+            .toArray(ThreadPool[]::new);
+        final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray();
+
+        final var startNanoTime = System.nanoTime();
+        while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= elapsedMillis) {
+            // noinspection BusyWait
+            Thread.sleep(elapsedMillis);
+        }
+
+        outer: do {
+            for (int i = 0; i < threadPools.length; i++) {
+                if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) {
+                    // noinspection BusyWait
+                    Thread.sleep(elapsedMillis);
+                    continue outer;
+                }
+            }
+            return;
+        } while (true);
+    }
+
+    /**
+     * Submits as many tasks to the given data node's write thread pool as there are write threads. These tasks will wait on the barrier
+     * that is returned, which waits for total-write-threads + 1 callers. The caller can release the tasks by calling
+     * {@code barrier.await()} or abort them with {@code barrier.reset()}.
+     */
+    public CyclicBarrier blockDataNodeIndexing(String dataNodeName) {
+        // Block the executor workers to simulate long-running write tasks
+        var threadpool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+        var executor = threadpool.executor(ThreadPool.Names.WRITE);
+        final var executorInfo = threadpool.info(ThreadPool.Names.WRITE);
+        final var executorThreads = executorInfo.getMax();
+        var barrier = new CyclicBarrier(executorThreads + 1);
+        for (int i = 0; i < executorThreads; i++) {
+            executor.execute(() -> longAwait(barrier));
+        }
+        logger.info(
+            "---> Submitted ["
+                + executorThreads
+                + "] tasks to the write thread pool that will wait on a barrier until released. Write thread pool info: "
+                + executorInfo
+        );
+        return barrier;
+    }
+
+    private static void longAwait(CyclicBarrier barrier) {
+        try {
+            barrier.await(30, TimeUnit.SECONDS);
+        } catch (BrokenBarrierException | TimeoutException e) {
+            throw new AssertionError(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError(e);
+        }
+    }
 }