5 changes: 5 additions & 0 deletions docs/changelog/132675.yaml
@@ -0,0 +1,5 @@
pr: 132675
summary: Add second max queue latency stat to `ClusterInfo`
area: Allocation
type: enhancement
issues: []
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@@ -59,6 +60,8 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -346,6 +349,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
// Manually control cluster info refreshes
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +374,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
}
);

// Do some writes to create some write thread pool activity.
final String indexName = randomIdentifier();
for (int i = 0; i < randomIntBetween(1, 1000); i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}
// Generate some writes to get some non-zero write thread pool stats.
doALotOfDataNodeWrites();

// Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +389,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {

final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +402,174 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
}

/**
* The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies:
* {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
{@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The former tracks the queue latency of tasks
as they are taken off of the queue to start execution, and the latter looks at tasks currently sitting in the queue.
*/
public void testMaxQueueLatenciesInClusterInfo() throws Exception {
var settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
// Manually control cluster info refreshes
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
.build();
var masterName = internalCluster().startMasterOnlyNode(settings);
var dataNodeName = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(2);
assertEquals(internalCluster().getMasterName(), masterName);
assertNotEquals(internalCluster().getMasterName(), dataNodeName);
logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);

// Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
var barrier = blockDataNodeIndexing(dataNodeName);
try {
// Arbitrary number of tasks, which will queue because all the write threads are already occupied. Strictly speaking, only a
// single task is needed to occupy the queue.
int numberOfTasks = randomIntBetween(1, 5);
Thread[] threadsToJoin = new Thread[numberOfTasks];
String indexName = randomIdentifier();
createIndex(
indexName,
// NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool.
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
for (int i = 0; i < numberOfTasks; ++i) {
threadsToJoin[i] = startParallelSingleWrite(indexName);
}

// Reach into the data node's write thread pool to check that tasks have reached the queue.
var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
assertBusy(
// Wait for the parallel threads' writes to get queued in the write thread pool.
() -> assertThat(
"Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
greaterThan(0L)
)
);

// Force a refresh of the ClusterInfo state to collect fresh info from the data node.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
);
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);

// Since tasks are actively queued right now, #peekMaxQueueLatencyInQueueMillis, which is called from the
// TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should return a max queue
// latency > 0.
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data node should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThan(0L));
}

// Now release the data node's indexing, and drain the queued tasks. The max queue latency of executed (not queued) tasks is
// reset by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new
// queue latencies will be present in the next call. There will be nothing left in the queue to peek at now, so the max queue
// latency reported by TransportNodeUsageStatsForThreadPoolsAction will reflect
// #getMaxQueueLatencyMillisSinceLastPollAndReset and not #peekMaxQueueLatencyInQueueMillis.
barrier.await();
for (int i = 0; i < numberOfTasks; ++i) {
threadsToJoin[i].join();
}
assertThat(
"Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
equalTo(0L)
);

final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
}
} finally {
// Ensure that the write threads have been released by breaking the barrier for any callers still waiting on it. If the
// callers have already all been successfully released, then there will be nothing left to break.
logger.info("---> Ensuring release of the barrier on write thread pool tasks");
barrier.reset();
}

// Now that there's nothing in the queue, and no activity since the last ClusterInfo refresh, the max latency returned should be
// zero. Verify this.
final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
);
final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
{
final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
var dataNodeId = getNodeId(dataNodeName);
var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
assertNotNull(nodeUsageStatsForThreadPool);
logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);

assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), equalTo(0f));
assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L));
}
}

/**
* Do some writes to create some write thread pool activity.
*/
private void doALotOfDataNodeWrites() {
final String indexName = randomIdentifier();
final int randomInt = randomIntBetween(500, 1000);
for (int i = 0; i < randomInt; i++) {
index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
}
}

/**
* Starts a single index request on a parallel thread and returns the thread so that {@link Thread#join()} can eventually be called on it.
*/
private Thread startParallelSingleWrite(String indexName) {
Thread running = new Thread(() -> doSingleWrite(indexName));
running.start();
return running;
}

private void doSingleWrite(String indexName) {
final int randomId = randomIntBetween(500, 1000);
index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar"));
}
}
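
The blockDataNodeIndexing(String) helper used by testMaxQueueLatenciesInClusterInfo is not part of the hunks shown here. A minimal sketch of what such a helper could look like, assuming it parks one no-op task per WRITE thread on a CyclicBarrier that the test thread later trips; the method body below is hypothetical, only the ThreadPool accessors already appear in the test above:

// Hypothetical sketch (not in this diff); assumes java.util.concurrent.CyclicBarrier and BrokenBarrierException are imported.
private CyclicBarrier blockDataNodeIndexing(String dataNodeName) {
    var threadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
    int writeThreads = threadPool.info(ThreadPool.Names.WRITE).getMax();
    // One extra party for the test thread: its single await() releases every blocked write thread at once.
    var barrier = new CyclicBarrier(writeThreads + 1);
    var writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
    for (int i = 0; i < writeThreads; i++) {
        writeExecutor.execute(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // Expected when the test's finally block calls barrier.reset() after an earlier failure.
            }
        });
    }
    return barrier;
}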
@@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
)
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
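
The Math.max above is the heart of the change: the polled value only reflects tasks that have already been handed to a write thread, while the peeked value covers tasks still waiting in the queue. A minimal illustration of the value reported for a pool whose threads are all blocked; the latency numbers are hypothetical, only the two accessors and the Math.max combination come from the hunk above:

long drainedMax = trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(); // 0: nothing has been dequeued since the last poll
long queuedMax = trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis();               // e.g. 250: the task at the head of the queue has waited ~250ms
long reported = Math.max(drainedMax, queuedMax);                                            // 250: the stalled queue is still visible to ClusterInfo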
@@ -151,7 +151,8 @@ public int getCurrentQueueSize() {
* Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
* Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
* latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
* {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
* {@link #peekMaxQueueLatencyInQueueMillis}. If there have been no tasks in the queue since the last call, then zero latency is
* returned.
*/
public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
if (trackMaxQueueLatency == false) {
@@ -164,23 +165,29 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
* Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the
* queue and calculates how long it has been there. Returns zero if there is no queue.
*/
public long peekMaxQueueLatencyInQueue() {
public long peekMaxQueueLatencyInQueueMillis() {
if (trackMaxQueueLatency == false) {
return 0;
}

var queue = getQueue();
if (queue.isEmpty()) {
assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
: "Not the type of queue expected: " + queue.getClass();
var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
? (LinkedTransferQueue) queue
: (SizeBlockingQueue) queue;

var task = linkedTransferOrSizeBlockingQueue.peek();
if (task == null) {
// There's nothing in the queue right now.
return 0;
}
assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
var linkedTransferQueue = (LinkedTransferQueue) queue;

var task = linkedTransferQueue.peek();
assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
var wrappedTask = ((WrappedRunnable) task).unwrap();
assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
var timedTask = (TimedRunnable) wrappedTask;
return timedTask.getTimeSinceCreationNanos();
return TimeUnit.NANOSECONDS.toMillis(timedTask.getTimeSinceCreationNanos());
}

/**
@@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public String toString() {
return "Info[name="
+ name
+ ",type="
+ type
+ ",min="
+ min
+ ",max="
+ max
+ ",keepAlive="
+ keepAlive
+ ",queueSize="
+ queueSize
+ "]";
}

}
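
// Illustrative only, not part of this change: with hypothetical values for a fixed-size write pool, the new
// toString() above would render roughly as
// Info[name=write,type=fixed,min=8,max=8,keepAlive=null,queueSize=10000]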

/**