Implement WriteLoadConstraintMonitor #132917
Changes from 1 commit
File changed: `WriteLoadConstraintMonitor.java`
Hunk `@@ -15,26 +15,33 @@` — imports, the class javadoc, the new fields, and the start of the constructor, shown in their post-change state:

```java
// Imports above the hunk (not shown in the diff) that the code below relies on
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
 * {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
 */
public class WriteLoadConstraintMonitor {
    private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
    private final WriteLoadConstraintSettings writeLoadConstraintSettings;
    private final Supplier<ClusterState> clusterStateSupplier;
    private final LongSupplier currentTimeMillisSupplier;
    private final RerouteService rerouteService;
    private volatile long lastRerouteTimeMillis = 0;
    private volatile Set<String> lastSetOfHotSpottedNodes = Set.of();

    public WriteLoadConstraintMonitor(
        ClusterSettings clusterSettings,
        // … remaining constructor parameters and body unchanged, elided from the diff
```
Hunk `@@ -60,29 +67,76 @@` — the implemented `onNewInfo`, replacing the previous `TODO (ES-11992)` stub (the old `boolean reroute`/`String explanation` scaffolding and the copy-pasted "disk threshold monitor" reroute reason are removed):

```java
public void onNewInfo(ClusterInfo clusterInfo) {
    // … method prelude unchanged and elided from the diff: it obtains the current
    // ClusterState as `state` via clusterStateSupplier and returns early while the
    // cluster state is not usable yet (the hunk opens on that guard's closing return)

    if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) {
        logger.trace("skipping monitor because the write load decider is disabled");
        return;
    }

    logger.trace("processing new cluster info");

    final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
    final Set<String> nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
    final Set<String> nodeIdsBelowUtilizationThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
    clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
        final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
            .get(ThreadPool.Names.WRITE);
        if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
            nodeIdsExceedingLatencyThreshold.add(nodeId);
        }
        if (writeThreadPoolStats.averageThreadPoolUtilization() < writeLoadConstraintSettings.getHighUtilizationThreshold()
            .getAsPercent()) {
            nodeIdsBelowUtilizationThreshold.add(nodeId);
        }
    });

    if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
        logger.debug("No nodes exceeding latency threshold");
        return;
    }

    // Remove any over-threshold nodes that already have shards relocating away
    final RoutingNodes routingNodes = state.getRoutingNodes();
    nodeIdsExceedingLatencyThreshold.removeIf(
        nodeId -> routingNodes.node(nodeId).numberOfShardsWithState(ShardRoutingState.RELOCATING) > 0
    );

    if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
        logger.debug("All nodes over threshold have relocation in progress");
        return;
    }

    if (Sets.difference(nodeIdsBelowUtilizationThreshold, nodeIdsExceedingLatencyThreshold).isEmpty()) {
        logger.debug("No nodes below utilization threshold that are not exceeding latency threshold");
        return;
    }

    final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
    final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis;
    final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings.getMinimumRerouteInterval()
        .millis();

    if (haveCalledRerouteRecently == false
        || Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
        callReroute(nodeIdsExceedingLatencyThreshold);
    } else {
        logger.debug("Not calling reroute because we called reroute recently and there are no new hot spots");
    }
}
```
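The loop above classifies every node against two independent thresholds: nodes whose maximum write-queue latency exceeds the configured limit are hot spots, while nodes whose average write thread pool utilization sits below the high-utilization limit are candidates to receive relocated shards. A standalone sketch of that classification, using hypothetical node names, plain Java collections, and made-up threshold values rather than the Elasticsearch types above:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class HotSpotClassificationSketch {
    // Simplified stand-in for the per-node write thread pool usage stats
    record WriteStats(long maxQueueLatencyMillis, double averageUtilization) {}

    public static void main(String[] args) {
        final long queueLatencyThresholdMillis = 30_000; // assumed setting values
        final double highUtilizationThreshold = 0.90;

        final Map<String, WriteStats> statsByNode = Map.of(
            "node-1", new WriteStats(45_000, 0.95), // queue backed up: a hot spot
            "node-2", new WriteStats(100, 0.40),    // plenty of headroom: can receive shards
            "node-3", new WriteStats(200, 0.97)     // busy, but queue latency is fine
        );

        final Set<String> exceedingLatency = new HashSet<>();
        final Set<String> belowUtilization = new HashSet<>();
        statsByNode.forEach((nodeId, stats) -> {
            if (stats.maxQueueLatencyMillis() > queueLatencyThresholdMillis) {
                exceedingLatency.add(nodeId);
            }
            if (stats.averageUtilization() < highUtilizationThreshold) {
                belowUtilization.add(nodeId);
            }
        });

        System.out.println("hot spots: " + exceedingLatency);  // [node-1]
        System.out.println("cool nodes: " + belowUtilization); // [node-2]
    }
}
```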
The new `callReroute` helper records when it last ran and which nodes triggered it:

```java
private void callReroute(Set<String> hotSpottedNodes) {
    final String reason = Strings.format(
        "write load constraint monitor: Found %d node(s) exceeding the write thread pool queue latency threshold",
        hotSpottedNodes.size()
    );
    rerouteService.reroute(
        reason,
        Priority.NORMAL,
        ActionListener.wrap(
            ignored -> logger.trace("{} reroute successful", reason),
            e -> logger.debug(() -> Strings.format("reroute failed, reason: %s", reason), e)
        )
    );
    lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong();
    lastSetOfHotSpottedNodes = hotSpottedNodes;
}
```
Review comment: "This is a record and these looked like the default `equals`/`hashCode`/`toString`."
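For context on that review note: Java records generate `equals`, `hashCode`, and `toString` automatically from their components, so hand-written versions that merely mirror the defaults can be deleted. A quick demonstration with a hypothetical record (not a type from this PR):

```java
// Records get component-wise equals/hashCode and a canonical toString for free.
record Point(int x, int y) {}

class RecordDefaultsDemo {
    public static void main(String[] args) {
        Point a = new Point(1, 2);
        Point b = new Point(1, 2);
        System.out.println(a);                            // Point[x=1, y=2]
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```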