
Commit 65c6ea6

[8.18] Fix shard size of initializing restored shard (#126783) (#127169)
* Fix shard size of initializing restored shard (#126783)

  For shards being restored from a snapshot we use `SnapshotShardSizeInfo` to track their sizes while they're unassigned, and then use `ShardRouting#expectedShardSize` when they start to recover. However we were incorrectly ignoring the `ShardRouting#expectedShardSize` value when accounting for the movements of shards in the `ClusterInfoSimulator`, which would sometimes cause us to assign more shards to a node than its disk space should have allowed.

  Closes #105331

* Backport utils from 4009599

* Missing throws
1 parent cbf36bf commit 65c6ea6
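
To illustrate the size-resolution order the commit message describes, here is a minimal, self-contained Java sketch. It is not the Elasticsearch implementation: `SnapshotSizes`, `Routing` and `expectedSize` are hypothetical stand-ins for `SnapshotShardSizeInfo`, `ShardRouting` and the simulator's size lookup; only the `UNAVAILABLE_EXPECTED_SHARD_SIZE` sentinel and the fallback to `ShardRouting#getExpectedShardSize` are taken from the change itself.

import java.util.OptionalLong;

// Minimal sketch of the size-resolution order described above; the types and helper
// names are simplified stand-ins, not the real Elasticsearch APIs.
final class ExpectedShardSizeSketch {

    static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1L;

    // Stand-in for SnapshotShardSizeInfo: knows the sizes of shards still tracked as restoring.
    interface SnapshotSizes {
        OptionalLong sizeOf(String shardId);
    }

    // Stand-in for ShardRouting: carries the expected size once recovery has started.
    record Routing(String shardId, long expectedShardSize) {}

    // Before the fix the simulator effectively used UNAVAILABLE_EXPECTED_SHARD_SIZE as the
    // fallback, so a restored shard whose size was no longer in the snapshot size map was
    // treated as having no known size. After the fix the routing entry's own expected size
    // is used as the fallback instead.
    static long expectedSize(Routing shard, SnapshotSizes snapshotSizes, boolean afterFix) {
        long fallback = afterFix ? shard.expectedShardSize() : UNAVAILABLE_EXPECTED_SHARD_SIZE;
        return snapshotSizes.sizeOf(shard.shardId()).orElse(fallback);
    }

    public static void main(String[] args) {
        SnapshotSizes noLongerTracked = shardId -> OptionalLong.empty();
        Routing initializing = new Routing("[index][0]", 500L * 1024 * 1024);

        System.out.println(expectedSize(initializing, noLongerTracked, false)); // -1: size unknown
        System.out.println(expectedSize(initializing, noLongerTracked, true));  // 524288000: counted
    }
}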

File tree: 4 files changed (+123 lines, -32 lines)


docs/changelog/126783.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 126783
+summary: Fix shard size of initializing restored shard
+area: Allocation
+type: bug
+issues:
+ - 105331

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java

Lines changed: 76 additions & 31 deletions
@@ -13,6 +13,7 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -34,15 +35,16 @@
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;
 
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -54,8 +56,10 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
-    @TestIssueLogging(
-        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
-    )
-    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
+    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() throws Exception {
         internalCluster().startMasterOnlyNode();
-        internalCluster().startDataOnlyNode();
         final String dataNodeName = internalCluster().startDataOnlyNode();
+        internalCluster().startDataOnlyNode();
         ensureStableCluster(3);
 
         assertAcked(
@@ -185,26 +179,16 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
             .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );
 
-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
         final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
-
-        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));
 
         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
+        final var shardSizes = createReasonableSizedShards(indexName);
 
         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+            .setIndices(indexName)
            .setWaitForCompletion(true)
            .get();
         final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
@@ -213,21 +197,82 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
 
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
         updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);
 
-        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
-        var usableSpace = shardSizes.sizes().get(1).size();
+        // Verify that from this point on we do not do any rebalancing
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            assertThat(
+                "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
+                numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
+                equalTo(0)
+            );
+        });
+
+        // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
+        // other data node
+        final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
         getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
         refreshDiskUsage();
 
+        // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
+        // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
+        // trigger the whole chain by starting the first restore.
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kick off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
            .setWaitForCompletion(true)
            .get();
         final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));
 
-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }
 
     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ public void simulateShardStarted(ShardRouting shard) {
 
         var size = getExpectedShardSize(
             shard,
-            UNAVAILABLE_EXPECTED_SHARD_SIZE,
+            shard.getExpectedShardSize(),
             getClusterInfo(),
             allocation.snapshotShardSizeInfo(),
             allocation.metadata(),
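
The reason this one-line change matters is the disk accounting that follows: when the simulator marks a shard as started on a node, the shard's expected size feeds into how much of that node's disk is considered used, so a size that resolves to the "unavailable" sentinel leaves the node looking emptier than it really is. The following rough sketch illustrates that effect only; the names and structure are made up and much simpler than the real `ClusterInfoSimulator`.

import java.util.HashMap;
import java.util.Map;

// Rough, illustrative sketch of why an unresolved expected shard size can overcommit a node's
// disk; not the real ClusterInfoSimulator, just the accounting idea.
final class DiskAccountingSketch {

    private final Map<String, Long> freeBytesByNode = new HashMap<>();

    DiskAccountingSketch(Map<String, Long> initialFreeBytes) {
        freeBytesByNode.putAll(initialFreeBytes);
    }

    // Simulate starting a shard: deduct its expected size from the target node's free space.
    void simulateShardStarted(String nodeId, long expectedShardSize) {
        // If the size is "unavailable" (negative sentinel) nothing is deducted, so the node
        // still looks as empty as before and further shards may be assigned to it.
        long charge = Math.max(expectedShardSize, 0L);
        freeBytesByNode.merge(nodeId, -charge, Long::sum);
    }

    boolean hasRoomFor(String nodeId, long shardSize) {
        return freeBytesByNode.getOrDefault(nodeId, 0L) >= shardSize;
    }

    public static void main(String[] args) {
        var simulator = new DiskAccountingSketch(Map.of("tiny-node", 600L));

        simulator.simulateShardStarted("tiny-node", -1L);            // size unknown: nothing deducted
        System.out.println(simulator.hasRoomFor("tiny-node", 500L)); // true, even though a shard is already there

        simulator.simulateShardStarted("tiny-node", 500L);           // size known: properly deducted
        System.out.println(simulator.hasRoomFor("tiny-node", 500L)); // false
    }
}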

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 40 additions & 0 deletions
@@ -263,6 +263,15 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {
         );
     }
 
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
+     * that satisfies {@code predicate}, at which point it unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
+     *         already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
+     *         completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
+     */
     public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
         final var listener = new SubscribableListener<Void>();
         final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@@ -291,4 +300,35 @@ public String toString() {
         }
         return listener;
     }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
+     * {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
+     *         state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
+     *         within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
+     *         belongs to the chosen node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
+    }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
+     * {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to the node that was the elected master node in the
+     *         {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
+     *         {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
+     *         {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
+     *         the elected master node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
+    }
 }
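
For reference, a short usage sketch of the new helper, modelled on the pattern used in the DiskThresholdDeciderIT change above. It assumes it lives inside an ESIntegTestCase subclass (so `safeAwait` is available) and the index name "my-index" is made up; only `addTemporaryStateListener`, `routingTable().index(...)`, `allShardsActive()` and `safeAwait` come from the code in this commit.

    // Usage sketch (hypothetical test method inside an ESIntegTestCase subclass): wait for every
    // shard of "my-index" to become active, without polling, then carry on with assertions.
    public void testWaitForAllShardsActive() {
        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
            final var indexRoutingTable = cs.routingTable().index("my-index");
            // complete the listener (and unsubscribe it) once the index exists and all its shards are active
            return indexRoutingTable != null && indexRoutingTable.allShardsActive();
        });

        // ... trigger the cluster change under test here, e.g. a restore or a reroute ...

        // bounded by SAFE_AWAIT_TIMEOUT; fails the test if no matching cluster state is applied in time
        safeAwait(allShardsActiveListener);
    }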
