Skip to content

Commit 69e2512

Browse files
authored
Allow allocating clones over low watermark (#137399) (#137427)
A new shard which is the result of a clone operation starts out on the same node as the shard being cloned and takes no additional space due to hard-linking, so we can safely allocate it even if the node is over the low watermark, and skip the checks that consider the additional space taken by the shard. Closes ES-13384.
1 parent 8f091a5 commit 69e2512

File tree

4 files changed

+104
-14
lines changed

4 files changed

+104
-14
lines changed

docs/changelog/137399.yaml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 137399
2+
summary: Allow allocating clones over low watermark
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java

Lines changed: 63 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,12 +12,18 @@
1212
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
1313
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
1414
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
15+
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
16+
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
17+
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
1518
import org.elasticsearch.action.admin.indices.stats.ShardStats;
1619
import org.elasticsearch.action.support.ActionTestUtils;
20+
import org.elasticsearch.action.support.ActiveShardCount;
21+
import org.elasticsearch.action.support.SubscribableListener;
1722
import org.elasticsearch.cluster.ClusterInfoService;
1823
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
1924
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
2025
import org.elasticsearch.cluster.InternalClusterInfoService;
26+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2127
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2228
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2329
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -53,6 +59,7 @@
5359
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
5460
import static org.elasticsearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
5561
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
62+
import static org.hamcrest.Matchers.allOf;
5663
import static org.hamcrest.Matchers.contains;
5764
import static org.hamcrest.Matchers.empty;
5865
import static org.hamcrest.Matchers.equalTo;
@@ -103,6 +110,62 @@ public void testHighWatermarkNotExceeded() throws Exception {
103110
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
104111
}
105112

113+
public void testAllocateCloneIgnoresLowWatermark() throws Exception {
114+
final var lowWatermarkBytes = randomLongBetween(WATERMARK_BYTES + 1, WATERMARK_BYTES * 5);
115+
116+
internalCluster().startMasterOnlyNode(
117+
Settings.builder()
118+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), lowWatermarkBytes + "b")
119+
.build()
120+
);
121+
final var dataNodeName = internalCluster().startDataOnlyNode();
122+
ensureStableCluster(2);
123+
124+
final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
125+
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
126+
ClusterInfoServiceUtils.refresh(clusterInfoService);
127+
});
128+
129+
final var sourceIndexName = "source-" + randomIdentifier();
130+
createIndex(sourceIndexName, indexSettings(1, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
131+
final var shardSizes = createReasonableSizedShards(sourceIndexName);
132+
133+
updateIndexSettings(Settings.builder().put("blocks.write", true), sourceIndexName);
134+
135+
final var totalSpace = randomLongBetween(
136+
/* do not exceed the high watermark */
137+
shardSizes.getSmallestShardSize() + WATERMARK_BYTES + 1,
138+
/* but make it so that naively duplicating the shard would exceed the low watermark, or else it's not a meaningful test */
139+
2 * shardSizes.getSmallestShardSize() + lowWatermarkBytes
140+
);
141+
142+
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
143+
refreshDiskUsage();
144+
145+
final var targetIndexName = "target-" + randomIdentifier();
146+
final var resizeRequest = new ResizeRequest(targetIndexName, sourceIndexName);
147+
resizeRequest.setResizeType(ResizeType.CLONE);
148+
resizeRequest.masterNodeTimeout(TEST_REQUEST_TIMEOUT);
149+
resizeRequest.ackTimeout(TEST_REQUEST_TIMEOUT);
150+
resizeRequest.setWaitForActiveShards(ActiveShardCount.ALL);
151+
resizeRequest.getTargetIndexRequest()
152+
.settings(
153+
Settings.builder().put(resizeRequest.getTargetIndexRequest().settings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
154+
);
155+
156+
safeAwait(
157+
SubscribableListener.<CreateIndexResponse>newForked(l -> indicesAdmin().resizeIndex(resizeRequest, l))
158+
.andThenAccept(
159+
createIndexResponse -> assertThat(
160+
true,
161+
allOf(equalTo(createIndexResponse.isAcknowledged()), equalTo(createIndexResponse.isShardsAcknowledged()))
162+
)
163+
)
164+
);
165+
166+
ensureGreen(sourceIndexName, targetIndexName);
167+
}
168+
106169
public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
107170
internalCluster().startMasterOnlyNode();
108171
internalCluster().startDataOnlyNode();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 33 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -219,10 +219,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
219219
}
220220

221221
// flag that determines whether the low threshold checks below can be skipped. We use this for a primary shard that is freshly
222-
// allocated and empty.
223-
boolean skipLowThresholdChecks = shardRouting.primary()
222+
// allocated and either empty or the result of cloning another shard.
223+
final var isNewCloneTarget = isNewCloneTarget(shardRouting, allocation);
224+
final var skipLowThresholdChecks = shardRouting.primary()
224225
&& shardRouting.active() == false
225-
&& shardRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE;
226+
&& (isNewCloneTarget || shardRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE);
226227

227228
if (freeBytes < diskThresholdSettings.getFreeBytesThresholdLowStage(total).getBytes()) {
228229
if (skipLowThresholdChecks == false) {
@@ -283,6 +284,18 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
283284
}
284285

285286
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
287+
288+
if (isNewCloneTarget) {
289+
// The clone will be a hard-linked copy of the original shard so will not meaningfully increase disk usage
290+
return allocation.decision(
291+
Decision.YES,
292+
NAME,
293+
"enough disk for freshly-cloned shard on node, free: [%s], used: [%s]",
294+
freeBytesValue,
295+
Strings.format1Decimals(usedDiskPercentage, "%")
296+
);
297+
}
298+
286299
final long shardSize = getExpectedShardSize(shardRouting, 0L, allocation);
287300
assert shardSize >= 0 : shardSize;
288301
long freeBytesAfterShard = freeBytes - shardSize;
@@ -326,6 +339,23 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
326339
);
327340
}
328341

342+
private static boolean isNewCloneTarget(ShardRouting shardRouting, RoutingAllocation allocation) {
343+
if (shardRouting.unassigned() == false
344+
|| shardRouting.primary() == false
345+
|| shardRouting.recoverySource() != RecoverySource.LocalShardsRecoverySource.INSTANCE) {
346+
return false;
347+
}
348+
349+
final var targetMetadata = allocation.metadata().indexMetadata(shardRouting.index());
350+
final var sourceIndex = targetMetadata.getResizeSourceIndex();
351+
if (sourceIndex == null) {
352+
return false;
353+
}
354+
355+
final var sourceMetadata = allocation.metadata().indexMetadata(sourceIndex);
356+
return sourceMetadata != null && sourceMetadata.getNumberOfShards() == targetMetadata.getNumberOfShards();
357+
}
358+
329359
@Override
330360
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
331361
Map<String, DiskUsage> usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages();

x-pack/plugin/autoscaling/src/internalClusterTest/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageIT.java

Lines changed: 3 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.node.Node;
2727
import org.elasticsearch.test.ESIntegTestCase;
2828
import org.elasticsearch.test.NodeRoles;
29-
import org.elasticsearch.test.junit.annotations.TestLogging;
3029
import org.elasticsearch.transport.TransportService;
3130
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
3231
import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction;
@@ -400,13 +399,7 @@ public void testScaleWhileShrinking() throws Exception {
400399
);
401400
}
402401

403-
@TestLogging(
404-
reason = "Debugging https://github.com/elastic/elasticsearch/issues/96764",
405-
value = "org.elasticsearch.cluster.InternalClusterInfoService:TRACE"
406-
+ ",org.elasticsearch.xpack.autoscaling.action:TRACE"
407-
+ ",org.elasticsearch.cluster.routing.allocation:DEBUG"
408-
)
409-
public void testScaleDuringSplitOrClone() throws Exception {
402+
public void testScaleDuringSplit() throws Exception {
410403
internalCluster().startMasterOnlyNode();
411404
final String dataNode1Name = internalCluster().startDataOnlyNode();
412405

@@ -460,9 +453,8 @@ public void testScaleDuringSplitOrClone() throws Exception {
460453
);
461454

462455
updateIndexSettings(Settings.builder().put("index.blocks.write", true), indexName);
463-
ResizeType resizeType = randomFrom(ResizeType.CLONE, ResizeType.SPLIT);
464456
String cloneName = "clone-" + indexName;
465-
int resizedShardCount = resizeType == ResizeType.CLONE ? 1 : between(2, 10);
457+
int resizedShardCount = between(2, 10);
466458
assertAcked(
467459
indicesAdmin().prepareResizeIndex(indexName, cloneName)
468460
.setSettings(
@@ -472,7 +464,7 @@ public void testScaleDuringSplitOrClone() throws Exception {
472464
.build()
473465
)
474466
.setWaitForActiveShards(ActiveShardCount.NONE)
475-
.setResizeType(resizeType)
467+
.setResizeType(ResizeType.SPLIT)
476468
);
477469

478470
// * 2 since worst case is no hard links, see DiskThresholdDecider.getExpectedShardSize.

0 commit comments

Comments (0)