@@ -118,7 +118,8 @@ public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() {
updateIndexSettings(indexSettings);
}

private void updateClusterSettings(Settings settings) {
@Override
protected void updateClusterSettings(Settings settings) {
clusterAdmin().updateSettings(
new ClusterUpdateSettingsRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).persistentSettings(settings)
).actionGet();
@@ -264,14 +264,33 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException {
public void testHeapUsageEstimateIsPresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
assertNotNull(shardHeapUsages);
assertEquals(state.nodes().size(), shardHeapUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(shardHeapUsages.containsKey(node.getId()));
ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
// Not collecting yet because it is disabled
assertTrue(shardHeapUsages.isEmpty());

// Enable collection for shard heap usages
updateClusterSettings(
Settings.builder()
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.build()
);
try {
ClusterInfoServiceUtils.refresh(clusterInfoService);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
assertEquals(state.nodes().size(), shardHeapUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(shardHeapUsages.containsKey(node.getId()));
ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
}
} finally {
updateClusterSettings(
Settings.builder()
.putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey())
.build()
);
}
}

@@ -33,6 +33,7 @@ public class ClusterInfoSimulator {
private final CopyOnFirstWriteMap<String, Long> shardSizes;
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<NodeAndShard, String> dataPath;
private final Map<String, ShardHeapUsage> shardHeapUsages;

public ClusterInfoSimulator(RoutingAllocation allocation) {
this.allocation = allocation;
@@ -41,6 +42,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes);
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
this.shardHeapUsages = allocation.clusterInfo().getShardHeapUsages();
}

/**
@@ -154,7 +156,7 @@ public ClusterInfo getClusterInfo() {
shardDataSetSizes,
dataPath,
Map.of(),
Map.of()
shardHeapUsages
);
}
}
@@ -83,7 +83,15 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
Property.NodeScope
);

public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED = Setting.boolSetting(
"cluster.routing.allocation.shard_heap.threshold_enabled",
false,
Property.Dynamic,
Property.NodeScope
);

private volatile boolean diskThresholdEnabled;
private volatile boolean shardHeapThresholdEnabled;
private volatile TimeValue updateFrequency;
private volatile TimeValue fetchTimeout;

@@ -130,12 +138,20 @@ public InternalClusterInfoService(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
this::setDiskThresholdEnabled
);
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED,
this::setShardHeapThresholdEnabled
);
}

private void setDiskThresholdEnabled(boolean diskThresholdEnabled) {
this.diskThresholdEnabled = diskThresholdEnabled;
}

private void setShardHeapThresholdEnabled(boolean shardHeapThresholdEnabled) {
this.shardHeapThresholdEnabled = shardHeapThresholdEnabled;
}

private void setFetchTimeout(TimeValue fetchTimeout) {
this.fetchTimeout = fetchTimeout;
}
@@ -193,11 +209,25 @@ void execute() {
logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info");
indicesStatsSummary = IndicesStatsSummary.EMPTY;
}
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodeStats();

if (diskThresholdEnabled || shardHeapThresholdEnabled) {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodeStats();
}
} else {
logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats");
leastAvailableSpaceUsages = Map.of();
mostAvailableSpaceUsages = Map.of();
maxHeapPerNode = Map.of();
}
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodesHeapUsage();

if (shardHeapThresholdEnabled) {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodesHeapUsage();
}
} else {
logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage");
shardHeapUsagePerNode = Map.of();
Contributor:

This complexity looks a bit nasty, but I can't think of a way to make it nicer.

I wonder if it would look better if we made the decisions up front e.g.

var fetchIndicesStats = diskThresholdEnabled;
var fetchNodeStats = diskThresholdEnabled || shardThresholdEnabled;
var fetchShardHeapUsage = shardThresholdEnabled;

probably wouldn't make much difference.

What about if we had something like...

enum Requirement {
   NodeStats,
   IndicesStats,
   ShardHeapUsage
}

record Feature(Set<Requirement> requirements) {
}

Feature DISK_THRESHOLD = new Feature(Set.of(NodeStats, IndicesStats));
Feature SHARD_HEAP = new Feature(Set.of(NodeStats, ShardHeapUsage));

// then

    // ...
    Set<Requirement> requirements = enabledFeatures.stream().flatMap(feat -> feat.requirements().stream()).collect(Collectors.toSet());
    // ...
    if (requirements.contains(NodeStats)) {
       try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
             fetchNodeStats();
       }
    }

You could argue it's overkill now, but if we keep doing this stuff it'll make things easier to read.

Member Author:

I am not sure about the 2nd suggestion. At this point it seems to add complexity, e.g. it still needs the three if/else statements at the end. Maybe it could be hidden better if Feature somehow handled the fetches internally, but that seems a bit too much for what we need here. So I pushed d3dd95a to extract each if/else branch into its own method. This is somewhat aligned with your first suggestion. Though it does not reduce the complexity, I feel the readability is better since it is clearer when a particular stat needs to be fetched. Please let me know if that works for you. Thanks!
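For reference, a minimal sketch of what that extraction could look like (the maybeFetch* method names are hypothetical and fetchIndicesStats() stands in for the existing indices-stats fetch; the actual change in d3dd95a may differ):

    // Each branch of the original if/else moves into its own method;
    // the caller passes in the already-computed decision.
    private void maybeFetchIndicesStats(boolean shouldFetch) {
        if (shouldFetch) {
            try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
                fetchIndicesStats();
            }
        } else {
            logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info");
            indicesStatsSummary = IndicesStatsSummary.EMPTY;
        }
    }

    private void maybeFetchNodeStats(boolean shouldFetch) {
        if (shouldFetch) {
            try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
                fetchNodeStats();
            }
        } else {
            logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats");
            leastAvailableSpaceUsages = Map.of();
            mostAvailableSpaceUsages = Map.of();
            maxHeapPerNode = Map.of();
        }
    }

    private void maybeFetchNodesHeapUsage(boolean shouldFetch) {
        if (shouldFetch) {
            try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
                fetchNodesHeapUsage();
            }
        } else {
            logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage");
            shardHeapUsagePerNode = Map.of();
        }
    }

    // execute() then reduces to the up-front decisions:
    maybeFetchIndicesStats(diskThresholdEnabled);
    maybeFetchNodeStats(diskThresholdEnabled || shardHeapThresholdEnabled);
    maybeFetchNodesHeapUsage(shardHeapThresholdEnabled);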

Contributor:

Yeah that improves the readability, thanks.

}
}
}
@@ -45,6 +45,10 @@ public double estimatedFreeBytesAsPercentage() {
}

public double estimatedUsageAsPercentage() {
return 100.0 * estimatedUsageBytes / (double) totalBytes;
return 100.0 * estimatedUsageAsRatio();
}

public double estimatedUsageAsRatio() {
return estimatedUsageBytes / (double) totalBytes;
}
}
@@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
@@ -53,7 +53,9 @@ public void testScheduling() {
final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build();
final DiscoveryNodes localMaster = noMaster.withMasterNodeId(discoveryNode.getId());

final Settings.Builder settingsBuilder = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName());
final Settings.Builder settingsBuilder = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName())
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true);
if (randomBoolean()) {
settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms");
}
@@ -536,4 +536,8 @@ protected final void deletePipeline(String id) {
)
);
}

protected void updateClusterSettings(Settings settings) {
safeGet(clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).execute());
}
}