@@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -34,6 +35,7 @@
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.hamcrest.Matcher;
@@ -43,6 +45,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -56,6 +59,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,16 +167,6 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
@TestIssueLogging(
value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
+ "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
+ "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
)
public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
@@ -230,6 +224,110 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() {
Contributor Author:

testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards was flaky because of this bug, but only failing once every few hundred iterations. This test fails more reliably for the same reason, although still not all that reliably (after around 20-30 iterations on my laptop). I could make it exercise the exact path that hits the bug every time, but it'd be very specific to this one bug and I'd rather have something a little more general to look out for related bugs too.

Contributor:

So testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards does a little more disk usage checking as well as the shard assignment. This new test checks that shards are assigned correctly, not so much disk usage.

They feel a little redundant: might we add a couple more touches to the new one and delete the old?

Contributor Author:

Yeah, I was sort of inclined to keep them both, but you're right, we're not really testing anything different in the old test.

Contributor:

Great, thanks for updating.

internalCluster().startMasterOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
ensureStableCluster(3);

assertAcked(
clusterAdmin().preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "repo")
.setType(FsRepository.TYPE)
.setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
);

final AtomicBoolean allowRelocations = new AtomicBoolean(true);
final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
ClusterInfoServiceUtils.refresh(clusterInfoService);
if (allowRelocations.get() == false) {
assertThat(
"Expects no relocating shards but got: " + event.state().getRoutingNodes(),
numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
equalTo(0)
);
}
});

final String indexName = randomIdentifier();
createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
var shardSizes = createReasonableSizedShards(indexName);

final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
.setIndices(indexName)
.setWaitForCompletion(true)
.get();
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));

assertAcked(indicesAdmin().prepareDelete(indexName).get());
updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
allowRelocations.set(false);

// reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
// other data node
getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() * 2 + WATERMARK_BYTES - 1L);
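// i.e. enough space for the smallest shard plus the watermark headroom, but one byte short of fitting a second smallest shard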
refreshDiskUsage();

// We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
// chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
// trigger the whole chain by starting the first restore.
final var copyIndexName = indexName + "-copy";

// set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
final String dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
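// RoutingNode#size() is the number of shards allocated to the node, so this forbids more than one shard on the tiny node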
assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
Contributor:

Oooph, RoutingNode#size() is an unhelpful method name 😓

Contributor Author:

Naming things is hard, but yeah this is not good. At least it has Javadocs :)

var seenCopy = false;
for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
seenCopy = true;
}
if (indexRoutingTable.allShardsActive() == false) {
return false;
}
}
return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
});

// set up a listener which waits for the shards from the first restore to start initializing and then kicks off another restore
final var secondRestoreCompleteLatch = new CountDownLatch(1);
final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
final var indexRoutingTable = cs.routingTable().index(indexName);
if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
.setWaitForCompletion(true)
.setRenamePattern(indexName)
.setRenameReplacement(indexName + "-copy")
Contributor:

Huh, is this purely a test-only feature? Doesn't look like it's used anyplace else.

(not actionable, I'm just surprised)

Contributor Author:

We use the feature in production, see here:

We just don't use the RestoreSnapshotRequestBuilder to build a RestoreSnapshotRequest anywhere, instead building the request directly since it's all mutable anyway. Not a pattern I like, but one that is going to take a long time to completely eliminate.

Contributor:

Ohh 🤔 Got it 👍
.execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(restoreInfo.failedShards(), is(0));
secondRestoreCompleteLatch.countDown();
}));
return true;
}
return false;
});

// now set the ball rolling by doing the first restore, waiting for it to complete
final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
.setWaitForCompletion(true)
.get();
final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
assertThat(restoreInfo.failedShards(), is(0));

// wait for the second restore to complete too
safeAwait(secondRestoreStartedListener);
safeAwait(secondRestoreCompleteLatch);

// wait for all the shards to finish moving
safeAwait(allShardsActiveListener);
ensureGreen(indexName, indexName + "-copy");
}
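
The review thread above mentions that production code builds a RestoreSnapshotRequest directly rather than going through RestoreSnapshotRequestBuilder. A minimal, hypothetical sketch of that pattern, assuming the request's public mutators (indices, renamePattern, renameReplacement, waitForCompletion); the constructor arguments may differ between versions:

// Hypothetical sketch only; not part of this change.
RestoreSnapshotRequest request = new RestoreSnapshotRequest(TEST_REQUEST_TIMEOUT, "repo", "snap");
request.indices(indexName);                      // restore just the test index
request.renamePattern(indexName);                // rename on restore, as in the builder-based call above
request.renameReplacement(indexName + "-copy");
request.waitForCompletion(true);
// The request would then be handed to the cluster admin client, e.g.
// clusterAdmin().restoreSnapshot(request, listener) (method name assumed), instead of using the builder.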

private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
final Set<ShardId> shardIds = new HashSet<>();
final IndexRoutingTable indexRoutingTable = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)

@@ -92,7 +92,7 @@ public void simulateShardStarted(ShardRouting shard) {
var project = allocation.metadata().projectFor(shard.index());
var size = getExpectedShardSize(
shard,
UNAVAILABLE_EXPECTED_SHARD_SIZE,
shard.getExpectedShardSize(),
Contributor Author:

One-line fix \o/
getClusterInfo(),
allocation.snapshotShardSizeInfo(),
project,