diff --git a/docs/changelog/131592.yaml b/docs/changelog/131592.yaml new file mode 100644 index 0000000000000..09f0afebc1ff3 --- /dev/null +++ b/docs/changelog/131592.yaml @@ -0,0 +1,5 @@ +pr: 131592 +summary: "Limited number of shard snapshot in INIT state per node" +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotAndRelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotAndRelocationIT.java new file mode 100644 index 0000000000000..2039a1354927b --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotAndRelocationIT.java @@ -0,0 +1,187 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SnapshotAndRelocationIT extends AbstractSnapshotIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey(), 1) + .build(); + } + + public void testLimitingInitAndRelocationForAssignedQueueShards() throws Exception { + final String masterNode = internalCluster().startMasterOnlyNode(); + final String dataNodeA = internalCluster().startDataOnlyNode(); + final String dataNodeB = internalCluster().startDataOnlyNode(); + ensureStableCluster(3); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + + final AtomicBoolean delayOnce = new AtomicBoolean(false); + final AtomicReference> delayedAction = new AtomicReference<>(); + final var delayedActionSetLatch = new CountDownLatch(1); + MockTransportService.getInstance(masterNode) + 
.addRequestHandlingBehavior(TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> { + if (delayOnce.compareAndSet(false, true)) { + delayedAction.set(() -> handler.messageReceived(request, channel, task)); + delayedActionSetLatch.countDown(); + } else { + handler.messageReceived(request, channel, task); + } + }); + + final var numIndices = between(2, 4); + final var indexNames = IntStream.range(0, numIndices).mapToObj(i -> "index-" + i).toList(); + + for (var indexName : indexNames) { + createIndex(indexName, indexSettings(1, 0).put("index.routing.allocation.include._name", dataNodeA).build()); + indexRandomDocs(indexName, between(10, 42)); + } + ensureGreen(); + + final var future = startFullSnapshot(repoName, "snapshot"); + safeAwait(delayedActionSetLatch); + + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final SnapshotsInProgress.Entry snapshot = SnapshotsInProgress.get(clusterService.state()).asStream().iterator().next(); + logger.info("--> snapshot=[{}]", snapshot); + final var shards = snapshot.shards(); + assertThat(shards.size(), equalTo(numIndices)); + + final var dataNodeAId = getNodeId(dataNodeA); + final var initShards = shards.entrySet() + .stream() + .filter(entry -> entry.getValue().state() == SnapshotsInProgress.ShardState.INIT) + .peek(entry -> assertThat(entry.getValue().nodeId(), equalTo(dataNodeAId))) + .map(Map.Entry::getKey) + .toList(); + logger.info("--> init shards [{}]", initShards); + assertThat(initShards.size(), equalTo(1)); + + final var assignedQueuedShards = shards.entrySet() + .stream() + .filter(entry -> entry.getValue().isAssignedQueued()) + .peek(entry -> assertThat(entry.getValue().nodeId(), equalTo(dataNodeAId))) + .map(Map.Entry::getKey) + .toList(); + logger.info("--> assigned queued shards [{}]", assignedQueuedShards); + assertThat(assignedQueuedShards.size(), equalTo(numIndices - 1)); + + // Relocate indices that are assigned queued + final String[] indices = assignedQueuedShards.stream().map(ShardId::getIndexName).toArray(String[]::new); + logger.info("--> relocate indices [{}]", Arrays.toString(indices)); + updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", dataNodeB), indices); + ensureGreen(indices); + + final var dataNodeBIndicesService = internalCluster().getInstance(IndicesService.class, dataNodeB); + for (var shardId : assignedQueuedShards) { + assertTrue( + "indices: " + + StreamSupport.stream(dataNodeBIndicesService.spliterator(), false) + .map(indexService -> indexService.index().getName()) + .toList(), + dataNodeBIndicesService.hasIndex(shardId.getIndex()) + ); + } + + assertThat(future.isDone(), is(false)); + logger.info("--> run delayed action"); + delayedAction.get().run(); + assertSuccessful(future); + } + + public void testSnapshotStartedEarlierCompletesFirst() throws Exception { + final String masterNode = internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + ensureStableCluster(2); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + + final var numIndices = between(2, 4); + final var indexNames = IntStream.range(0, numIndices).mapToObj(i -> "index-" + i).toList(); + + for (var indexName : indexNames) { + createIndex(indexName, 1, 0); + indexRandomDocs(indexName, between(10, 42)); + } + ensureGreen(); + + final String firstSnapshot = "snapshot-0"; + final String secondSnapshot = "snapshot-1"; + + // Start two snapshots and wait for both of 
them to appear in the cluster state + blockDataNode(repoName, dataNode); + final var future0 = startFullSnapshot(repoName, firstSnapshot); + safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(state -> { + final var snapshotsInProgress = SnapshotsInProgress.get(state); + final List snapshotNames = snapshotsInProgress.asStream() + .map(entry -> entry.snapshot().getSnapshotId().getName()) + .toList(); + return snapshotNames.equals(List.of(firstSnapshot)); + })); + final var future1 = startFullSnapshot(repoName, secondSnapshot); + safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(state -> { + final var snapshotsInProgress = SnapshotsInProgress.get(state); + final List snapshotNames = snapshotsInProgress.asStream() + .map(entry -> entry.snapshot().getSnapshotId().getName()) + .toList(); + return snapshotNames.equals(List.of(firstSnapshot, secondSnapshot)); + })); + + // Ensure the first snapshot is completed first before the second one by observing a cluster state containing only the 2nd one + final var listenerForFirstSnapshotCompletion = ClusterServiceUtils.addMasterTemporaryStateListener(state -> { + final var snapshotsInProgress = SnapshotsInProgress.get(state); + final Set snapshotNames = snapshotsInProgress.asStream() + .map(entry -> entry.snapshot().getSnapshotId().getName()) + .collect(Collectors.toSet()); + return snapshotNames.equals(Set.of(secondSnapshot)); + }); + unblockNode(repoName, dataNode); + safeAwait(listenerForFirstSnapshotCompletion); + + assertSuccessful(future0); + assertSuccessful(future1); + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java index 16760bb7cb165..9c3f70181d59e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStressTestsIT.java @@ -31,9 +31,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; @@ -57,6 +59,7 @@ import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import java.nio.file.Path; import java.util.ArrayList; @@ -89,7 +92,26 @@ @LuceneTestCase.SuppressFileSystems(value = "HandleLimitFS") // we sometimes have >2048 open files public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase { + private int initialShardSnapshotPerNodeLimit; + + @Override + public void setUp() throws Exception { + super.setUp(); + initialShardSnapshotPerNodeLimit = between(0, 10); + } + + @After + public void clearShardSnapshotPerNodeLimitSetting() { + // Clear any persistent setting that may have been set during the test. The teardown process does not like it. 
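+ // Only the persistent cluster-level override set by startUpdateShardSnapshotPerNodeLimit() is cleared here; the node-scope value applied via nodeSettings() is unaffected by putNull.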
+ safeGet( + clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(Settings.builder().putNull(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey())) + .execute() + ); + } + public void testRandomActivities() throws InterruptedException { + logger.info("--> initial shard snapshot per node limit: [{}]", initialShardSnapshotPerNodeLimit); final DiscoveryNodes discoveryNodes = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT) .clear() .setNodes(true) @@ -100,6 +122,14 @@ public void testRandomActivities() throws InterruptedException { disableRepoConsistencyCheck("have not necessarily written to all repositories"); } + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey(), initialShardSnapshotPerNodeLimit) + .build(); + } + private static Set nodeNames(Map nodesMap) { return nodesMap.values().stream().map(DiscoveryNode::getName).collect(Collectors.toSet()); } @@ -323,8 +353,16 @@ public void run() throws InterruptedException { startCleaner(); } - if (randomBoolean()) { + final int shardMovingVariant = between(0, 2); + if (shardMovingVariant == 0) { startNodeShutdownMarker(); + } else if (shardMovingVariant == 1) { + startAllocationFiltering(); + } + // Intentionally have neither node shutdown marker nor allocation filtering in some tests + + if (randomBoolean()) { + startUpdateShardSnapshotPerNodeLimit(); } if (completedSnapshotLatch.await(30, TimeUnit.SECONDS)) { @@ -1306,6 +1344,121 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta }); } + private void startAllocationFiltering() { + enqueueAction(() -> { + boolean rerun = true; + try (TransferableReleasables localReleasables = new TransferableReleasables()) { + if (usually()) { + return; + } + + final List trackedIndices = indices.values() + .stream() + .filter(index -> index.supportsAllocationFilter) + .toList(); + if (trackedIndices.isEmpty()) { + return; + } + final var trackedIndex = randomFrom(trackedIndices); + if (localReleasables.add(tryAcquirePermit(trackedIndex.permits)) == null) { + return; + } + + if (localReleasables.add(blockNodeRestarts()) == null) { + return; + } + final var trackedNode = randomFrom(shuffledNodes.stream().filter(node -> node.isDataNode).toList()); + + final Releasable releaseAll = localReleasables.transfer(); + + logger.info(" --> moving index [{}] away from node [{}]", trackedIndex.indexName, trackedNode.nodeName); + + SubscribableListener.newForked( + l -> indicesAdmin().prepareUpdateSettings(trackedIndex.indexName) + .setSettings(Settings.builder().put("index.routing.allocation.exclude._name", trackedNode.nodeName)) + .execute(l) + ).addListener(mustSucceed(acknowledgedResponse -> { + assertTrue(acknowledgedResponse.isAcknowledged()); + logger.info("--> updated index [{}] settings to exclude node [{}]", trackedIndex.indexName, trackedNode.nodeName); + pollForAllocationFilterCompletion(trackedNode, trackedIndex.indexName, releaseAll, this::startAllocationFiltering); + })); + rerun = false; + + } finally { + if (rerun) { + startAllocationFiltering(); + } + } + }); + } + + private void pollForAllocationFilterCompletion( + TrackedNode excludedNode, + String indexName, + Releasable onCompletion, + Runnable onSuccess + ) { + if (shouldStop.get()) { + Releasables.close(onCompletion); + return; + } + 
clientExecutor.execute(mustSucceed(() -> { + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final ClusterState state = clusterService.state(); + + final var allRebalanced = state.routingTable(ProjectId.DEFAULT) + .index(indexName) + .allShards() + .flatMap(IndexShardRoutingTable::allShards) + .allMatch(shardRouting -> shardRouting.active() && excludedNode.nodeId.equals(shardRouting.currentNodeId()) == false); + + if (allRebalanced) { + logger.info("--> moved index [{}] away from node [{}]", indexName, excludedNode.nodeName); + Releasables.close(onCompletion); + onSuccess.run(); + } else { + pollForAllocationFilterCompletion(excludedNode, indexName, onCompletion, onSuccess); + } + })); + } + + private void startUpdateShardSnapshotPerNodeLimit() { + enqueueAction(() -> { + boolean rerun = true; + try (TransferableReleasables localReleasables = new TransferableReleasables()) { + if (usually()) { + return; + } + + if (localReleasables.add(blockNodeRestarts()) == null) { + return; + } + + final Releasable releaseAll = localReleasables.transfer(); + + final int newLimit = between(0, 10); + logger.info("--> updating shard snapshot per node limit to [{}]", newLimit); + + clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings( + Settings.builder().put(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey(), newLimit) + ) + .execute(mustSucceed(response -> { + assertTrue(response.isAcknowledged()); + logger.info("--> updated shard snapshot per node limit to [{}]", newLimit); + Releasables.close(releaseAll); + startUpdateShardSnapshotPerNodeLimit(); + })); + + rerun = false; + } finally { + if (rerun) { + startUpdateShardSnapshotPerNodeLimit(); + } + } + }); + } + @Nullable // if we couldn't block node restarts private Releasable blockNodeRestarts() { try (TransferableReleasables localReleasables = new TransferableReleasables()) { @@ -1449,6 +1602,7 @@ private class TrackedIndex { // these fields are only changed when all permits held by the delete/recreate process: private int shardCount; + private boolean supportsAllocationFilter; private Semaphore docPermits; private TrackedIndex(String indexName) { @@ -1472,8 +1626,10 @@ private void createIndexAndContinue(Releasable releasable) { shardCount = between(1, 5); docPermits = new Semaphore(between(1000, 3000)); logger.info("--> create index [{}] with max [{}] docs", indexName, docPermits.availablePermits()); + final int replicaCount = between(0, cluster.numDataNodes() - 1); + supportsAllocationFilter = 1 + replicaCount < cluster.numDataNodes(); indicesAdmin().prepareCreate(indexName) - .setSettings(indexSettings(shardCount, between(0, cluster.numDataNodes() - 1))) + .setSettings(indexSettings(shardCount, replicaCount)) .execute(mustSucceed(response -> { assertTrue(response.isAcknowledged()); logger.info("--> finished create index [{}]", indexName); @@ -1753,11 +1909,13 @@ private static class TrackedNode { private final String nodeName; private final boolean isMasterNode; private final boolean isDataNode; + private final String nodeId; TrackedNode(String nodeName, boolean isMasterNode, boolean isDataNode) { this.nodeName = nodeName; this.isMasterNode = isMasterNode; this.isDataNode = isDataNode; + this.nodeId = getNodeId(nodeName); } Semaphore getPermits() { diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 
af15382e890d2..0783e90f71d4c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -220,6 +220,10 @@ public int count() { return count; } + public Set repos() { + return entries.keySet(); + } + public Iterable> entriesByRepo() { return () -> Iterators.map(entries.values().iterator(), byRepo -> byRepo.entries); } @@ -527,7 +531,7 @@ private static boolean assertShardStateConsistent( int shardId, ShardSnapshotStatus shardSnapshotStatus ) { - if (shardSnapshotStatus.isActive()) { + if (shardSnapshotStatus.isActiveOrAssignedQueued()) { Tuple plainShardId = Tuple.tuple(indexName, shardId); assert assignedShards.add(plainShardId) : plainShardId + " is assigned twice in " + entries; assert queuedShards.contains(plainShardId) == false : plainShardId + " is queued then assigned in " + entries; @@ -775,6 +779,20 @@ public static ShardSnapshotStatus success(String nodeId, ShardSnapshotResult sha return new ShardSnapshotStatus(nodeId, ShardState.SUCCESS, shardSnapshotResult.getGeneration(), null, shardSnapshotResult); } + @SuppressForbidden(reason = "using a private constructor within the same file") + public static ShardSnapshotStatus assignedQueued(String nodeId, ShardGeneration generation) { + return new ShardSnapshotStatus(nodeId, ShardState.QUEUED, generation, null, null); + } + + public boolean isAssignedQueued() { + // generation can still be null if previous shard snapshots all failed + return state == ShardState.QUEUED && nodeId != null; + } + + public boolean isUnassignedQueued() { + return this == UNASSIGNED_QUEUED || (state == ShardState.QUEUED && generation == null && nodeId == null); + } + public ShardSnapshotStatus( @Nullable String nodeId, ShardState state, @@ -795,8 +813,16 @@ private boolean assertConsistent() { assert state.failed() == false || reason != null; assert (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL) || nodeId != null : "Null node id for state [" + state + "]"; - assert state != ShardState.QUEUED || (nodeId == null && generation == null && reason == null) - : "Found unexpected non-null values for queued state shard nodeId[" + nodeId + "][" + generation + "][" + reason + "]"; + assert state != ShardState.QUEUED || (isUnassignedQueued() || isAssignedQueued()) + : "Found unexpected shard state=[" + + state + + "], nodeId=[" + + nodeId + + "], generation=[" + + generation + + "], reason=[" + + reason + + "]"; assert state == ShardState.SUCCESS || shardSnapshotResult == null; assert shardSnapshotResult == null || shardSnapshotResult.getGeneration().equals(generation) : "generation [" + generation + "] does not match result generation [" + shardSnapshotResult.getGeneration() + "]"; @@ -810,7 +836,7 @@ public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException { final ShardGeneration generation = in.readOptionalWriteable(ShardGeneration::new); final String reason = in.readOptionalString(); final ShardSnapshotResult shardSnapshotResult = in.readOptionalWriteable(ShardSnapshotResult::new); - if (state == ShardState.QUEUED) { + if (state == ShardState.QUEUED && nodeId == null && generation == null) { return UNASSIGNED_QUEUED; } return new ShardSnapshotStatus(nodeId, state, generation, reason, shardSnapshotResult); @@ -849,6 +875,10 @@ public boolean isActive() { }; } + public boolean isActiveOrAssignedQueued() { + return isActive() || isAssignedQueued(); + } + @Override public void 
writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); @@ -904,6 +934,12 @@ public static class Entry implements Writeable, ToXContentObject, RepositoryOper */ private final boolean hasShardsInInitState; + /** + * Flag set to true in case any of the shard snapshots in {@link #shards} are {@link ShardSnapshotStatus#isAssignedQueued}. + * This is used to avoid having to iterate the full {@link #shards} map. + */ + private final boolean hasAssignedQueuedShards; + // visible for testing, use #startedEntry and copy constructors in production code public static Entry snapshot( Snapshot snapshot, @@ -923,6 +959,7 @@ public static Entry snapshot( final Map res = Maps.newMapWithExpectedSize(indices.size()); final Map byRepoShardIdBuilder = Maps.newHashMapWithExpectedSize(shards.size()); boolean hasInitStateShards = false; + boolean hasAssignedQueuedShards = false; for (Map.Entry entry : shards.entrySet()) { final ShardId shardId = entry.getKey(); final IndexId indexId = indices.get(shardId.getIndexName()); @@ -931,6 +968,7 @@ public static Entry snapshot( assert existing == null || existing.equals(index) : "Conflicting indices [" + existing + "] and [" + index + "]"; final var shardSnapshotStatus = entry.getValue(); hasInitStateShards |= shardSnapshotStatus.state() == ShardState.INIT; + hasAssignedQueuedShards |= shardSnapshotStatus.isAssignedQueued(); byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), shardSnapshotStatus); } return new Entry( @@ -950,7 +988,8 @@ public static Entry snapshot( null, byRepoShardIdBuilder, res, - hasInitStateShards + hasInitStateShards, + hasAssignedQueuedShards ); } @@ -982,6 +1021,7 @@ private static Entry createClone( source, shardStatusByRepoShardId, Map.of(), + false, false ); } @@ -1003,7 +1043,8 @@ private Entry( @Nullable SnapshotId source, Map shardStatusByRepoShardId, Map snapshotIndices, - boolean hasShardsInInitState + boolean hasShardsInInitState, + boolean hasAssignedQueuedShards ) { this.state = state; this.snapshot = snapshot; @@ -1022,13 +1063,15 @@ private Entry( this.shardStatusByRepoShardId = Map.copyOf(shardStatusByRepoShardId); this.snapshotIndices = snapshotIndices; this.hasShardsInInitState = hasShardsInInitState; + this.hasAssignedQueuedShards = hasAssignedQueuedShards; assert assertShardsConsistent( this.source, this.state, this.indices, this.shards, this.shardStatusByRepoShardId, - this.hasShardsInInitState + this.hasShardsInInitState, + this.hasAssignedQueuedShards ); } @@ -1078,7 +1121,8 @@ private static boolean assertShardsConsistent( Map indices, Map shards, Map statusByRepoShardId, - boolean hasInitStateShards + boolean hasInitStateShards, + boolean hasAssignedQueuedShards ) { if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) { return true; @@ -1086,6 +1130,9 @@ private static boolean assertShardsConsistent( if (hasInitStateShards) { assert state == State.STARTED : "shouldn't have INIT-state shards in state " + state; } + if (hasAssignedQueuedShards) { + assert source == null : "clone entry must not have any shards in assigned-queued state"; + } final Set indexNames = indices.keySet(); final Set indexNamesInShards = new HashSet<>(); shards.entrySet().forEach(s -> { @@ -1141,7 +1188,8 @@ public Entry withRepoGen(long newRepoGen) { source, shardStatusByRepoShardId, snapshotIndices, - hasShardsInInitState + hasShardsInInitState, + hasAssignedQueuedShards ); } @@ -1219,27 +1267,38 @@ public Entry withClones(Map updatedClone * In the special case where this instance has not 
yet made any progress on any shard this method just returns * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright. * + * @param completedAssignedQueuedShards Map to accumulate assigned-queued shards that get aborted in this entry * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly */ @Nullable - public Entry abort() { + public Entry abort(Map> completedAssignedQueuedShards) { final Map shardsBuilder = new HashMap<>(); boolean completed = true; boolean allQueued = true; for (Map.Entry shardEntry : shards.entrySet()) { ShardSnapshotStatus status = shardEntry.getValue(); + final var isAssignedQueued = status.isAssignedQueued(); allQueued &= status.state() == ShardState.QUEUED; + final ShardId shardId = shardEntry.getKey(); if (status.state().completed() == false) { final String nodeId = status.nodeId(); - status = new ShardSnapshotStatus( - nodeId, - nodeId == null ? ShardState.FAILED : ShardState.ABORTED, - status.generation(), - "aborted by snapshot deletion" - ); + if (isAssignedQueued == false) { + status = new ShardSnapshotStatus( + nodeId, + nodeId == null ? ShardState.FAILED : ShardState.ABORTED, + status.generation(), + "aborted by snapshot deletion" + ); + } else { + final String reason = "assigned-queued aborted by snapshot deletion"; + status = new ShardSnapshotStatus(nodeId, ShardState.FAILED, status.generation(), reason); + // Record the failure so that we may start the first QUEUED one in later snapshots. + final var old = completedAssignedQueuedShards.put(shardId, new Tuple<>(snapshot(), status)); + assert old == null : shardId + " has unexpected old entry " + old + " conflicting with " + this; + } } completed &= status.state().completed(); - shardsBuilder.put(shardEntry.getKey(), status); + shardsBuilder.put(shardId, status); } if (allQueued) { return null; @@ -1375,6 +1434,15 @@ public boolean hasShardsInInitState() { return hasShardsInInitState; } + /** + * See {@link #hasAssignedQueuedShards}. 
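+ * Clone entries never carry assigned-queued shards; the getter asserts this invariant.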
+ */ + public boolean hasAssignedQueuedShards() { + assert hasAssignedQueuedShards == false || isClone() == false + : "a clone entry must not have assigned-queued shards, but saw " + this; + return hasAssignedQueuedShards; + } + public boolean partial() { return partial; } @@ -1756,7 +1824,8 @@ public Entry apply(Entry part) { null, part.shardStatusByRepoShardId, part.snapshotIndices, - part.hasShardsInInitState + part.hasShardsInInitState, + part.hasAssignedQueuedShards ); } if (part.isClone()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index 03ae7c6f1d32c..5ca3d61470940 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -118,6 +118,10 @@ private static Decision canMove(ShardRouting shardRouting, RoutingAllocation all } } + if (shardSnapshotStatus.isAssignedQueued()) { + continue; + } + return allocation.decision( Decision.THROTTLE, NAME, diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9c2d6fab10368..e2f436ca40c15 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -577,6 +577,7 @@ public void apply(Settings value, Settings current, Settings previous) { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, + SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING, RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING, FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, diff --git a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java index 94fc6dfad06d8..c7cc43b9ff0ab 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java +++ b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java @@ -67,7 +67,8 @@ private static void addStateInformation( int shardId, String indexName ) { - if (shardState.isActive()) { + // Both active or assigned queued means the shard is meant to be the one with actions if node capacity allows it + if (shardState.isActiveOrAssignedQueued()) { busyIds.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId); assert assertGenerationConsistency(generations, indexName, shardId, shardState.generation()); } else if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounter.java b/server/src/main/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounter.java new file mode 100644 index 0000000000000..c0bb5ffd7a0ec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounter.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; + +import java.util.Map; +import java.util.stream.Collectors; + +public class PerNodeShardSnapshotCounter { + + private final int shardSnapshotPerNodeLimit; + private final Map perNodeCounts; + + public PerNodeShardSnapshotCounter( + int shardSnapshotPerNodeLimit, + SnapshotsInProgress snapshotsInProgress, + DiscoveryNodes nodes, + boolean isStateless + ) { + this.shardSnapshotPerNodeLimit = shardSnapshotPerNodeLimit; + if (this.shardSnapshotPerNodeLimit <= 0) { + this.perNodeCounts = Map.of(); + } else { + final var perNodeCounts = nodes.getDataNodes() + .values() + .stream() + .filter(node -> isStateless == false || node.hasRole(DiscoveryNodeRole.INDEX_ROLE.roleName())) + .filter(node -> snapshotsInProgress.isNodeIdForRemoval(node.getId()) == false) + .collect(Collectors.toMap(DiscoveryNode::getId, node -> 0)); + + snapshotsInProgress.asStream().forEach(entry -> { + if (entry.state().completed() || entry.isClone()) { + return; + } + for (var shardSnapshotStatus : entry.shards().values()) { + if (isRunningOnDataNode(shardSnapshotStatus)) { + perNodeCounts.computeIfPresent(shardSnapshotStatus.nodeId(), (nodeId, count) -> count + 1); + } + } + }); + this.perNodeCounts = perNodeCounts; + } + } + + public boolean tryStartShardSnapshotOnNode(String nodeId) { + if (enabled() == false) { + return true; + } + final Integer count = perNodeCounts.get(nodeId); + if (count == null) { + return false; + } + if (count < shardSnapshotPerNodeLimit) { + perNodeCounts.put(nodeId, count + 1); + return true; + } else { + return false; + } + } + + public boolean completeShardSnapshotOnNode(String nodeId) { + if (enabled() == false) { + return true; + } + final Integer count = perNodeCounts.get(nodeId); + if (count == null) { + return false; + } + if (count <= 0) { + return false; + } + perNodeCounts.put(nodeId, count - 1); + return true; + } + + public boolean hasCapacityOnAnyNode() { + return enabled() == false || perNodeCounts.values().stream().anyMatch(count -> count < shardSnapshotPerNodeLimit); + } + + @Override + public String toString() { + return "PerNodeShardSnapshotCounter{" + + "shardSnapshotPerNodeLimit=" + + shardSnapshotPerNodeLimit + + ", perNodeCounts=" + + perNodeCounts + + '}'; + } + + private boolean enabled() { + return shardSnapshotPerNodeLimit > 0; + } + + private static boolean isRunningOnDataNode(SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus) { + return shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.INIT + // Aborted shard snapshot is still running on the data node until it is FAILED + || shardSnapshotStatus.state() == SnapshotsInProgress.ShardState.ABORTED; + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f28d4948f657a..8df122a3d723a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ 
b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -149,6 +149,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement public static final String NO_FEATURE_STATES_VALUE = "none"; + private final boolean isStateless; + private final ClusterService clusterService; private final RerouteService rerouteService; @@ -202,7 +204,19 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement Setting.Property.Dynamic ); + /** + * Setting that specifies the max number of shard snapshots that can run on a data node. Default is 0 which means no limit. + */ + public static final Setting SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING = Setting.intSetting( + "snapshot.shard_snapshot_per_node_limit", + 0, + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile int maxConcurrentOperations; + private volatile int shardSnapshotPerNodeLimit; public SnapshotsService( Settings settings, @@ -215,6 +229,7 @@ public SnapshotsService( boolean serializeProjectMetadata, SnapshotMetrics snapshotMetrics ) { + this.isStateless = DiscoveryNode.isStateless(settings); this.clusterService = clusterService; this.rerouteService = rerouteService; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -230,6 +245,9 @@ public SnapshotsService( maxConcurrentOperations = MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); + clusterService.getClusterSettings() + .initializeAndWatch(SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING, i -> shardSnapshotPerNodeLimit = i); + } this.systemIndices = systemIndices; this.serializeProjectMetadata = serializeProjectMetadata; @@ -798,6 +816,12 @@ public ClusterState execute(ClusterState currentState) { final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState); final SnapshotDeletionsInProgress deletesInProgress = SnapshotDeletionsInProgress.get(currentState); DiscoveryNodes nodes = currentState.nodes(); + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + shardSnapshotPerNodeLimit, + snapshotsInProgress, + nodes, + isStateless + ); final EnumSet statesToUpdate; if (changedNodes) { // If we are reacting to a change in the cluster node configuration we have to update the shard states of both started @@ -891,6 +915,7 @@ public ClusterState execute(ClusterState currentState) { currentState.routingTable(projectId), nodes, snapshotsInProgress::isNodeIdForRemoval, + perNodeShardSnapshotCounter, knownFailures ); if (shards != null) { @@ -1487,6 +1512,7 @@ public void deleteSnapshots(final ProjectId projectId, final DeleteSnapshotReque executeConsistentStateUpdate(repository, repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { private SnapshotDeletionsInProgress.Entry newDelete = null; + private SnapshotDeletionsInProgress.Entry previousDeleteToStart = null; private boolean reusedExistingDelete = false; @@ -1581,6 +1607,14 @@ public ClusterState execute(ClusterState currentState) { } // Snapshot ids that will have to be physically deleted from the repository final Set snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds); + + final Map> completedAssignedQueuedShards = new HashMap<>(); + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + shardSnapshotPerNodeLimit, + snapshotsInProgress, + currentState.nodes(), + isStateless + ); final SnapshotsInProgress 
updatedSnapshots = snapshotsInProgress.createCopyWithUpdatedEntriesForRepo( projectId, repositoryName, @@ -1588,7 +1622,13 @@ public ClusterState execute(ClusterState currentState) { if (existing.state() == SnapshotsInProgress.State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { // snapshot is started - mark every non completed shard as aborted - final SnapshotsInProgress.Entry abortedEntry = existing.abort(); + final SnapshotsInProgress.Entry abortedEntry = existing.abort(completedAssignedQueuedShards); + assert deletionsInProgress.hasExecutingDeletion(projectId, repositoryName) == false + || completedAssignedQueuedShards.isEmpty() + : "unexpected running deletions " + + deletionsInProgress + + " while there were assigned-queued shard snapshots " + + completedAssignedQueuedShards; if (abortedEntry == null) { // No work has been done for this snapshot yet so we remove it from the cluster state directly final Snapshot existingNotYetStartedSnapshot = existing.snapshot(); @@ -1602,13 +1642,41 @@ public ClusterState execute(ClusterState currentState) { completedWithCleanup.add(abortedEntry); } return abortedEntry; + } else { + return SnapshotShardsUpdateContext.EntryContext.maybeUpdateEntryWithCompletedAssignedQueuedShards( + currentState, + existing, + completedAssignedQueuedShards, + perNodeShardSnapshotCounter, + () -> {}, + () -> {} + ); } - return existing; }).filter(Objects::nonNull).toList() ); if (snapshotIdsRequiringCleanup.isEmpty()) { - // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions - return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshots, null); + if (updatedSnapshots.forRepo(projectId, repositoryName).isEmpty()) { + // The last snapshot deleted may have had only assigned-queued shard snapshots, in which case this deletion requires + // no cleanup, i.e. no finalization. We must then check whether any deletion that was previously WAITING + // on this snapshot is now ready to run.
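+ // The loop below stops at the first STARTED deletion for this repository (one is already running); otherwise it promotes the first WAITING deletion to STARTED by swapping it in the deletions-in-progress list.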
+ SnapshotDeletionsInProgress updateDeletions = null; + for (var entry : deletionsInProgress.getEntries()) { + if (projectId.equals(entry.projectId()) && repositoryName.equals(entry.repository())) { + if (entry.state() == SnapshotDeletionsInProgress.State.STARTED) { + break; + } else if (entry.state() == SnapshotDeletionsInProgress.State.WAITING) { + previousDeleteToStart = entry.started(); + updateDeletions = deletionsInProgress.withRemovedEntry(entry.uuid()) + .withAddedEntry(previousDeleteToStart); + break; + } + } + } + return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshots, updateDeletions); + } else { + // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions + return SnapshotsServiceUtils.updateWithSnapshots(currentState, updatedSnapshots, null); + } } // add the snapshot deletion to the cluster state final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() @@ -1638,7 +1706,9 @@ public ClusterState execute(ClusterState currentState) { List.copyOf(snapshotIdsRequiringCleanup), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), - updatedSnapshots.forRepo(projectId, repositoryName).stream().noneMatch(SnapshotsServiceUtils::isWritingToRepository) + updatedSnapshots.forRepo(projectId, repositoryName) + .stream() + .noneMatch(SnapshotsServiceUtils::isWritingToRepositoryOrAssignedQueued) && deletionsInProgress.hasExecutingDeletion(projectId, repositoryName) == false ? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING @@ -1686,6 +1756,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) addDeleteListener(newDelete.uuid(), listener); } if (newDelete != null) { + assert previousDeleteToStart == null : previousDeleteToStart; if (reusedExistingDelete) { return; } @@ -1704,6 +1775,17 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) endSnapshot(completedSnapshot, newState.metadata(), repositoryData); } } + } else if (previousDeleteToStart != null) { + assert previousDeleteToStart.state() == SnapshotDeletionsInProgress.State.STARTED : previousDeleteToStart; + if (tryEnterRepoLoop(projectId, repositoryName)) { + deleteSnapshotsFromRepository( + previousDeleteToStart, + repositoryData, + newState.nodes().getMaxDataNodeCompatibleIndexVersion() + ); + } else { + logger.trace("Delete [{}] could not execute directly and was queued", previousDeleteToStart); + } } } @@ -2028,6 +2110,7 @@ public ClusterState execute(ClusterState currentState) { deleteEntry.projectId() ); readyDeletions = res.v2(); + return res.v1(); } @@ -2191,7 +2274,13 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState entry.indices().values(), entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, - repoName + repoName, + new PerNodeShardSnapshotCounter( + shardSnapshotPerNodeLimit, + snapshotsInProgress, + currentState.nodes(), + isStateless + ) ); final ImmutableOpenMap.Builder updatedAssignmentsBuilder = ImmutableOpenMap .builder(entry.shards()); @@ -2204,7 +2293,7 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState : "Missing assignment for [" + sid + "]"; updatedAssignmentsBuilder.put(sid, ShardSnapshotStatus.MISSING); } else { - if (updated.isActive()) { + if (updated.isActiveOrAssignedQueued()) { markShardReassigned(shardId, reassignedShardIds); } updatedAssignmentsBuilder.put(sid, updated); @@ -2336,12 +2425,16 @@ static 
final class SnapshotShardsUpdateContext { // entries that became complete due to this batch of updates private final List newlyCompletedEntries = new ArrayList<>(); + private final PerNodeShardSnapshotCounter perNodeShardSnapshotCounter; + /** * Sets up {@link #updatesByRepo} to organize the {@link ShardSnapshotUpdate} tasks by repository name. */ SnapshotShardsUpdateContext( ClusterStateTaskExecutor.BatchExecutionContext batchExecutionContext, - ShardSnapshotUpdateCompletionHandler completionHandler + ShardSnapshotUpdateCompletionHandler completionHandler, + int shardSnapshotPerNodeLimit, + boolean isStateless ) { this.batchExecutionContext = batchExecutionContext; this.initialState = batchExecutionContext.initialState(); @@ -2358,6 +2451,12 @@ static final class SnapshotShardsUpdateContext { ).add(task); } } + this.perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + shardSnapshotPerNodeLimit, + SnapshotsInProgress.get(initialState), + initialState.nodes(), + isStateless + ); } /** @@ -2377,8 +2476,11 @@ SnapshotsInProgress computeUpdatedState() { final List newEntries = new ArrayList<>(oldEntries.size()); // Iterate through the snapshots passing in the updates list. Once an update gets applied to a snapshot, it will be removed // from the updates list passed to the next snapshot. + final Map> completedAssignedQueuedShards = new HashMap<>(); for (SnapshotsInProgress.Entry entry : oldEntries) { - final var newEntry = applyUpdatesToEntry(entry, updates.getValue()); + SnapshotsInProgress.Entry newEntry = entry; + newEntry = maybeApplyCompletedAssignedQueuedShardsToEntry(newEntry, completedAssignedQueuedShards); + newEntry = applyUpdatesToEntry(newEntry, updates.getValue(), completedAssignedQueuedShards); newEntries.add(newEntry); if (newEntry != entry && newEntry.state().completed()) { newlyCompletedEntries.add(newEntry); @@ -2387,6 +2489,18 @@ SnapshotsInProgress computeUpdatedState() { updated = updated.createCopyWithUpdatedEntriesForRepo(projectRepo.projectId(), projectRepo.name(), newEntries); } + // Also check snapshots in all repositories to start any assigned-queued shard snapshots. This is needed for both + // (1) Repos that have not seen updates in this batch because their snapshots may have been limited earlier + // due to snapshots running for a repository that is updated to completion in this batch. + // (2) Repos that have seen updates in this batch because updates releasing capacity may all belong to later snapshots + // than the one that has assigned-queued shards. These updates could be either for the same repo or a different repo.
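+ // The loop below exits as soon as perNodeShardSnapshotCounter reports no remaining capacity on any node; otherwise it promotes assigned-queued shard snapshots repository by repository.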
+ for (var repo : updated.repos()) { + if (perNodeShardSnapshotCounter.hasCapacityOnAnyNode() == false) { + break; + } + updated = maybeStartAssignedQueuedShardSnapshotsForRepo(repo, initialState, updated, perNodeShardSnapshotCounter); + } + if (changedCount > 0) { logger.trace( "changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + "[{}] shard snapshots", @@ -2400,6 +2514,97 @@ SnapshotsInProgress computeUpdatedState() { return existing; } + private SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo( + ProjectRepo projectRepo, + ClusterState clusterState, + SnapshotsInProgress snapshotsInProgress, + PerNodeShardSnapshotCounter perNodeShardSnapshotCounter + ) { + assert perNodeShardSnapshotCounter.hasCapacityOnAnyNode() : "no capacity left on any node " + perNodeShardSnapshotCounter; + final List oldEntries = snapshotsInProgress.forRepo(projectRepo); + if (oldEntries.isEmpty() || oldEntries.stream().allMatch(entry -> entry.hasAssignedQueuedShards() == false)) { + return snapshotsInProgress; + } + final List newEntries = new ArrayList<>(oldEntries.size()); + final Map> completedAssignedQueuedShards = new HashMap<>(); + for (SnapshotsInProgress.Entry entry : oldEntries) { + var newEntry = maybeApplyCompletedAssignedQueuedShardsToEntry(entry, completedAssignedQueuedShards); + + if (newEntry.hasAssignedQueuedShards() && perNodeShardSnapshotCounter.hasCapacityOnAnyNode()) { + final var shardsBuilder = ImmutableOpenMap.builder(newEntry.shards()); + final var changed = maybeStartAssignedQueuedShardSnapshots( + clusterState, + newEntry, + snapshotsInProgress::isNodeIdForRemoval, + shardsBuilder, + perNodeShardSnapshotCounter, + completedAssignedQueuedShards + ); + if (changed) { + newEntry = newEntry.withShardStates(shardsBuilder.build()); + } + } + + newEntries.add(newEntry); + if (newEntry != entry && newEntry.state().completed()) { + newlyCompletedEntries.add(newEntry); + } + } + return snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectRepo.projectId(), projectRepo.name(), newEntries); + } + + private boolean maybeStartAssignedQueuedShardSnapshots( + ClusterState clusterState, + SnapshotsInProgress.Entry entry, + Predicate nodeIdRemovalPredicate, + ImmutableOpenMap.Builder shardsBuilder, + PerNodeShardSnapshotCounter perNodeShardSnapshotCounter, + Map> completedAssignedQueuedShards + ) { + assert entry.hasAssignedQueuedShards() : "entry has no assigned queued shards: " + entry; + assert perNodeShardSnapshotCounter.hasCapacityOnAnyNode() : "no capacity left on any node " + perNodeShardSnapshotCounter; + boolean changed = false; + for (var shardId : shardsBuilder.keys()) { + if (perNodeShardSnapshotCounter.hasCapacityOnAnyNode() == false) { + return changed; + } + final var existingShardSnapshotStatus = shardsBuilder.get(shardId); + if (existingShardSnapshotStatus.isAssignedQueued() == false) { + continue; + } + final IndexRoutingTable indexRouting = clusterState.routingTable(entry.projectId()).index(shardId.getIndex()); + final ShardRouting shardRouting; + if (indexRouting == null) { + shardRouting = null; + } else { + shardRouting = indexRouting.shard(shardId.id()).primaryShard(); + } + final var newShardSnapshotStatus = SnapshotsServiceUtils.initShardSnapshotStatus( + existingShardSnapshotStatus.generation(), + shardRouting, + nodeIdRemovalPredicate, + perNodeShardSnapshotCounter + ); + + if (newShardSnapshotStatus.equals(existingShardSnapshotStatus) == false) { + if (newShardSnapshotStatus.state().completed()) { + // It can 
become complete if the shard is unassigned or deleted, i.e. state == MISSING. + // The same shard may be QUEUED in a later snapshot which needs to be updated as well. + // So we record the status here to let it be processed through another round + final var old = completedAssignedQueuedShards.put(shardId, new Tuple<>(entry.snapshot(), newShardSnapshotStatus)); + assert old == null : shardId + " has unexpected old entry " + old + " conflicting with " + entry; + } + changedCount++; + if (newShardSnapshotStatus.state() == ShardState.INIT) { + startedCount++; + } + shardsBuilder.put(shardId, newShardSnapshotStatus); + changed = true; + } + } + return changed; + } + /** * Sets up the final callback {@link #completionHandler} to run after the {@link MasterService} successfully publishes the batched * update {@link #batchExecutionContext}. Also sets up the callers of the tasks within the {@link #batchExecutionContext} to receive @@ -2437,32 +2642,157 @@ void setupSuccessfulPublicationCallbacks(SnapshotsInProgress snapshotsInProgress *

* NB: this method may remove entries from the underlying data structure to which {@code shardUpdates} refers. */ - private SnapshotsInProgress.Entry applyUpdatesToEntry(SnapshotsInProgress.Entry entry, List shardUpdates) { + private SnapshotsInProgress.Entry applyUpdatesToEntry( + SnapshotsInProgress.Entry entry, + List shardUpdates, + Map> completedAssignedQueuedShards + ) { // Completed snapshots do not require any updates so we just add them to the output list and keep going. // Also we short circuit if there are no more unconsumed updates to apply. if (entry.state().completed() || shardUpdates.isEmpty()) { return entry; } - return new EntryContext(entry, shardUpdates).computeUpdatedSnapshotEntryFromShardUpdates(); + return new EntryContext( + initialState, + entry, + shardUpdates, + executedUpdates, + perNodeShardSnapshotCounter, + () -> changedCount++, + () -> startedCount++, + shardsBuilder -> { + if (entry.hasAssignedQueuedShards() && perNodeShardSnapshotCounter.hasCapacityOnAnyNode()) { + maybeStartAssignedQueuedShardSnapshots( + initialState, + entry, + nodeIdRemovalPredicate, + shardsBuilder, + perNodeShardSnapshotCounter, + completedAssignedQueuedShards + ); + } + } + ).computeUpdatedSnapshotEntryFromShardUpdates(); + } + + private SnapshotsInProgress.Entry maybeApplyCompletedAssignedQueuedShardsToEntry( + SnapshotsInProgress.Entry entry, + Map> completedAssignedQueuedShards + ) { + return EntryContext.maybeUpdateEntryWithCompletedAssignedQueuedShards( + initialState, + entry, + completedAssignedQueuedShards, + perNodeShardSnapshotCounter, + () -> changedCount++, + () -> startedCount++ + ); } // Per snapshot entry state - private final class EntryContext { + private static final class EntryContext { + + private final ClusterState initialState; + + // tests whether node IDs are currently marked for removal + private final Predicate nodeIdRemovalPredicate; private final SnapshotsInProgress.Entry entry; /** iterator containing the updates yet to be applied to #entry */ private final Iterator updatesIterator; + /** Updates that were used to update an existing in-progress shard snapshot */ + private final Set executedUpdates; + + private final PerNodeShardSnapshotCounter perNodeShardSnapshotCounter; + + private final Runnable onChanged; + private final Runnable onStarted; + + /** + * Callback to start any assigned-queued shards for the entry in case the updates release any node capacity. 
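+ * Invoked from {@code computeUpdatedSnapshotEntryFromShardUpdates()} once all shard updates for this entry have been applied.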
+ */ + private final Consumer> assignedQueuedShardsStarter; + /** builder for updated shard snapshot status mappings if any could be computed */ private ImmutableOpenMap.Builder shardsBuilder = null; /** builder for updated shard clone status mappings if any could be computed */ private ImmutableOpenMap.Builder clonesBuilder = null; - EntryContext(SnapshotsInProgress.Entry entry, List shardSnapshotUpdates) { + EntryContext( + ClusterState initialState, + SnapshotsInProgress.Entry entry, + List shardSnapshotUpdates, + Set executedUpdates, + PerNodeShardSnapshotCounter perNodeShardSnapshotCounter, + Runnable onChanged, + Runnable onStarted, + Consumer> assignedQueuedShardsStarter + ) { + this.initialState = initialState; + this.nodeIdRemovalPredicate = SnapshotsInProgress.get(initialState)::isNodeIdForRemoval; this.entry = entry; this.updatesIterator = shardSnapshotUpdates.iterator(); + this.executedUpdates = executedUpdates; + this.perNodeShardSnapshotCounter = perNodeShardSnapshotCounter; + this.onChanged = onChanged; + this.onStarted = onStarted; + this.assignedQueuedShardsStarter = assignedQueuedShardsStarter; + } + + /** + * Updates the entry with assigned-queued shard snapshots that have completed, either due to snapshot deletion + * or because their shards are unassigned. The completion needs to be propagated to later entries: (1) a shard snapshot may start + * (INIT) if the assigned-queued one completed due to snapshot deletion, or (2) shard snapshots all complete + * with MISSING if the shard is unassigned. + * @param initialState Cluster state associated to this update + * @param entry The snapshot/clone entry to update + * @param completedAssignedQueuedShards The map of completed assigned-queued shards and associated snapshot + * @param perNodeShardSnapshotCounter The counter to track per-node shard snapshot limits + * @param onChanged Callback when any shard snapshot of the entry changes state + * @param onStarted Callback when any shard snapshot of the entry changes state to INIT + * @return The updated snapshot/clone entry + */ + static SnapshotsInProgress.Entry maybeUpdateEntryWithCompletedAssignedQueuedShards( + ClusterState initialState, + SnapshotsInProgress.Entry entry, + Map> completedAssignedQueuedShards, + PerNodeShardSnapshotCounter perNodeShardSnapshotCounter, + Runnable onChanged, + Runnable onStarted + ) { + if (entry.state().completed() || completedAssignedQueuedShards.isEmpty()) { + return entry; + } + final List shardUpdates = completedAssignedQueuedShards.entrySet() + .stream() + .map(e -> new ShardSnapshotUpdate(e.getValue().v1(), e.getKey(), null, e.getValue().v2(), ActionListener.noop())) + .collect(Collectors.toList()); + assert shardUpdates.stream() + .map(shardSnapshotUpdate -> shardSnapshotUpdate.updatedState.state()) + .allMatch(state -> state.completed() && state.failed()) + : "expected all updates to be completed and failed, but got " + shardUpdates; + final SnapshotsInProgress.Entry newEntry = new EntryContext( + initialState, + entry, + shardUpdates, + Set.copyOf(shardUpdates), // intentionally immutable since all updates are from earlier snapshots + perNodeShardSnapshotCounter, + onChanged, + onStarted, + // No need to kick off any assigned-queued shards since we are not releasing any capacity with these updates + ignore -> {} + ).computeUpdatedSnapshotEntryFromShardUpdates(); + // Remove updates that are consumed + completedAssignedQueuedShards.keySet() + .retainAll( + shardUpdates.stream() + .map(shardSnapshotUpdate -> shardSnapshotUpdate.shardId) +
.collect(Collectors.toUnmodifiableSet()) + ); + return newEntry; } /** @@ -2503,6 +2833,12 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() { + clonesBuilder + " as well as " + shardsBuilder; + + // Shard snapshots changed status for this entry, check within the snapshot to see whether any previously limited + // shard snapshots can now start due to newly completed ones. This is only necessary if the entry has any + // assigned-queued shards before the update. If the entry gets any new assigned-queued shards from processing the + // update, they cannot be started anyway because they already reflect the latest node capacities. + assignedQueuedShardsStarter.accept(shardsBuilder); return entry.withShardStates(shardsBuilder.build()); } else if (clonesBuilder != null) { return entry.withClones(clonesBuilder.build()); @@ -2518,13 +2854,12 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() { * @param nodeId node id to execute started operation on * @param generation shard generation to base started operation on * @param shardId shard identifier of shard to start operation for - * @param either {@link ShardId} for snapshots or {@link RepositoryShardId} for clones */ - private void startShardOperation( - ImmutableOpenMap.Builder newStates, + private void startShardOperation( + ImmutableOpenMap.Builder newStates, String nodeId, ShardGeneration generation, - T shardId + RepositoryShardId shardId ) { startShardOperation(newStates, shardId, new ShardSnapshotStatus(nodeId, generation)); } @@ -2551,7 +2886,7 @@ private void startShardOperation( ); newStates.put(shardId, newState); updatesIterator.remove(); - startedCount++; + onStarted.run(); } /** @@ -2623,17 +2958,28 @@ private void applyShardSnapshotUpdate( assert updatedShardSnapshotStatus.isActive() == false : updatedShardSnapshotStatus; } + if (shardSnapshotStatusUpdate.isClone() == false && changeReleasesDataNode(existing, updatedShardSnapshotStatus)) { + perNodeShardSnapshotCounter.completeShardSnapshotOnNode(updatedShardSnapshotStatus.nodeId()); + } logger.trace( "[{}] Updating shard [{}] with status [{}]", shardSnapshotStatusUpdate.snapshot, shardSnapshotId, updatedShardSnapshotStatus.state() ); - changedCount++; + onChanged.run(); newShardSnapshotStatusesBuilder.put(shardSnapshotId, updatedShardSnapshotStatus); executedUpdates.add(shardSnapshotStatusUpdate); } + private boolean changeReleasesDataNode(ShardSnapshotStatus previous, ShardSnapshotStatus current) { + // Previous running or aborting shard snapshot releases data node capacity once it is completed or paused + if (previous.state() == ShardState.INIT || previous.state() == ShardState.ABORTED) { + return current.state().completed() || current.state() == ShardState.PAUSED_FOR_NODE_REMOVAL; + } + return false; + } + private void tryStartNextTaskAfterCloneUpdated(RepositoryShardId repoShardId, ShardSnapshotStatus updatedState) { // the update was already executed on the clone operation it applied to, now we check if it may be possible to // start a shard snapshot or clone operation on the current entry @@ -2697,12 +3043,16 @@ private void startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g final ShardSnapshotStatus shardSnapshotStatus = SnapshotsServiceUtils.initShardSnapshotStatus( generation, shardRouting, - nodeIdRemovalPredicate + nodeIdRemovalPredicate, + perNodeShardSnapshotCounter ); final ShardId routingShardId = shardRouting != null ? 
shardRouting.shardId() : new ShardId(index, repoShardId.shardId()); if (shardSnapshotStatus.isActive()) { startShardOperation(shardsBuilder(), routingShardId, shardSnapshotStatus); } else { + if (shardSnapshotStatus.isAssignedQueued()) { + updatesIterator.remove(); + } // update to queued snapshot did not result in an actual update execution so we just record it but keep applying // the update to e.g. fail all snapshots for a given shard if the primary for the shard went away shardsBuilder().put(routingShardId, shardSnapshotStatus); @@ -3110,7 +3460,9 @@ public ClusterState execute(BatchExecutionContext batchExecutionCo final ClusterState state = batchExecutionContext.initialState(); final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext( batchExecutionContext, - shardSnapshotUpdateCompletionHandler + shardSnapshotUpdateCompletionHandler, + shardSnapshotPerNodeLimit, + isStateless ); final SnapshotsInProgress initialSnapshots = SnapshotsInProgress.get(state); @@ -3307,7 +3659,8 @@ private SnapshotsInProgress createSnapshot( indexIds.values(), SnapshotsServiceUtils.useShardGenerations(version), repositoryData, - repositoryName + repositoryName, + new PerNodeShardSnapshotCounter(shardSnapshotPerNodeLimit, snapshotsInProgress, projectState.cluster().nodes(), isStateless) ); if (request.partial() == false) { Set missing = new TreeSet<>(); // sorted for more usable message diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java index 048c3f86d5613..e505b561144c0 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java @@ -211,7 +211,7 @@ public static boolean assertNoDanglingSnapshots(ClusterState state) { if (value.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) { assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete"; - } else if (value.isActive()) { + } else if (value.isActiveOrAssignedQueued()) { assert reposWithRunningDelete.contains(new ProjectRepo(entry.projectId(), entry.repository())) == false : "Found shard snapshot actively executing in [" + entry @@ -268,13 +268,13 @@ public static boolean supportsNodeRemovalTracking(ClusterState clusterState) { * @param entry snapshot entry * @return true if entry is currently writing to the repository */ - public static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { + public static boolean isWritingToRepositoryOrAssignedQueued(SnapshotsInProgress.Entry entry) { if (entry.state().completed()) { // Entry is writing to the repo because it's finalizing on master return true; } for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardSnapshotStatusByRepoShardId().values()) { - if (value.isActive()) { + if (value.isActiveOrAssignedQueued()) { // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard return true; } @@ -283,7 +283,7 @@ public static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { } public static boolean isQueued(@Nullable SnapshotsInProgress.ShardSnapshotStatus status) { - return status != null && status.state() == SnapshotsInProgress.ShardState.QUEUED; + return status != null && status.isUnassignedQueued(); } public 
static FinalizeSnapshotContext.UpdatedShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { @@ -393,6 +393,7 @@ public static List currentSnapshots( * failed shard snapshots on the same shard IDs. * * @param nodeIdRemovalPredicate identify any nodes that are marked for removal / in shutdown mode + * @param perNodeShardSnapshotCounter The counter to keep track of the number of shard snapshots running per node * @param knownFailures already known failed shard snapshots, but more may be found in this method * @return an updated map of shard statuses */ @@ -401,6 +402,7 @@ public static ImmutableOpenMap RoutingTable routingTable, DiscoveryNodes nodes, Predicate nodeIdRemovalPredicate, + PerNodeShardSnapshotCounter perNodeShardSnapshotCounter, Map knownFailures ) { assert snapshotEntry.isClone() == false : "clones take a different path"; @@ -411,7 +413,7 @@ public static ImmutableOpenMap .entrySet()) { SnapshotsInProgress.ShardSnapshotStatus shardStatus = shardSnapshotEntry.getValue(); ShardId shardId = snapshotEntry.shardId(shardSnapshotEntry.getKey()); - if (shardStatus.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) { + if (shardStatus.isUnassignedQueued() || shardStatus.isAssignedQueued()) { // this shard snapshot is waiting for a previous snapshot to finish execution for this shard final SnapshotsInProgress.ShardSnapshotStatus knownFailure = knownFailures.get(shardSnapshotEntry.getKey()); if (knownFailure == null) { @@ -420,14 +422,30 @@ public static ImmutableOpenMap // shard became unassigned while queued after a delete or clone operation so we can fail as missing here assert snapshotEntry.partial(); snapshotChanged = true; - logger.debug("failing snapshot of shard [{}] because index got deleted", shardId); - shards.put(shardId, SnapshotsInProgress.ShardSnapshotStatus.MISSING); - knownFailures.put(shardSnapshotEntry.getKey(), SnapshotsInProgress.ShardSnapshotStatus.MISSING); + logger.debug("failing snapshot of shard [{}] with status [{}] because index got deleted", shardId, shardStatus); + final SnapshotsInProgress.ShardSnapshotStatus newShardStatus = shardStatus.isAssignedQueued() + ? new SnapshotsInProgress.ShardSnapshotStatus( + shardStatus.nodeId(), + SnapshotsInProgress.ShardState.FAILED, + shardStatus.generation(), + "shard is deleted" + ) + : SnapshotsInProgress.ShardSnapshotStatus.MISSING; + shards.put(shardId, newShardStatus); + knownFailures.put(shardSnapshotEntry.getKey(), newShardStatus); } else { // if no failure is known for the shard we keep waiting shards.put(shardId, shardStatus); } } else { + assert shardStatus.isAssignedQueued() == false + : shardId + + " with status " + + shardStatus + + " has unexpected known failure " + + knownFailure + + " in snapshot entry " + + snapshotEntry; // If a failure is known for an execution we waited on for this shard then we fail with the same exception here // as well snapshotChanged = true; @@ -465,7 +483,17 @@ public static ImmutableOpenMap Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}].
Previous \ shard state [{}] """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); - shards.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); + if (perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(primaryNodeId)) { + shards.put( + shardId, + new SnapshotsInProgress.ShardSnapshotStatus(primaryNodeId, shardStatus.generation()) + ); + } else { + shards.put( + shardId, + SnapshotsInProgress.ShardSnapshotStatus.assignedQueued(primaryNodeId, shardStatus.generation()) + ); + } continue; } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait @@ -492,6 +520,9 @@ public static ImmutableOpenMap // TODO: Restart snapshot on another node? snapshotChanged = true; logger.warn("failing snapshot of shard [{}] on departed node [{}]", shardId, shardStatus.nodeId()); + // This can move an INIT shard directly to FAILED (completed) but does not release capacity since the node is gone. + // Hence there is no need to kick off assigned-queued shards. In fact, all assigned-queued or unassigned-queued + // shards on the same node should all be moved to FAILED state in this update. final SnapshotsInProgress.ShardSnapshotStatus failedState = new SnapshotsInProgress.ShardSnapshotStatus( shardStatus.nodeId(), SnapshotsInProgress.ShardState.FAILED, @@ -522,7 +553,7 @@ public static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snaps .entrySet()) { final SnapshotsInProgress.ShardState state = shardStatus.getValue().state(); if (state != SnapshotsInProgress.ShardState.WAITING - && state != SnapshotsInProgress.ShardState.QUEUED + && shardStatus.getValue().isUnassignedQueued() == false && state != SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) { continue; } @@ -628,7 +659,9 @@ public static Tuple> ready final var projectRepo = new ProjectRepo(entry.projectId(), entry.repository()); if (repositoriesSeen.add(projectRepo) && entry.state() == SnapshotDeletionsInProgress.State.WAITING - && snapshotsInProgress.forRepo(projectRepo).stream().noneMatch(SnapshotsServiceUtils::isWritingToRepository)) { + && snapshotsInProgress.forRepo(projectRepo) + .stream() + .noneMatch(SnapshotsServiceUtils::isWritingToRepositoryOrAssignedQueued)) { changed = true; final SnapshotDeletionsInProgress.Entry newEntry = entry.started(); readyDeletions.add(newEntry); @@ -957,6 +990,7 @@ public static void completeListenersIgnoringException(@Nullable List shards( @@ -966,7 +1000,8 @@ public static ImmutableOpenMap Collection indices, boolean useShardGenerations, RepositoryData repositoryData, - String repoName + String repoName, + PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); final ShardGenerations shardGenerations = repositoryData.shardGenerations(); @@ -1010,7 +1045,8 @@ public static ImmutableOpenMap shardSnapshotStatus = initShardSnapshotStatus( shardRepoGeneration, indexRoutingTable.shard(i).primaryShard(), - snapshotsInProgress::isNodeIdForRemoval + snapshotsInProgress::isNodeIdForRemoval, + perNodeShardSnapshotCounter ); } builder.put(shardId, shardSnapshotStatus); @@ -1032,7 +1068,8 @@ public static ImmutableOpenMap public static SnapshotsInProgress.ShardSnapshotStatus initShardSnapshotStatus( ShardGeneration shardRepoGeneration, ShardRouting primary, - Predicate nodeIdRemovalPredicate + Predicate nodeIdRemovalPredicate, + 
PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ) { SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus; if (primary == null || primary.assignedToNode() == false) { @@ -1062,7 +1099,11 @@ public static SnapshotsInProgress.ShardSnapshotStatus initShardSnapshotStatus( "primary shard hasn't been started yet" ); } else { - shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); + if (perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(primary.currentNodeId())) { + shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); + } else { + shardSnapshotStatus = SnapshotsInProgress.ShardSnapshotStatus.assignedQueued(primary.currentNodeId(), shardRepoGeneration); + } } return shardSnapshotStatus; } diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index 4eb5140004ea6..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_6,8840008 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 476468b203875..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_3,8841067 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index f8f50cc6d7839..c11e6837bb813 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_6,9000015 +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 5a65f2e578156..80b97d85f7511 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_elasticsearch_9_1_4,9112007 +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 49360d5e62d69..2147eab66c207 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -inference_api_eis_diagnostics,9156000 +initial_9.2.0,9185000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..2147eab66c207 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +initial_9.2.0,9185000 diff --git a/server/src/test/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounterTests.java b/server/src/test/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounterTests.java new file mode 100644 index 0000000000000..64bd081cea004 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/PerNodeShardSnapshotCounterTests.java @@ -0,0 +1,374 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryShardId; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.snapshots.SnapshotsInProgressSerializationTests.randomSnapshot; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; + +public class PerNodeShardSnapshotCounterTests extends ESTestCase { + + public void testDisabledWhenLimitIsZero() { + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + 0, + SnapshotsInProgress.EMPTY, + DiscoveryNodes.EMPTY_NODES, + randomBoolean() + ); + assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + assertTrue(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(randomIdentifier())); + assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(randomIdentifier())); + } + + public void testNodesCapacitiesWithKnownStates() { + final var snapshotNodeId = randomAlphaOfLength(16); + final boolean isStateless = randomBoolean(); + final DiscoveryNodes discoveryNodes = randomDiscoveryNodes(List.of(snapshotNodeId), List.of(randomAlphaOfLength(18)), isStateless); + final int perNodeLimit = 1; // Capacity for only a single shard snapshot + + SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY; + { + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + perNodeLimit, + snapshotsInProgress, + discoveryNodes, + isStateless + ); + // Available capacity for only a single shard snapshot + assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + assertTrue(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId)); + assertFalse(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId)); + } + + // Clone does not count towards the limit + { + final IndexId indexId = new IndexId(randomIdentifier(), randomUUID()); + final SnapshotsInProgress.Entry cloneEntry = SnapshotsInProgress.startClone( + new Snapshot(ProjectId.DEFAULT, randomIdentifier(), new SnapshotId(randomIdentifier(), randomUUID())), + new SnapshotId(randomIdentifier(), randomUUID()), + Map.of(indexId.getName(), indexId), + randomNonNegativeLong(), + randomNonNegativeLong(), + IndexVersion.current() + ); + snapshotsInProgress = snapshotsInProgress.withAddedEntry(
cloneEntry.withClones( + Map.of(new RepositoryShardId(indexId, 0), new ShardSnapshotStatus(snapshotNodeId, new ShardGeneration(1L))) + ) + ); + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + perNodeLimit, + snapshotsInProgress, + discoveryNodes, + isStateless + ); + assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + } + + // Enumerate states that do not count towards the limit + { + final Map shards = new HashMap<>(); + // Add shards that do not count towards the limit + shards.put(randomShardId(), ShardSnapshotStatus.UNASSIGNED_QUEUED); + shards.put(randomShardId(), ShardSnapshotStatus.MISSING); + shards.put( + randomShardId(), + ShardSnapshotStatus.success(snapshotNodeId, new ShardSnapshotResult(new ShardGeneration(1L), ByteSizeValue.ofBytes(1L), 1)) + ); + shards.put( + randomShardId(), + new ShardSnapshotStatus(snapshotNodeId, SnapshotsInProgress.ShardState.WAITING, new ShardGeneration(1L)) + ); + shards.put( + randomShardId(), + new ShardSnapshotStatus(snapshotNodeId, SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL, new ShardGeneration(1L)) + ); + shards.put( + randomShardId(), + new ShardSnapshotStatus( + snapshotNodeId, + SnapshotsInProgress.ShardState.FAILED, + new ShardGeneration(1L), + randomAlphaOfLengthBetween(10, 20) + ) + ); + // assigned-queued shard + shards.put(randomShardId(), ShardSnapshotStatus.assignedQueued(snapshotNodeId, randomFrom(new ShardGeneration(1L), null))); + + final Map indexIds = shards.keySet() + .stream() + .collect(Collectors.toUnmodifiableMap(ShardId::getIndexName, shardId -> new IndexId(shardId.getIndexName(), randomUUID()))); + + snapshotsInProgress = snapshotsInProgress.withAddedEntry( + SnapshotsInProgress.Entry.snapshot( + new Snapshot(ProjectId.DEFAULT, randomIdentifier(), new SnapshotId(randomIdentifier(), randomUUID())), + randomBoolean(), + randomBoolean(), + SnapshotsInProgress.State.STARTED, + indexIds, + List.of(), + List.of(), + randomNonNegativeLong(), + randomNonNegativeLong(), + shards, + null, + Map.of(), + IndexVersion.current() + ) + ); + + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + perNodeLimit, + snapshotsInProgress, + discoveryNodes, + isStateless + ); + assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + } + + // Shard snapshots with states counting towards the limit + { + // INIT shard + final Map shards = new HashMap<>(); + final ShardId shardId = randomShardId(); + shards.put(shardId, new ShardSnapshotStatus(snapshotNodeId, new ShardGeneration(1L))); + var entry = SnapshotsInProgress.Entry.snapshot( + new Snapshot(ProjectId.DEFAULT, randomIdentifier(), new SnapshotId(randomIdentifier(), randomUUID())), + randomBoolean(), + randomBoolean(), + SnapshotsInProgress.State.STARTED, + Map.of(shardId.getIndexName(), new IndexId(shardId.getIndexName(), randomUUID())), + List.of(), + List.of(), + randomNonNegativeLong(), + randomNonNegativeLong(), + shards, + null, + Map.of(), + IndexVersion.current() + ); + snapshotsInProgress = snapshotsInProgress.withAddedEntry(entry); + var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + perNodeLimit, + snapshotsInProgress, + discoveryNodes, + isStateless + ); + assertFalse(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); // no capacity left due to the INIT shard + + // Aborted shard snapshot that was previously in INIT state still counts towards the limit + entry = entry.abort(new HashMap<>()); + assertNotNull(entry); + snapshotsInProgress =
snapshotsInProgress.createCopyWithUpdatedEntriesForRepo( + ProjectId.DEFAULT, + entry.repository(), + List.of(entry) + ); + perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter(perNodeLimit, snapshotsInProgress, discoveryNodes, isStateless); + assertFalse(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + + // Complete the entry by failing the shard snapshot so that the node capacity is released + entry = entry.withShardStates( + Map.of( + shardId, + new ShardSnapshotStatus(snapshotNodeId, SnapshotsInProgress.ShardState.FAILED, new ShardGeneration(1L), "failed") + ) + ); + snapshotsInProgress = snapshotsInProgress.createCopyWithUpdatedEntriesForRepo( + ProjectId.DEFAULT, + entry.repository(), + List.of(entry) + ); + perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter(perNodeLimit, snapshotsInProgress, discoveryNodes, isStateless); + assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + } + } + + public void testNodesCapacitiesWithRandomSnapshots() { + final var snapshotNodeIds = randomList(1, 5, () -> randomAlphaOfLength(16)); + final var nonSnapshotNodeIds = randomList(0, 5, () -> randomAlphaOfLength(18)); + final boolean isStateless = randomBoolean(); + final DiscoveryNodes discoveryNodes = randomDiscoveryNodes(snapshotNodeIds, nonSnapshotNodeIds, isStateless); + final int perNodeLimit = between(1, 10); + + // Basic test when there are no ongoing snapshots + { + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + perNodeLimit, + SnapshotsInProgress.EMPTY, + discoveryNodes, + isStateless + ); + assertTrue(perNodeShardSnapshotCounter.hasCapacityOnAnyNode()); + + // Always false for non-snapshotting nodes + for (var nonSnapshotNodeId : nonSnapshotNodeIds) { + assertFalse(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(nonSnapshotNodeId)); + assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(nonSnapshotNodeId)); + } + + // Cannot complete when there is nothing running + for (var snapshotNodeId : snapshotNodeIds) { + assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId)); + } + + // Can start up to the limit + for (int i = 0; i < perNodeLimit; i++) { + for (var snapshotNodeId : snapshotNodeIds) { + assertTrue(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId)); + } + } + // Cannot start beyond the limit + for (var snapshotNodeId : snapshotNodeIds) { + assertFalse(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId)); + } + // Can complete all started snapshots + for (int i = 0; i < perNodeLimit; i++) { + for (var snapshotNodeId : snapshotNodeIds) { + assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId)); + } + } + // Cannot complete when nothing is running + for (var snapshotNodeId : snapshotNodeIds) { + assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId)); + } + } + + // random snapshots + { + final int numRepos = between(1, 3); + SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY; + for (int i = 0; i < numRepos; i++) { + final var numSnapshots = between(1, 5); + for (int j = 0; j < numSnapshots; j++) { + final var entry = randomSnapshot(ProjectId.DEFAULT, "repo-" + i, () -> randomFrom(snapshotNodeIds)); + if (entry.hasAssignedQueuedShards()) { + assertTrue(entry.shards().values().stream().anyMatch(ShardSnapshotStatus::isAssignedQueued)); + } else { + assertFalse(entry.shards().values().stream().anyMatch(ShardSnapshotStatus::isAssignedQueued)); + } + snapshotsInProgress
= snapshotsInProgress.withAddedEntry(entry); + } + } + + final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter( + perNodeLimit, + snapshotsInProgress, + discoveryNodes, + isStateless + ); + + // Always false for non-snapshotting nodes + for (var nonSnapshotNodeId : nonSnapshotNodeIds) { + assertFalse(perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(nonSnapshotNodeId)); + assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(nonSnapshotNodeId)); + } + + for (var snapshotNodeId : snapshotNodeIds) { + final var numRunning = runningShardSnapshotsForNode(snapshotsInProgress, snapshotNodeId); + final var started = perNodeShardSnapshotCounter.tryStartShardSnapshotOnNode(snapshotNodeId); + if (started) { + assertThat(numRunning, lessThan(perNodeLimit)); + if (numRunning > 0) { + assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId)); + } else { + assertFalse(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId)); + } + } else { + assertThat(numRunning, greaterThanOrEqualTo(perNodeLimit)); + assertTrue(perNodeShardSnapshotCounter.completeShardSnapshotOnNode(snapshotNodeId)); + } + } + } + } + + private static DiscoveryNodes randomDiscoveryNodes(List snapshotNodeIds, List nonSnapshotNodeIds, boolean isStateless) { + final Set snapshotNodeRole; + final Set nonSnapshotNodeRole; + if (isStateless) { + snapshotNodeRole = Set.of(DiscoveryNodeRole.INDEX_ROLE); + nonSnapshotNodeRole = Set.of(randomFrom(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.ML_ROLE)); + } else { + snapshotNodeRole = Set.copyOf( + randomNonEmptySubsetOf( + Set.of( + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.DATA_HOT_NODE_ROLE, + DiscoveryNodeRole.DATA_WARM_NODE_ROLE, + DiscoveryNodeRole.DATA_COLD_NODE_ROLE, + DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE, + DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE + ) + ) + ); + nonSnapshotNodeRole = Set.copyOf( + randomNonEmptySubsetOf( + Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.ML_ROLE, DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE) + ) + ); + } + + final var nodesBuilder = DiscoveryNodes.builder(); + for (var snapshotNodeId : snapshotNodeIds) { + nodesBuilder.add(DiscoveryNodeUtils.builder(snapshotNodeId).name(snapshotNodeId).roles(snapshotNodeRole).build()); + } + for (var nonSnapshotNodeId : nonSnapshotNodeIds) { + nodesBuilder.add(DiscoveryNodeUtils.builder(nonSnapshotNodeId).name(nonSnapshotNodeId).roles(nonSnapshotNodeRole).build()); + } + final DiscoveryNodes discoveryNodes = nodesBuilder.build(); + return discoveryNodes; + } + + private int runningShardSnapshotsForNode(SnapshotsInProgress snapshotsInProgress, String nodeId) { + int result = 0; + final var allEntries = snapshotsInProgress.asStream().toList(); + for (var entry : allEntries) { + if (entry.isClone() == false && entry.state().completed() == false) { + for (ShardSnapshotStatus shardSnapshot : entry.shards().values()) { + if (nodeId.equals(shardSnapshot.nodeId())) { + if (shardSnapshot.state() == SnapshotsInProgress.ShardState.INIT) { + result++; + } else if (shardSnapshot.state() == SnapshotsInProgress.ShardState.ABORTED + && (shardSnapshot.reason() == null || shardSnapshot.reason().startsWith("assigned-queued aborted") == false)) { + result++; + } + } + } + } + } + return result; + } + + private ShardId randomShardId() { + return new ShardId(new Index(randomAlphaOfLength(12), randomUUID()), between(0, 5)); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java 
b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 79d880c5ef5e1..989a3caca9513 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -140,16 +140,16 @@ private ClusterState getClusterStateWithNodeShutdownMetadata(List nodeId ); } - private Entry randomSnapshot() { - return randomSnapshot(randomProjectIdOrDefault()); + public static Entry randomSnapshot() { + return randomSnapshot(randomProjectIdOrDefault(), "repo-" + randomInt(5), () -> randomAlphaOfLength(10)); } - private Entry randomSnapshot(ProjectId projectId) { - Snapshot snapshot = new Snapshot( - projectId, - "repo-" + randomInt(5), - new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10)) - ); + public static Entry randomSnapshot(ProjectId projectId) { + return randomSnapshot(projectId, "repo-" + randomInt(5), () -> randomAlphaOfLength(10)); + } + + public static Entry randomSnapshot(ProjectId projectId, String repoName, Supplier nodeIdSupplier) { + Snapshot snapshot = new Snapshot(projectId, repoName, new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))); boolean includeGlobalState = randomBoolean(); boolean partial = randomBoolean(); int numberOfIndices = randomIntBetween(0, 10); @@ -166,7 +166,7 @@ private Entry randomSnapshot(ProjectId projectId) { for (Index idx : esIndices) { int shardsCount = randomIntBetween(1, 10); for (int j = 0; j < shardsCount; j++) { - shards.put(new ShardId(idx, j), randomShardSnapshotStatus(randomAlphaOfLength(10))); + shards.put(new ShardId(idx, j), randomShardSnapshotStatus(nodeIdSupplier.get())); } } List featureStates = randomList(5, SnapshotFeatureInfoTests::randomSnapshotFeatureInfo); @@ -187,10 +187,14 @@ private Entry randomSnapshot(ProjectId projectId) { ); } - private SnapshotsInProgress.ShardSnapshotStatus randomShardSnapshotStatus(String nodeId) { + public static SnapshotsInProgress.ShardSnapshotStatus randomShardSnapshotStatus(String nodeId) { ShardState shardState = randomFrom(ShardState.values()); if (shardState == ShardState.QUEUED) { - return SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED; + if (randomBoolean()) { + return SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED; + } else { + return SnapshotsInProgress.ShardSnapshotStatus.assignedQueued(nodeId, randomBoolean() ? 
new ShardGeneration(1L) : null); + } } else if (shardState == ShardState.SUCCESS) { final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(new ShardGeneration(1L), ByteSizeValue.ofBytes(1L), 1); return SnapshotsInProgress.ShardSnapshotStatus.success(nodeId, shardSnapshotResult); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 7181cda1b5e0b..43a5e058b3dde 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -744,7 +744,9 @@ private static ClusterState applyUpdates(ClusterState state, SnapshotsService.Sn final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState()); final var context = new SnapshotsService.SnapshotShardsUpdateContext( batchExecutionContext, - /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {} + /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {}, + 0, + false ); final SnapshotsInProgress updated = context.computeUpdatedState(); context.setupSuccessfulPublicationCallbacks(updated);