Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 3 additions & 24 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
Expand Down Expand Up @@ -45,7 +46,7 @@
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
*/
public class ClusterInfo implements ChunkedToXContent, Writeable {
public class ClusterInfo implements ChunkedToXContent, Writeable, ExpectedShardSizeEstimator.ShardSizeProvider {

public static final ClusterInfo EMPTY = new ClusterInfo();

Expand Down Expand Up @@ -289,33 +290,11 @@ public Map<ShardId, Double> getShardWriteLoads() {
/**
* Returns the shard size for the given shardId or <code>null</code> if that metric is not available.
*/
@Override
public Long getShardSize(ShardId shardId, boolean primary) {
return shardSizes.get(shardIdentifierFromRouting(shardId, primary));
}

/**
* Returns the shard size for the given shard routing or <code>null</code> if that metric is not available.
*/
public Long getShardSize(ShardRouting shardRouting) {
return getShardSize(shardRouting.shardId(), shardRouting.primary());
}

/**
* Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
*/
public long getShardSize(ShardRouting shardRouting, long defaultValue) {
Long shardSize = getShardSize(shardRouting);
return shardSize == null ? defaultValue : shardSize;
}

/**
* Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
*/
public long getShardSize(ShardId shardId, boolean primary, long defaultValue) {
Long shardSize = getShardSize(shardId, primary);
return shardSize == null ? defaultValue : shardSize;
}

/**
* Returns the nodes absolute data-path the given shard is allocated on or <code>null</code> if the information is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void simulateShardStarted(ShardRouting shard) {
var size = getExpectedShardSize(
shard,
shard.getExpectedShardSize(),
getClusterInfo(),
(shardId, primary) -> shardSizes.get(shardIdentifierFromRouting(shardId, primary)),
allocation.snapshotShardSizeInfo(),
project,
allocation.routingTable(project.id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
Expand All @@ -22,6 +21,36 @@

public class ExpectedShardSizeEstimator {

public interface ShardSizeProvider {
/**
* Returns the shard size for the given shardId or <code>null</code> if that metric is not available.
*/
Long getShardSize(ShardId shardId, boolean primary);

/**
* Returns the shard size for the given shard routing or <code>null</code> if that metric is not available.
*/
default Long getShardSize(ShardRouting shardRouting) {
return getShardSize(shardRouting.shardId(), shardRouting.primary());
}

/**
* Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
*/
default long getShardSize(ShardRouting shardRouting, long defaultValue) {
final var shardSize = getShardSize(shardRouting);
return shardSize == null ? defaultValue : shardSize;
}

/**
* Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
*/
default long getShardSize(ShardId shardId, boolean primary, long defaultValue) {
final var shardSize = getShardSize(shardId, primary);
return shardSize == null ? defaultValue : shardSize;
}
}

public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard, RoutingAllocation allocation) {
return shouldReserveSpaceForInitializingShard(shard, allocation.metadata());
}
Expand Down Expand Up @@ -69,7 +98,7 @@ public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard,
public static long getExpectedShardSize(
ShardRouting shard,
long defaultValue,
ClusterInfo clusterInfo,
ShardSizeProvider shardSizeProvider,
SnapshotShardSizeInfo snapshotShardSizeInfo,
ProjectMetadata projectMetadata,
RoutingTable routingTable
Expand All @@ -79,15 +108,15 @@ public static long getExpectedShardSize(
&& shard.active() == false
&& shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert shard.primary() : "All replica shards are recovering from " + RecoverySource.Type.PEER;
return getExpectedSizeOfResizedShard(shard, defaultValue, indexMetadata, clusterInfo, projectMetadata, routingTable);
return getExpectedSizeOfResizedShard(shard, defaultValue, indexMetadata, shardSizeProvider, projectMetadata, routingTable);
} else if (shard.active() == false && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
assert shard.primary() : "All replica shards are recovering from " + RecoverySource.Type.PEER;
return snapshotShardSizeInfo.getShardSize(shard, defaultValue);
} else {
var shardSize = clusterInfo.getShardSize(shard.shardId(), shard.primary());
var shardSize = shardSizeProvider.getShardSize(shard.shardId(), shard.primary());
if (shardSize == null && shard.primary() == false) {
// derive replica size from corresponding primary
shardSize = clusterInfo.getShardSize(shard.shardId(), true);
shardSize = shardSizeProvider.getShardSize(shard.shardId(), true);
}
return shardSize == null ? defaultValue : shardSize;
}
Expand All @@ -97,7 +126,7 @@ private static long getExpectedSizeOfResizedShard(
ShardRouting shard,
long defaultValue,
IndexMetadata indexMetadata,
ClusterInfo clusterInfo,
ShardSizeProvider shardSizeProvider,
ProjectMetadata projectMetadata,
RoutingTable routingTable
) {
Expand All @@ -120,7 +149,7 @@ private static long getExpectedSizeOfResizedShard(
for (int i = 0; i < indexRoutingTable.size(); i++) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(i);
if (shardIds.contains(shardRoutingTable.shardId())) {
targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0);
targetShardSize += shardSizeProvider.getShardSize(shardRoutingTable.primaryShard(), 0);
}
}
}
Expand Down