5 changes: 5 additions & 0 deletions docs/changelog/132675.yaml
@@ -0,0 +1,5 @@
pr: 132675
summary: Add second max queue latency stat to `ClusterInfo`
area: Allocation
type: enhancement
issues: []
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@@ -346,6 +347,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
// Manually control cluster info refreshes
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +372,8 @@
}
);

// Do some writes to create some write thread pool activity.
final String indexName = randomIdentifier();
for (int i = 0; i < randomIntBetween(1, 1000); i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}
// Generate some writes to get some non-zero write thread pool stats.
doALotOfDataNodeWrites();

// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +387,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {

final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +400,154 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
}

/**
* The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latency:
* {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
* {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue()}. The latter looks at currently queued tasks, and
* the former tracks the queue latency of tasks when they are taken off of the queue to start execution.
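* In this test, the peeked in-queue latency dominates while write tasks are blocked in the queue, and the since-last-poll value
* dominates once the queue has drained.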
*/
public void testMaxQueueLatenciesInClusterInfo() throws Exception {
var settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(2);
assertEquals(internalCluster().getMasterName(), masterName);
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);

// Block indexing on the data node by submitting write thread pool tasks to equal the number of write threads.
var barrier = blockDataNodeIndexing(dataNodeName);
try {
// Queue an arbitrary small number of tasks (a single queued task is all that is strictly needed to occupy the queue).
int randomInt = randomIntBetween(1, 5);
Thread[] threadsToJoin = new Thread[randomInt];
for (int i = 0; i < randomInt; ++i) {
threadsToJoin[i] = startParallelSingleWrite();
}

// Reach into the data node's write thread pool to check that tasks have reached the queue.
var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
assertBusy(
// Wait for the parallel threads' writes to get queued in the write thread pool.
() -> assertThat(
"Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
greaterThan(0L)
)
);

// Ensure that some amount of time has passed on the thread pool.
long queuedElapsedMillis = 100;
ESIntegTestCase.waitForTimeToElapse(queuedElapsedMillis);

// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
);
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);

// Since tasks are actively queued right now, #peekMaxQueueLatencyInQueue, called from the
// TransportNodeUsageStatsForThreadPoolsAction that the ClusterInfoService refresh initiated, should have returned a max
// queue latency >= queuedElapsedMillis.
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
}

// Now release the data node's indexing and drain the queued tasks. The max queue latency of executed (not queued) tasks is
// reset by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the
// new queue latencies will be present in the next call. There should be nothing left in the queue to peek at (or, if there
// is some ambient node activity, it will have a relatively short queue time), so the max queue latency reported by
// TransportNodeUsageStatsForThreadPoolsAction should now reflect #getMaxQueueLatencyMillisSinceLastPollAndReset rather than
// #peekMaxQueueLatencyInQueue.
barrier.await();
for (int i = 0; i < randomInt; ++i) {
threadsToJoin[i].join();
}
assertBusy(
// Wait for any other tasks that might have been queued.
// NB: cannot assert that the queue is immediately empty, because of other ambient node activity and because test
// machines with a limited number of processors may have only a single write thread.
() -> assertThat(
"Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueue(),
equalTo(0L)
)
);
final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(queuedElapsedMillis));
}
} finally {
// Ensure that the write threads have been released by breaking the barrier, which releases any callers still waiting on it.
logger.info("---> Ensuring release of the barrier on write thread pool tasks");
barrier.reset();
}

// NB: cannot check for a max queue latency of 0 with no further write tasks submitted. This is because the test machine may be
// running as few as a single write thread, and queuing due to ambient node activity is a possibility.
}

/**
* Do some writes to create some write thread pool activity.
*/
private void doALotOfDataNodeWrites() {
final String indexName = randomIdentifier();
final int randomInt = randomIntBetween(500, 1000);
for (int i = 0; i < randomInt; i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}
}

/**
* Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually.
*/
private Thread startParallelSingleWrite() {
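// The write is expected to block while indexing is blocked on the data node, so it runs on its own thread and the caller
// joins it once the barrier is released.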
Thread running = new Thread(() -> doSingleWrite());
running.start();
return running;
}

private void doSingleWrite() {
final String indexName = randomIdentifier();
final int randomId = randomIntBetween(500, 1000);
index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar"));
}
}
@@ -18,7 +18,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.HashSet;
@@ -27,7 +26,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -410,7 +408,7 @@ public void onFailure(Exception e) {
});
}

waitForTimeToElapse();
ESIntegTestCase.waitForTimeToElapse(100);

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
@@ -433,28 +431,4 @@ public void onFailure(Exception e) {
block2.countDown();
}
}

private static void waitForTimeToElapse() throws InterruptedException {
final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false)
.map(ClusterService::threadPool)
.toArray(ThreadPool[]::new);
final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray();

final var startNanoTime = System.nanoTime();
while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) {
// noinspection BusyWait
Thread.sleep(100);
}

outer: do {
for (int i = 0; i < threadPools.length; i++) {
if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) {
// noinspection BusyWait
Thread.sleep(100);
continue outer;
}
}
return;
} while (true);
}
}
@@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
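// Report the larger of the two measurements: the since-last-poll maximum covers tasks that have already started
// executing, while the in-queue peek covers tasks still waiting, so long-queued work is visible before it starts.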
Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueue()
)
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
@@ -9,6 +9,8 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -37,6 +39,8 @@
*/
public class EsExecutors {

private static final Logger logger = LogManager.getLogger(EsExecutors.class);

// although the available processors may technically change, for node sizing we use the number available at launch
private static final int MAX_NUM_PROCESSORS = Runtime.getRuntime().availableProcessors();

@@ -172,10 +172,13 @@ public long peekMaxQueueLatencyInQueue() {
if (queue.isEmpty()) {
return 0;
}
assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
var linkedTransferQueue = (LinkedTransferQueue) queue;
assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
: "Not the type of queue expected: " + queue.getClass();
var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
? (LinkedTransferQueue) queue
: (SizeBlockingQueue) queue;
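// The head of the queue is the longest-waiting task, so peeking at it yields the maximum in-queue latency.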

var task = linkedTransferQueue.peek();
var task = linkedTransferOrSizeBlockingQueue.peek();
assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
var wrappedTask = ((WrappedRunnable) task).unwrap();
assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
@@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public String toString() {
return "Info[name="
+ name
+ ",type="
+ type
+ ",min="
+ min
+ ",max="
+ max
+ ",keepAlive="
+ keepAlive
+ ",queueSize="
+ queueSize
+ "]";
}

}

/**