@@ -19,9 +19,9 @@
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EstimatedHeapUsage;
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.ShardHeapUsage;
import org.elasticsearch.cluster.ShardHeapUsageCollector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -123,7 +123,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(InternalSettingsPlugin.class, BogusShardHeapUsagePlugin.class);
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class);
}

public void testLockTryingToDelete() throws Exception {
@@ -264,31 +264,31 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException {
public void testHeapUsageEstimateIsPresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
assertNotNull(shardHeapUsages);
Map<String, EstimatedHeapUsage> estimatedHeapUsages = clusterInfoService.getClusterInfo().getEstimatedHeapUsages();
assertNotNull(estimatedHeapUsages);
// Not collecting yet because it is disabled
assertTrue(shardHeapUsages.isEmpty());
assertTrue(estimatedHeapUsages.isEmpty());

// Enable collection for shard heap usages
// Enable collection for estimated heap usages
updateClusterSettings(
Settings.builder()
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.build()
);
try {
ClusterInfoServiceUtils.refresh(clusterInfoService);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
assertEquals(state.nodes().size(), shardHeapUsages.size());
estimatedHeapUsages = clusterInfoService.getClusterInfo().getEstimatedHeapUsages();
assertEquals(state.nodes().size(), estimatedHeapUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(shardHeapUsages.containsKey(node.getId()));
ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
assertTrue(estimatedHeapUsages.containsKey(node.getId()));
EstimatedHeapUsage estimatedHeapUsage = estimatedHeapUsages.get(node.getId());
assertThat(estimatedHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(estimatedHeapUsage.totalBytes()));
}
} finally {
updateClusterSettings(
Settings.builder()
.putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey())
.putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey())
.build()
);
}
@@ -838,11 +838,11 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices
}
}

public static class BogusShardShardHeapUsageCollector implements ShardHeapUsageCollector {
public static class BogusEstimatedEstimatedHeapUsageCollector implements EstimatedHeapUsageCollector {

private final BogusShardHeapUsagePlugin plugin;
private final BogusEstimatedHeapUsagePlugin plugin;

public BogusShardShardHeapUsageCollector(BogusShardHeapUsagePlugin plugin) {
public BogusEstimatedEstimatedHeapUsageCollector(BogusEstimatedHeapUsagePlugin plugin) {
this.plugin = plugin;
}

@@ -859,7 +859,7 @@ public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener)
}
}

public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin {
public static class BogusEstimatedHeapUsagePlugin extends Plugin implements ClusterPlugin {

private final SetOnce<ClusterService> clusterService = new SetOnce<>();

@@ -7,4 +7,4 @@
# License v3.0 only", or the "Server Side Public License, v 1".
#

org.elasticsearch.index.shard.IndexShardIT$BogusShardShardHeapUsageCollector
org.elasticsearch.index.shard.IndexShardIT$BogusEstimatedEstimatedHeapUsageCollector
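The renamed line above is the ServiceLoader-style provider registration for the test collector: the file's single entry has to be the binary name of the implementing class, which is why the nested class is written with the '$' separator. As a rough sketch of how such a provider can be resolved (assuming plain ServiceLoader semantics; Elasticsearch's plugin infrastructure may wrap this differently, and the class below is invented for illustration):

import java.util.ServiceLoader;

import org.elasticsearch.cluster.EstimatedHeapUsageCollector;

// Sketch only: resolve a registered EstimatedHeapUsageCollector provider and fall back
// to the built-in EMPTY collector when no plugin registers one.
class CollectorLookupSketch {

    static EstimatedHeapUsageCollector resolve(ClassLoader loader) {
        return ServiceLoader.load(EstimatedHeapUsageCollector.class, loader)
            .findFirst()
            .orElse(EstimatedHeapUsageCollector.EMPTY);
    }
}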
20 changes: 10 additions & 10 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
@@ -57,7 +57,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<ShardId, Long> shardDataSetSizes;
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, ShardHeapUsage> shardHeapUsages;
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
@@ -72,7 +72,7 @@ protected ClusterInfo() {
* @param shardDataSetSizes a shard id to data set size in bytes mapping per shard
* @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param shardHeapUsages shard heap usage broken down by node
* @param estimatedHeapUsages estimated heap usage broken down by node
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
@@ -82,15 +82,15 @@ public ClusterInfo(
Map<ShardId, Long> shardDataSetSizes,
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, ShardHeapUsage> shardHeapUsages
Map<String, EstimatedHeapUsage> estimatedHeapUsages
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
this.shardSizes = Map.copyOf(shardSizes);
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.shardHeapUsages = Map.copyOf(shardHeapUsages);
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
}

public ClusterInfo(StreamInput in) throws IOException {
@@ -103,9 +103,9 @@ public ClusterInfo(StreamInput in) throws IOException {
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.shardHeapUsages = in.readImmutableMap(ShardHeapUsage::new);
this.estimatedHeapUsages = in.readImmutableMap(EstimatedHeapUsage::new);
} else {
this.shardHeapUsages = Map.of();
this.estimatedHeapUsages = Map.of();
}
}

@@ -122,7 +122,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.shardHeapUsages, StreamOutput::writeWriteable);
out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
}
}

@@ -204,7 +204,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.endObject(); // NodeAndPath
}),
endArray() // end "reserved_sizes"
// NOTE: We don't serialize shardHeapUsages at this stage, to avoid
// NOTE: We don't serialize estimatedHeapUsages at this stage, to avoid
// committing to API payloads until the feature is settled
);
}
@@ -216,8 +216,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
* Also note that the map may not be complete, it may contain none, or a subset of the nodes in
* the cluster at any time. It may also contain entries for nodes that have since left the cluster.
*/
public Map<String, ShardHeapUsage> getShardHeapUsages() {
return shardHeapUsages;
public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
return estimatedHeapUsages;
}

/**
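Because the javadoc above warns that the returned map can be missing nodes (or still contain nodes that have left the cluster), callers have to treat each lookup as optional. A minimal consumer sketch, using only the accessors this diff exercises (getEstimatedHeapUsages, totalBytes, estimatedUsageBytes); the helper class and method names are invented for illustration:

import java.util.Map;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.EstimatedHeapUsage;

// Sketch only: read a node's estimated heap usage from ClusterInfo, tolerating absent entries.
class EstimatedHeapReaderSketch {

    /** Estimated used heap as a fraction of the node's max heap, or -1 when no estimate was collected. */
    static double usedHeapFraction(ClusterInfo clusterInfo, String nodeId) {
        Map<String, EstimatedHeapUsage> usages = clusterInfo.getEstimatedHeapUsages();
        EstimatedHeapUsage usage = usages.get(nodeId);
        if (usage == null || usage.totalBytes() == 0) {
            return -1.0d; // collection disabled, node just joined, or stats not fetched yet
        }
        return (double) usage.estimatedUsageBytes() / usage.totalBytes();
    }
}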
@@ -33,7 +33,7 @@ public class ClusterInfoSimulator {
private final CopyOnFirstWriteMap<String, Long> shardSizes;
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<NodeAndShard, String> dataPath;
private final Map<String, ShardHeapUsage> shardHeapUsages;
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;

public ClusterInfoSimulator(RoutingAllocation allocation) {
this.allocation = allocation;
@@ -42,7 +42,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes);
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
this.shardHeapUsages = allocation.clusterInfo().getShardHeapUsages();
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
}

/**
@@ -156,7 +156,7 @@ public ClusterInfo getClusterInfo() {
shardDataSetSizes,
dataPath,
Map.of(),
shardHeapUsages
estimatedHeapUsages
);
}
}
@@ -18,14 +18,14 @@
/**
* Record representing an estimate of the heap used by allocated shards and ongoing merges on a particular node
*/
public record ShardHeapUsage(String nodeId, long totalBytes, long estimatedUsageBytes) implements Writeable {
public record EstimatedHeapUsage(String nodeId, long totalBytes, long estimatedUsageBytes) implements Writeable {

public ShardHeapUsage {
public EstimatedHeapUsage {
assert totalBytes >= 0;
assert estimatedUsageBytes >= 0;
}

public ShardHeapUsage(StreamInput in) throws IOException {
public EstimatedHeapUsage(StreamInput in) throws IOException {
this(in.readString(), in.readVLong(), in.readVLong());
}

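Since the record implements Writeable and has the StreamInput constructor shown above, it can be round-tripped through the transport stream API. A small usage sketch; it assumes the record's writeTo method (cut off by this hunk) writes the same three fields in the same order, and the class name below is invented:

import org.elasticsearch.cluster.EstimatedHeapUsage;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

// Sketch only: construct an estimate and round-trip it through the stream API.
class EstimatedHeapUsageRoundTripSketch {

    static EstimatedHeapUsage roundTrip(EstimatedHeapUsage original) throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out);
            return new EstimatedHeapUsage(out.bytes().streamInput());
        }
    }

    public static void main(String[] args) throws Exception {
        // 4 GiB max heap, roughly 1 GiB estimated in use by shards and ongoing merges
        EstimatedHeapUsage usage = new EstimatedHeapUsage("node-1", 4L << 30, 1L << 30);
        EstimatedHeapUsage copy = roundTrip(usage);
        assert copy.equals(usage); // records provide field-wise equals
    }
}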
@@ -14,21 +14,21 @@
import java.util.Map;

/**
* Collect the shard heap usage for each node in the cluster.
* Collect the estimated heap usage for each node in the cluster.
* <p>
* Results are returned as a map of node ID to estimated heap usage in bytes
*
* @see ShardHeapUsage
* @see EstimatedHeapUsage
*/
public interface ShardHeapUsageCollector {
public interface EstimatedHeapUsageCollector {

/**
* This will be used when there is no ShardHeapUsageCollector available
* This will be used when there is no EstimatedHeapUsageCollector available
*/
ShardHeapUsageCollector EMPTY = listener -> listener.onResponse(Map.of());
EstimatedHeapUsageCollector EMPTY = listener -> listener.onResponse(Map.of());

/**
* Collect the shard heap usage for every node in the cluster
* Collect the estimated heap usage for every node in the cluster
*
* @param listener The listener which will receive the results
*/
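Tying the interface back to the test plugin earlier in this diff: an implementation only needs the single collectClusterHeapUsage method, and it reports results by completing the listener with a node-ID-to-bytes map (or failing it, in which case the caller falls back to an empty map). A bare-bones sketch with a fixed, invented value, purely for illustration; the Bogus* test collector above and the InternalClusterInfoService changes below show the real wiring:

import java.util.Map;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;

// Sketch only: a collector that reports a fixed estimate for a single hypothetical node.
class FixedEstimatedHeapUsageCollectorSketch implements EstimatedHeapUsageCollector {

    @Override
    public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) {
        // A real implementation would fan out to the data nodes and aggregate their estimates.
        listener.onResponse(Map.of("node-1", 512L * 1024 * 1024));
    }
}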
@@ -83,22 +83,22 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
Property.NodeScope
);

public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED = Setting.boolSetting(
"cluster.routing.allocation.shard_heap.threshold_enabled",
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED = Setting.boolSetting(
"cluster.routing.allocation.estimated_heap.threshold_enabled",
false,
Property.Dynamic,
Property.NodeScope
);

private volatile boolean diskThresholdEnabled;
private volatile boolean shardHeapThresholdEnabled;
private volatile boolean estimatedHeapThresholdEnabled;
private volatile TimeValue updateFrequency;
private volatile TimeValue fetchTimeout;

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, ByteSizeValue> maxHeapPerNode;
private volatile Map<String, Long> shardHeapUsagePerNode;
private volatile Map<String, Long> estimatedHeapUsagePerNode;
private volatile IndicesStatsSummary indicesStatsSummary;

private final ThreadPool threadPool;
Expand All @@ -107,7 +107,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private final Object mutex = new Object();
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();
private final ShardHeapUsageCollector shardHeapUsageCollector;
private final EstimatedHeapUsageCollector estimatedHeapUsageCollector;

private AsyncRefresh currentRefresh;
private RefreshScheduler refreshScheduler;
Expand All @@ -118,16 +118,16 @@ public InternalClusterInfoService(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
ShardHeapUsageCollector shardHeapUsageCollector
EstimatedHeapUsageCollector estimatedHeapUsageCollector
) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.maxHeapPerNode = Map.of();
this.shardHeapUsagePerNode = Map.of();
this.estimatedHeapUsagePerNode = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
this.shardHeapUsageCollector = shardHeapUsageCollector;
this.estimatedHeapUsageCollector = estimatedHeapUsageCollector;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
Expand All @@ -139,17 +139,17 @@ public InternalClusterInfoService(
this::setDiskThresholdEnabled
);
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED,
this::setShardHeapThresholdEnabled
CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED,
this::setEstimatedHeapThresholdEnabled
);
}

private void setDiskThresholdEnabled(boolean diskThresholdEnabled) {
this.diskThresholdEnabled = diskThresholdEnabled;
}

private void setShardHeapThresholdEnabled(boolean shardHeapThresholdEnabled) {
this.shardHeapThresholdEnabled = shardHeapThresholdEnabled;
private void setEstimatedHeapThresholdEnabled(boolean estimatedHeapThresholdEnabled) {
this.estimatedHeapThresholdEnabled = estimatedHeapThresholdEnabled;
}

private void setFetchTimeout(TimeValue fetchTimeout) {
@@ -202,8 +202,8 @@ void execute() {

try (var ignoredRefs = fetchRefs) {
maybeFetchIndicesStats(diskThresholdEnabled);
maybeFetchNodeStats(diskThresholdEnabled || shardHeapThresholdEnabled);
maybeFetchNodesHeapUsage(shardHeapThresholdEnabled);
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
}
}

@@ -231,28 +231,28 @@ private void maybeFetchNodeStats(boolean shouldFetch) {
}
}

private void maybeFetchNodesHeapUsage(boolean shouldFetch) {
private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) {
if (shouldFetch) {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodesHeapUsage();
fetchNodesEstimatedHeapUsage();
}
} else {
logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage");
shardHeapUsagePerNode = Map.of();
logger.trace("skipping collecting estimated heap usage from cluster, notifying listeners with empty estimated heap usage");
estimatedHeapUsagePerNode = Map.of();
}
}

private void fetchNodesHeapUsage() {
shardHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
private void fetchNodesEstimatedHeapUsage() {
estimatedHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(Map<String, Long> currentShardHeapUsages) {
shardHeapUsagePerNode = currentShardHeapUsages;
public void onResponse(Map<String, Long> currentEstimatedHeapUsages) {
estimatedHeapUsagePerNode = currentEstimatedHeapUsages;
}

@Override
public void onFailure(Exception e) {
logger.warn("failed to fetch heap usage for nodes", e);
shardHeapUsagePerNode = Map.of();
estimatedHeapUsagePerNode = Map.of();
}
}, fetchRefs.acquire()));
}
@@ -479,11 +479,11 @@ private boolean shouldRefresh() {
@Override
public ClusterInfo getClusterInfo() {
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
final Map<String, ShardHeapUsage> shardHeapUsages = new HashMap<>();
final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
maxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
final Long estimatedHeapUsage = shardHeapUsagePerNode.get(nodeId);
final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
if (estimatedHeapUsage != null) {
shardHeapUsages.put(nodeId, new ShardHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
}
});
return new ClusterInfo(
Expand All @@ -493,7 +493,7 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardDataSetSizes,
indicesStatsSummary.dataPath,
indicesStatsSummary.reservedSpace,
shardHeapUsages
estimatedHeapUsages
);
}
