@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@@ -346,6 +347,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
// Manually control cluster info refreshes
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +372,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
}
);

// Do some writes to create some write thread pool activity.
final String indexName = randomIdentifier();
for (int i = 0; i < randomIntBetween(1, 1000); i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}
// Generate some writes to get some non-zero write thread pool stats.
doALotOfDataNodeWrites();

// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +387,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {

final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +400,154 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
}

/**
* The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latency:
* {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
* {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. The latter looks at currently queued tasks, and
* the former tracks the queue latency of tasks when they are taken off of the queue to start execution.
*/
public void testMaxQueueLatenciesInClusterInfo() throws Exception {
var settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(2);
assertEquals(internalCluster().getMasterName(), masterName);
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);

// Block indexing on the data node by submitting write thread pool tasks to equal the number of write threads.
var barrier = blockDataNodeIndexing(dataNodeName);
try {
// Queue an arbitrary (small) number of tasks; strictly, only a single queued task is needed.
int randomInt = randomIntBetween(1, 5);
Thread[] threadsToJoin = new Thread[randomInt];
for (int i = 0; i < randomInt; ++i) {
threadsToJoin[i] = startParallelSingleWrite();
}

// Reach into the data node's write thread pool to check that tasks have reached the queue.
var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
assertBusy(
// Wait for the parallel threads' writes to get queued in the write thread pool.
() -> assertThat(
"Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
greaterThan(0L)
)
);

// Ensure that some amount of time has passed on the thread pool.
long queuedElapsedMillis = 100;
ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis);

// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
);
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);

// Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue, called from the
// TransportNodeUsageStatsForThreadPoolsAction that the ClusterInfoService refresh initiated, should have returned a max queue
// latency >= queuedElapsedMillis.
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
}

// Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset
// by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new queue
// latencies will be present in the next call. There should be nothing in the queue now to peek at (or if there is some ambient
// node activity, it will have a relatively short queue time), so the max queue latency reported by
// TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset and not
// #peekMaxQueueLatencyInQueue.
barrier.await();
for (int i = 0; i < randomInt; ++i) {
threadsToJoin[i].join();
}
assertBusy(
// Wait for any other tasks that might have been queued.
// NB: cannot assert that the queue is immediately empty because of other ambient node activity and potentially a single
// write thread available due to a limited number of processors available on test machines.
() -> assertThat(
"Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
equalTo(0L)
)
);
final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
}
} finally {
// Ensure that the write threads are released by breaking the barrier for any callers still waiting on it.
logger.info("---> Ensuring release of the barrier on write thread pool tasks");
barrier.reset();
}

// NB: cannot check for a max queue latency of 0 with no further write tasks submitted. This is because the test machine may be
// running as few as a single write thread, and queuing due to ambient node activity is a possibility.
}

/**
* Do some writes to create some write thread pool activity.
*/
private void doALotOfDataNodeWrites() {
final String indexName = randomIdentifier();
final int randomInt = randomIntBetween(500, 1000);
for (int i = 0; i < randomInt; i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}
}

/**
* Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually.
*/
private Thread startParallelSingleWrite() {
Thread running = new Thread(() -> doSingleWrite());
running.start();
return running;
}

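/**
 * Indexes a single document into a random index; the write runs on, or queues behind, the data node's write thread pool.
 */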
private void doSingleWrite() {
final String indexName = randomIdentifier();
final int randomId = randomIntBetween(500, 1000);
index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar"));
}
}
@@ -18,7 +18,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.HashSet;
@@ -27,7 +26,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -410,7 +408,7 @@ public void onFailure(Exception e) {
});
}

waitForTimeToElapse();
ESIntegTestCase.waitForTimeToElapse(100);

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
@@ -433,28 +431,4 @@ public void onFailure(Exception e) {
block2.countDown();
}
}

private static void waitForTimeToElapse() throws InterruptedException {
final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false)
.map(ClusterService::threadPool)
.toArray(ThreadPool[]::new);
final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray();

final var startNanoTime = System.nanoTime();
while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) {
// noinspection BusyWait
Thread.sleep(100);
}

outer: do {
for (int i = 0; i < threadPools.length; i++) {
if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) {
// noinspection BusyWait
Thread.sleep(100);
continue outer;
}
}
return;
} while (true);
}
}
@@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
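// Report the larger of the two queue latency signals: the max latency of tasks dequeued since the last poll,
// and the latency of the oldest task still waiting in the queue.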
Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueue()
)
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
@@ -9,6 +9,8 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -37,6 +39,8 @@
*/
public class EsExecutors {

private static final Logger logger = LogManager.getLogger(EsExecutors.class);

// although the available processors may technically change, for node sizing we use the number available at launch
private static final int MAX_NUM_PROCESSORS = Runtime.getRuntime().availableProcessors();

@@ -172,10 +172,13 @@ public long peekMaxQueueLatencyInQueue() {
if (queue.isEmpty()) {
return 0;
}
assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
var linkedTransferQueue = (LinkedTransferQueue) queue;
assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
: "Not the type of queue expected: " + queue.getClass();
var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
? (LinkedTransferQueue) queue
: (SizeBlockingQueue) queue;

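// Both queue implementations are FIFO, so peek() should return the longest-waiting task at the head of the queue.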
var task = linkedTransferQueue.peek();
var task = linkedTransferOrSizeBlockingQueue.peek();
assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
var wrappedTask = ((WrappedRunnable) task).unwrap();
assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public String toString() {
return "Info[name="
+ name
+ ",type="
+ type
+ ",min="
+ min
+ ",max="
+ max
+ ",keepAlive="
+ keepAlive
+ ",queueSize="
+ queueSize
+ "]";
}

}

/**