Merged
6 changes: 6 additions & 0 deletions docs/changelog/126783.yaml
@@ -0,0 +1,6 @@
pr: 126783
summary: Fix shard size of initializing restored shard
area: Allocation
type: bug
issues:
- 105331
@@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -34,15 +35,16 @@
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.hamcrest.Matcher;

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -54,8 +56,10 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
@TestIssueLogging(
value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
+ "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
)
public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
ensureStableCluster(3);

assertAcked(
@@ -185,26 +179,16 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards
.setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
);

final AtomicBoolean allowRelocations = new AtomicBoolean(true);
final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
ClusterInfoServiceUtils.refresh(clusterInfoService);
if (allowRelocations.get() == false) {
assertThat(
"Expects no relocating shards but got: " + event.state().getRoutingNodes(),
numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
equalTo(0)
);
}
});

final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));

final String indexName = randomIdentifier();
createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
var shardSizes = createReasonableSizedShards(indexName);
final var shardSizes = createReasonableSizedShards(indexName);

final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
.setIndices(indexName)
.setWaitForCompletion(true)
.get();
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
@@ -213,21 +197,82 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards

assertAcked(indicesAdmin().prepareDelete(indexName).get());
updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
allowRelocations.set(false);

// reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
var usableSpace = shardSizes.sizes().get(1).size();
// Verify that from this point on we do not do any rebalancing
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
assertThat(
"Expects no relocating shards but got: " + event.state().getRoutingNodes(),
numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
equalTo(0)
);
});

// reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
// other data node
final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
refreshDiskUsage();

// We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
// chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
// trigger the whole chain by starting the first restore.
final var copyIndexName = indexName + "-copy";

// set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
var seenCopy = false;
for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
seenCopy = true;
}
if (indexRoutingTable.allShardsActive() == false) {
return false;
}
}
return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
});

// set up a listener which waits for the shards from the first restore to start initializing and then kick off another restore
final var secondRestoreCompleteLatch = new CountDownLatch(1);
final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
final var indexRoutingTable = cs.routingTable().index(indexName);
if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
.setWaitForCompletion(true)
.setRenamePattern(indexName)
.setRenameReplacement(indexName + "-copy")
.execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(restoreInfo.failedShards(), is(0));
secondRestoreCompleteLatch.countDown();
}));
return true;
}
return false;
});

// now set the ball rolling by doing the first restore, waiting for it to complete
final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
.setWaitForCompletion(true)
.get();
final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(restoreInfo.failedShards(), is(0));

assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
// wait for the second restore to complete too
safeAwait(secondRestoreStartedListener);
safeAwait(secondRestoreCompleteLatch);

// wait for all the shards to finish moving
safeAwait(allShardsActiveListener);
ensureGreen(indexName, indexName + "-copy");

final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
assertThat(tinyNodeShardIds, hasSize(1));
assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
}

private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
@@ -91,7 +91,7 @@ public void simulateShardStarted(ShardRouting shard) {

var size = getExpectedShardSize(
shard,
UNAVAILABLE_EXPECTED_SHARD_SIZE,
shard.getExpectedShardSize(),
getClusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
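
The hunk above carries the core of the fix described in the changelog: when the simulator marks a shard as started, the fallback size passed to getExpectedShardSize is now the expected size already recorded on the shard routing entry rather than the UNAVAILABLE_EXPECTED_SHARD_SIZE sentinel, so a restored shard that is still initializing keeps a realistic size estimate. A minimal, self-contained sketch of that fallback idea, using purely illustrative names rather than the actual Elasticsearch classes:

// Illustrative sketch only, not Elasticsearch source: prefer an observed size when one is
// known, otherwise fall back to the expected size carried by the shard entry instead of the
// "unavailable" sentinel.
class ShardSizeFallbackSketch {
    static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1L;

    // stand-in for the size information a shard routing entry carries
    record ShardEntry(String shardId, long expectedShardSize) {}

    static long resolveShardSize(ShardEntry shard, long observedSizeOrSentinel) {
        if (observedSizeOrSentinel != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
            return observedSizeOrSentinel; // a size reported by cluster info always wins
        }
        // the change illustrated above: fall back to the shard's own expected size rather
        // than the sentinel, so an initializing restored shard still reserves disk space
        return shard.expectedShardSize();
    }

    public static void main(String[] args) {
        ShardEntry restoring = new ShardEntry("[my-index][0]", 512L * 1024 * 1024);
        System.out.println(resolveShardSize(restoring, UNAVAILABLE_EXPECTED_SHARD_SIZE)); // prints 536870912
        System.out.println(resolveShardSize(restoring, 42L));                             // prints 42
    }
}
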
@@ -263,6 +263,15 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newState)
);
}

/**
* Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
* that satisfies {@code predicate}, at which point it unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
* already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
* completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
*/
public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
return addTemporaryStateListener(clusterService, predicate, ESTestCase.SAFE_AWAIT_TIMEOUT);
}
@@ -308,4 +317,35 @@ public String toString() {
}
return listener;
}

/**
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
* {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
* the listener unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
* state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
* within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
* belongs to the chosen node's {@link ClusterService}.
*/
public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
}

/**
* Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
* {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
* the listener unsubscribes itself.
*
* @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
* {@link ClusterService} belonging to the node that was the elected master node in the
* {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
* {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
* {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
* the elected master node's {@link ClusterService}.
*/
public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
}
}
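
As a usage illustration, the new parameter-free overloads pair naturally with safeAwait, much like the listeners in the integration test above. The following fragment is a hypothetical test, not part of this change; the class name and index name are invented, and it assumes it runs inside an ESIntegTestCase subclass:

// Hypothetical usage example (illustrative only): wait until the elected master has applied
// a cluster state in which every shard of the test index is active.
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;

public class TemporaryStateListenerUsageIT extends ESIntegTestCase {
    public void testWaitForAllShardsOfIndexActive() {
        createIndex("my-index");
        // the listener completes (and unsubscribes itself) the first time the elected master
        // applies a state matching the predicate, or fails after SAFE_AWAIT_TIMEOUT
        final var allActive = ClusterServiceUtils.addMasterTemporaryStateListener((ClusterState state) -> {
            final var indexRoutingTable = state.routingTable().index("my-index");
            return indexRoutingTable != null && indexRoutingTable.allShardsActive();
        });
        safeAwait(allActive);
    }
}
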