Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00);
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
public static final TransportVersion UTILIZATION_IN_THREAD_POOL_NODE_STATS = def(9_133_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public double pollUtilization(UtilizationTrackingPurpose utilizationTrackingPurp
}
}

/**
 * Returns the value last recorded by {@code apmUtilizationTracker}, narrowed to a {@code float}.
 *
 * @return the tracker's last value as a {@code float}
 */
public float getUtilization() {
    final double lastTrackedValue = apmUtilizationTracker.getLastValue();
    return (float) lastTrackedValue;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a stop-gap until we have the singular utilisation value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:'-)


@Override
protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {
Expand Down Expand Up @@ -262,6 +266,7 @@ public boolean trackingMaxQueueLatency() {
private class UtilizationTracker {
long lastPollTime = System.nanoTime();
long lastTotalExecutionTime = 0;
volatile double lastValue = Float.NaN;

public synchronized double pollUtilization() {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
Expand All @@ -275,8 +280,18 @@ public synchronized double pollUtilization() {

lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastPollTime = currentPollTimeNanos;
lastValue = utilizationSinceLastPoll;

return utilizationSinceLastPoll;
}

/**
 * Returns the utilization value stored by the latest {@code pollUtilization()} call.
 * <p>
 * The backing field is {@code volatile}, so this read does not take the monitor that
 * {@code pollUtilization()} synchronizes on.
 *
 * @return the last stored value, or NaN when no poll has stored one yet
 */
public double getLastValue() {
    return this.lastValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ public ThreadPoolStats stats() {
long rejected = -1;
int largest = -1;
long completed = -1;
float utilization = Float.NaN;
if (holder.executor() instanceof ThreadPoolExecutor threadPoolExecutor) {
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
Expand All @@ -462,7 +463,10 @@ public ThreadPoolStats stats() {
rejected = handler.rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed));
if (holder.executor() instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor taskExecutionTimeTrackingEsThreadPoolExecutor) {
utilization = taskExecutionTimeTrackingEsThreadPoolExecutor.getUtilization();
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, utilization));
}
return new ThreadPoolStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -25,6 +26,7 @@
import java.util.Iterator;
import java.util.List;

import static java.lang.Float.NaN;
import static java.util.Collections.emptyIterator;
import static org.elasticsearch.common.collect.Iterators.single;

Expand All @@ -41,14 +43,27 @@ public static ThreadPoolStats merge(ThreadPoolStats first, ThreadPoolStats secon
return new ThreadPoolStats(mergedThreadPools.values());
}

public record Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed)
public record Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, float utilization)
implements
Writeable,
ChunkedToXContent,
Comparable<Stats> {

public Stats(StreamInput in) throws IOException {
this(in.readString(), in.readInt(), in.readInt(), in.readInt(), in.readLong(), in.readInt(), in.readLong());
/**
 * Deserializes a {@link Stats} instance from the given stream.
 * <p>
 * Fields are consumed in the order name, threads, queue, active, rejected, largest, completed.
 * The utilization is only read when the stream's transport version is on or after
 * {@code UTILIZATION_IN_THREAD_POOL_NODE_STATS}; otherwise it is set to NaN.
 *
 * @param in the stream to read from
 * @return the deserialized stats
 * @throws IOException if reading from the stream fails
 */
public static Stats readFrom(StreamInput in) throws IOException {
    final String poolName = in.readString();
    final int threadCount = in.readInt();
    final int queueSize = in.readInt();
    final int activeCount = in.readInt();
    final long rejectedCount = in.readLong();
    final int largestSize = in.readInt();
    final long completedCount = in.readLong();
    // Streams older than the utilization transport version carry no utilization value
    final boolean carriesUtilization = in.getTransportVersion().onOrAfter(TransportVersions.UTILIZATION_IN_THREAD_POOL_NODE_STATS);
    final float poolUtilization = carriesUtilization ? in.readFloat() : NaN;
    return new Stats(poolName, threadCount, queueSize, activeCount, rejectedCount, largestSize, completedCount, poolUtilization);
}

static Stats merge(Stats firstStats, Stats secondStats) {
Expand All @@ -59,7 +74,8 @@ static Stats merge(Stats firstStats, Stats secondStats) {
sumStat(firstStats.active, secondStats.active),
sumStat(firstStats.rejected, secondStats.rejected),
sumStat(firstStats.largest, secondStats.largest),
sumStat(firstStats.completed, secondStats.completed)
sumStat(firstStats.completed, secondStats.completed),
NaN // Don't sum utilization, it makes no sense
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by the get-cluster-info REST API to merge the stats for each thread pool received from each node. I don't think it makes sense to merge utilization here, and in any case we don't render it in toXContent. Perhaps if/when we start rendering it, we can implement a sensible merge?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the indication that we don't really want to do this here. Although this might seem simpler than #131480 in terms of lines-of-code added, IMO that's because this is a bit of a short-cut. Fields in the transport messages which aren't represented in the REST API are tech debt.

I'd rather we moved towards having the allocator use dedicated messages for all these things rather than relying on stats APIs - the stats APIs we use today return a bunch of stuff about which we don't care.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a few like this now for features that aren't settled. Not because it was too hard, but rather to allow the freedom to change them before we commit to something that would be considered a breaking change.
Point taken about moving to dedicated messages, though. This approach was an attempt to remain consistent with existing code, but it sounds like we don't want to do that.

);
}

Expand Down Expand Up @@ -96,6 +112,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
if (out.getTransportVersion().onOrAfter(TransportVersions.UTILIZATION_IN_THREAD_POOL_NODE_STATS)) {
out.writeFloat(utilization);
}
}

@Override
Expand Down Expand Up @@ -125,6 +144,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
rejected != -1 ? single((builder, params) -> builder.field(Fields.REJECTED, rejected)) : emptyIterator(),
largest != -1 ? single((builder, params) -> builder.field(Fields.LARGEST, largest)) : emptyIterator(),
completed != -1 ? single((builder, params) -> builder.field(Fields.COMPLETED, completed)) : emptyIterator(),
// TODO: Leaving out thread pool utilization until we've settled on its final form
ChunkedToXContentHelper.endObject()
);
}
Expand All @@ -137,7 +157,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
}

public ThreadPoolStats(StreamInput in) throws IOException {
this(in.readCollectionAsList(Stats::new));
this(in.readCollectionAsList(Stats::readFrom));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class ThreadPoolStatsTests extends ESTestCase {
public void testThreadPoolStatsConstructorSortTheStats() {
var unorderedStats = List.of(
new ThreadPoolStats.Stats("z", 7, 0, 0, 0, 0, 0L),
new ThreadPoolStats.Stats("m", 5, 0, 0, 0, 0, 0L),
new ThreadPoolStats.Stats("m", -3, 0, 0, 0, 0, 0L),
new ThreadPoolStats.Stats("d", 2, 0, 0, 0, 0, 0L),
new ThreadPoolStats.Stats("m", 4, 0, 0, 0, 0, 0L),
new ThreadPoolStats.Stats("t", 6, 0, 0, 0, 0, 0L),
new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L)
new ThreadPoolStats.Stats("z", 7, 0, 0, 0, 0, 0L, Float.NaN),
new ThreadPoolStats.Stats("m", 5, 0, 0, 0, 0, 0L, Float.NaN),
new ThreadPoolStats.Stats("m", -3, 0, 0, 0, 0, 0L, Float.NaN),
new ThreadPoolStats.Stats("d", 2, 0, 0, 0, 0, 0L, Float.NaN),
new ThreadPoolStats.Stats("m", 4, 0, 0, 0, 0, 0L, Float.NaN),
new ThreadPoolStats.Stats("t", 6, 0, 0, 0, 0, 0L, Float.NaN),
new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, Float.NaN)
);

var copy = new ArrayList<>(unorderedStats);
Expand Down Expand Up @@ -71,7 +71,7 @@ public void testStatsMerge() {
}

private static ThreadPoolStats.Stats stats(int value) {
return new ThreadPoolStats.Stats("a", value, value, value, value, value, value);
return new ThreadPoolStats.Stats("a", value, value, value, value, value, value, Float.NaN);
}

public void testSerialization() throws IOException {
Expand Down Expand Up @@ -100,7 +100,8 @@ public static ThreadPoolStats.Stats randomStats(String name) {
randomMinusOneOrOther(),
randomMinusOneOrOther(),
randomMinusOneOrOther(),
randomMinusOneOrOther()
randomMinusOneOrOther(),
randomBoolean() ? Float.NaN : randomFloatBetween(0.0f, 1.0f, true)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,12 @@ private static NodeStats mockNodeStats() {

// Threadpools
final List<ThreadPoolStats.Stats> threadpools = new ArrayList<>();
threadpools.add(new ThreadPoolStats.Stats("generic", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no));
threadpools.add(new ThreadPoolStats.Stats("get", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no));
threadpools.add(new ThreadPoolStats.Stats("management", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no));
threadpools.add(new ThreadPoolStats.Stats("search", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no));
threadpools.add(new ThreadPoolStats.Stats("watcher", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no));
threadpools.add(new ThreadPoolStats.Stats("write", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no));
threadpools.add(new ThreadPoolStats.Stats("generic", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no, no));
threadpools.add(new ThreadPoolStats.Stats("get", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no, no));
threadpools.add(new ThreadPoolStats.Stats("management", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no, no));
threadpools.add(new ThreadPoolStats.Stats("search", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no, no));
threadpools.add(new ThreadPoolStats.Stats("watcher", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no, no));
threadpools.add(new ThreadPoolStats.Stats("write", (int) ++iota, (int) ++iota, (int) no, ++iota, (int) no, no, no));
final ThreadPoolStats threadPool = new ThreadPoolStats(threadpools);

final DiscoveryNode discoveryNode = DiscoveryNodeUtils.builder("_node_id")
Expand Down