diff --git a/docs/changelog/130300.yaml b/docs/changelog/130300.yaml new file mode 100644 index 0000000000000..6a1647b9c04a5 --- /dev/null +++ b/docs/changelog/130300.yaml @@ -0,0 +1,5 @@ +pr: 130300 +summary: Waiting for desired allocation before starting shard snapshot +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java index 24539a270c03e..458f477f9ee42 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotsServiceIT.java @@ -11,27 +11,217 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardAssignment; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import 
org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; +import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase { + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + + public void testShardSnapshotWaitForDesiredAllocation() throws Exception { + createRepository("test-repo", "fs"); + internalCluster().ensureAtLeastNumDataNodes(3); + final int numDataNodes = internalCluster().numDataNodes(); + final var indexName = "index"; + createIndex(indexName, indexSettings(numDataNodes, 0).build()); + indexRandomDocs(indexName, between(42, 100)); + ensureGreen(indexName); + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final Set dataNodeIds = clusterService.state().nodes().getDataNodes().keySet(); + logger.info("--> all data nodes: {}", dataNodeIds); + + final Index index = 
clusterService().state().routingTable(ProjectId.DEFAULT).index(indexName).getIndex(); + final var shardsAllocator = (DesiredBalanceShardsAllocator) internalCluster().getCurrentMasterNodeInstance(ShardsAllocator.class); + + // Get the IDs of all nodes hosting at least one shard of the index + final var initialNodeIdsForAllocation = internalCluster().nodesInclude(indexName) + .stream() + .map(ESIntegTestCase::getNodeId) + .collect(Collectors.toUnmodifiableSet()); + + // Delay shard snapshot status updates so that we control when snapshot is completed + final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); + final Set> updateShardRunnables = ConcurrentCollections.newConcurrentSet(); + final var allRunnablesReadyLatch = new AtomicReference<>(new CountDownLatch(1)); + masterTransportService.addRequestHandlingBehavior( + SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + (handler, request, channel, task) -> { + final var updateRequest = asInstanceOf(UpdateIndexShardSnapshotStatusRequest.class, request); + if (updateRequest.shardId().getIndex().equals(index)) { + updateShardRunnables.add(() -> handler.messageReceived(request, channel, task)); + if (updateShardRunnables.size() == numDataNodes) { + allRunnablesReadyLatch.get().countDown(); + } + } else { + handler.messageReceived(request, channel, task); + } + } + ); + + // Start snap-0 asynchronously and it will be delayed at shard snapshot status updates + safeGet( + client().admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "snap-0") + .setWaitForCompletion(false) + .setIndices(indexName) + .execute() + ); + safeAwait(allRunnablesReadyLatch.get()); + + // Start snap-1 asynchronously and all shards will be queued because snap-0 is in progress + safeGet( + client().admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "snap-1") + .setWaitForCompletion(false) + .setIndices(indexName) + .execute() + ); + + // 
Ensure all shards of snap-1 are queued + safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> { + final var snapshotsInProgress = SnapshotsInProgress.get(clusterState); + final SnapshotsInProgress.Entry snapshotEntry = snapshotsInProgress.asStream() + .filter(entry -> entry.snapshot().getSnapshotId().getName().equals("snap-1")) + .findFirst() + .orElse(null); + if (snapshotEntry == null) { + return false; + } + final List states = snapshotEntry.shards() + .values() + .stream() + .map(SnapshotsInProgress.ShardSnapshotStatus::state) + .toList(); + if (states.size() != numDataNodes) { + return false; + } + return states.stream().allMatch(state -> state == SnapshotsInProgress.ShardState.QUEUED); + })); + + // Create undesired allocation by allowing allocation on only one node + final String nodeIdAllowAllocation = randomFrom(dataNodeIds); + logger.info("--> update settings to allow allocation on node [{}]", nodeIdAllowAllocation); + updateIndexSettings( + Settings.builder() + .put( + "index.routing.allocation.exclude._id", + Strings.collectionToCommaDelimitedString( + dataNodeIds.stream().filter(nodeId -> nodeId.equals(nodeIdAllowAllocation) == false).toList() + ) + ) + ); + ClusterRerouteUtils.reroute(client()); + // Desired balance should be updated to allow allocation on nodeIdAllowAllocation + final DesiredBalance desiredBalance = shardsAllocator.getDesiredBalance(); + for (int j = 0; j < numDataNodes; j++) { + final ShardAssignment assignment = desiredBalance.getAssignment(new ShardId(index, j)); + assertThat(assignment.nodeIds(), contains(nodeIdAllowAllocation)); + logger.info("--> after update settings shard [{}] Assignment {}", j, assignment); + } + + // All shards are still not moved because they are locked down by snap-0 + assertThat( + internalCluster().nodesInclude(indexName).stream().map(ESIntegTestCase::getNodeId).collect(Collectors.toUnmodifiableSet()), + equalTo(initialNodeIdsForAllocation) + ); + final var 
hostingNodeIds = internalCluster().nodesInclude(indexName) + .stream() + .map(ESIntegTestCase::getNodeId) + .collect(Collectors.toUnmodifiableSet()); + + // Add a listener to ensure shards of snap-1 go through the state of waiting for desired allocation + final var waitingForDesiredAllocationListener = ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> { + final var snapshotsInProgress = SnapshotsInProgress.get(state); + final SnapshotsInProgress.Entry snapshotEntry = snapshotsInProgress.asStream() + .filter(entry -> entry.snapshot().getSnapshotId().getName().equals("snap-1")) + .findFirst() + .orElseThrow(); + return snapshotEntry.shards().entrySet().stream().anyMatch(entry -> entry.getValue().isWaitingForDesiredAllocation()); + }); + + // Complete snap-0 to allow relocation of shards + final var snap0UpdateShardRunnables = Set.copyOf(updateShardRunnables); + updateShardRunnables.clear(); + allRunnablesReadyLatch.set(new CountDownLatch(1)); + for (CheckedRunnable runnable : snap0UpdateShardRunnables) { + runnable.run(); + } + + // We should see shards of snap-1 change to waiting for desired allocation as an intermediate state + safeAwait(waitingForDesiredAllocationListener); + + // Wait for snap-1 to be ready to complete. This means relocations are completed. 
+ safeAwait(allRunnablesReadyLatch.get()); + + // Shards should be relocated to the desired node before snap-1 is completed + assertThat( + internalCluster().nodesInclude(indexName).stream().map(ESIntegTestCase::getNodeId).collect(Collectors.toUnmodifiableSet()), + contains(nodeIdAllowAllocation) + ); + + // Let snap-1 complete + for (var runnable : updateShardRunnables) { + runnable.run(); + } + safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> SnapshotsInProgress.get(state).isEmpty())); + + // Both snapshots are completed successfully + final var getSnapshotsResponse = safeGet( + client().admin().cluster().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo", "snap-*").execute() + ); + assertThat( + getSnapshotsResponse.getSnapshots().stream().map(info -> info.snapshotId().getName()).toList(), + containsInAnyOrder("snap-0", "snap-1") + ); + assertTrue(getSnapshotsResponse.getSnapshots().stream().map(SnapshotInfo::state).allMatch(state -> state == SnapshotState.SUCCESS)); + } + public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws Exception { createRepository("test-repo", "fs"); createIndexWithRandomDocs("test-index", randomIntBetween(1, 42)); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5633bd8b89e1e..fd4aa6a7c40ce 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; 
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction; import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; @@ -104,6 +105,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -144,7 +146,8 @@ public ClusterModule( SystemIndices systemIndices, ProjectResolver projectResolver, WriteLoadForecaster writeLoadForecaster, - TelemetryProvider telemetryProvider + TelemetryProvider telemetryProvider, + Supplier> reconciledDesiredBalancerConsumer ) { this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); @@ -170,7 +173,8 @@ public ClusterModule( this::reconcile, writeLoadForecaster, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator + nodeAllocationStatsAndWeightsCalculator, + reconciledDesiredBalancerConsumer ); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver); @@ -480,7 +484,8 @@ private static ShardsAllocator createShardsAllocator( DesiredBalanceReconcilerAction reconciler, WriteLoadForecaster writeLoadForecaster, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, + Supplier> reconciledDesiredBalancerConsumer ) { Map> allocators = new HashMap<>(); allocators.put( @@ -496,7 +501,8 @@ private static ShardsAllocator createShardsAllocator( clusterService, reconciler, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator + nodeAllocationStatsAndWeightsCalculator, + reconciledDesiredBalancerConsumer ) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java 
b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 8390c00fc420a..e9baed6c878b4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -721,6 +721,8 @@ public record ShardSnapshotStatus( "missing index" ); + public static final String WAITING_FOR_DESIRED_ALLOCATION = "waiting for desired allocation"; + /** * Initializes status with state {@link ShardState#INIT}. */ @@ -747,6 +749,14 @@ public static ShardSnapshotStatus success(String nodeId, ShardSnapshotResult sha return new ShardSnapshotStatus(nodeId, ShardState.SUCCESS, shardSnapshotResult.getGeneration(), null, shardSnapshotResult); } + public static ShardSnapshotStatus waitForDesiredAllocation(ShardGeneration shardGeneration) { + return new ShardSnapshotStatus(null, ShardState.WAITING, shardGeneration, WAITING_FOR_DESIRED_ALLOCATION); + } + + public boolean isWaitingForDesiredAllocation() { + return nodeId == null && state == ShardState.WAITING && WAITING_FOR_DESIRED_ALLOCATION.equals(reason); + } + public ShardSnapshotStatus( @Nullable String nodeId, ShardState state, @@ -765,8 +775,9 @@ public ShardSnapshotStatus( private boolean assertConsistent() { // If the state is failed we have to have a reason for this failure assert state.failed() == false || reason != null; - assert (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL) - || nodeId != null : "Null node id for state [" + state + "]"; + assert isWaitingForDesiredAllocation() + || (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL) + || nodeId != null : "Mis-match node id [" + nodeId + "] for state [" + state + "]"; assert state != ShardState.QUEUED || (nodeId == null && generation == null && reason == null) : "Found unexpected non-null values for queued state shard nodeId[" + nodeId + "][" + generation + "][" + 
reason + "]"; assert state == ShardState.SUCCESS || shardSnapshotResult == null; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 515da761d8696..21a0f9eb823e4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * A {@link ShardsAllocator} which asynchronously refreshes the desired balance held by the {@link DesiredBalanceComputer} and then takes @@ -90,6 +92,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private volatile boolean resetCurrentDesiredBalance = false; private final Set processedNodeShutdowns = new HashSet<>(); private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; + private final Supplier> reconciledDesiredBalancerConsumer; private final DesiredBalanceMetrics desiredBalanceMetrics; /** * Manages balancer round results in order to report on the balancer activity in a configurable manner. 
@@ -116,7 +119,8 @@ public DesiredBalanceShardsAllocator( ClusterService clusterService, DesiredBalanceReconcilerAction reconciler, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, + Supplier> reconciledDesiredBalancerConsumer ) { this( delegateAllocator, @@ -125,7 +129,8 @@ public DesiredBalanceShardsAllocator( new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator), reconciler, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator + nodeAllocationStatsAndWeightsCalculator, + reconciledDesiredBalancerConsumer ); } @@ -136,10 +141,12 @@ public DesiredBalanceShardsAllocator( DesiredBalanceComputer desiredBalanceComputer, DesiredBalanceReconcilerAction reconciler, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, + Supplier> reconciledDesiredBalancerConsumer ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; + this.reconciledDesiredBalancerConsumer = reconciledDesiredBalancerConsumer; this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings()); this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; @@ -473,6 +480,7 @@ public ClusterState execute(BatchExecutionContext b var latest = findLatest(batchExecutionContext.taskContexts()); var newState = applyBalance(batchExecutionContext, latest); discardSupersededTasks(batchExecutionContext.taskContexts(), latest); + reconciledDesiredBalancerConsumer.get().accept(latest.getTask().desiredBalance); return newState; } diff --git 
a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 73a2efabb9e81..321612f844a70 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -61,6 +61,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -239,6 +240,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -754,6 +756,7 @@ private void construct( repositoriesService, rerouteServiceReference::get ); + final SetOnce> reconciliationFuncRef = new SetOnce<>(); final ClusterModule clusterModule = new ClusterModule( settings, clusterService, @@ -764,7 +767,8 @@ private void construct( systemIndices, projectResolver, getWriteLoadForecaster(threadPool, settings, clusterService.getClusterSettings()), - telemetryProvider + telemetryProvider, + reconciliationFuncRef::get ); modules.add(clusterModule); @@ -1117,6 +1121,7 @@ public Map queryFields() { actionModule.getActionFilters(), systemIndices ); + reconciliationFuncRef.set(snapshotsService::updateReconciledDesiredBalance); SnapshotShardsService snapshotShardsService = new SnapshotShardsService( settings, clusterService, diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java 
b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8a1f1ad79a17f..920c26061ea95 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -58,6 +58,8 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardAssignment; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; @@ -125,6 +127,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -201,6 +204,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler; + private final AtomicReference reconciledDesiredBalance = new AtomicReference<>(DesiredBalance.NOT_MASTER); + /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster state. 
The number of concurrent operations in a cluster state is defined as the sum of @@ -1192,7 +1197,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) * @param knownFailures already known failed shard snapshots, but more may be found in this method * @return an updated map of shard statuses */ - private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( + private ImmutableOpenMap processWaitingShardsAndRemovedNodes( SnapshotsInProgress.Entry snapshotEntry, RoutingTable routingTable, DiscoveryNodes nodes, @@ -1249,13 +1254,30 @@ private static ImmutableOpenMap processWaitingShar } continue; } else if (shardRouting.primaryShard().started()) { - // Shard that we were waiting for has started on a node, let's process it - snapshotChanged = true; - logger.debug(""" - Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. Previous \ - shard state [{}] - """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); - shards.put(shardId, new ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); + if (shardStatus.isWaitingForDesiredAllocation()) { + final ShardAssignment assignment = reconciledDesiredBalance.get().getAssignment(shardId); + if (assignment == null || assignment.nodeIds().contains(primaryNodeId)) { + snapshotChanged = true; + logger.debug( + "Shard [{}] started on desired allocation node [{}] for shard generation [{}]", + shardId, + primaryNodeId, + shardStatus.generation() + ); + shards.put(shardId, new ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); + } else { + // Keep it as waiting for balance + shards.put(shardId, shardStatus); + } + } else { + // Shard that we were waiting for has started on a node, let's process it + snapshotChanged = true; + logger.debug(""" + Starting shard [{}] with shard generation [{}] that we were waiting to start on node [{}]. 
Previous \ + shard state [{}] + """, shardId, shardStatus.generation(), shardStatus.nodeId(), shardStatus.state()); + shards.put(shardId, new ShardSnapshotStatus(primaryNodeId, shardStatus.generation())); + } continue; } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait @@ -1354,6 +1376,11 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn }); } + public void updateReconciledDesiredBalance(DesiredBalance desiredBalance) { + MasterService.assertMasterUpdateOrTestThread(); + reconciledDesiredBalance.set(desiredBalance); + } + /** * Finalizes the snapshot in the repository. * @@ -2895,7 +2922,8 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState entry.indices().values(), entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, - repoName + repoName, + reconciledDesiredBalance.get() ); final ImmutableOpenMap.Builder updatedAssignmentsBuilder = ImmutableOpenMap .builder(entry.shards()); @@ -3014,7 +3042,8 @@ private static ImmutableOpenMap indices, boolean useShardGenerations, RepositoryData repositoryData, - String repoName + String repoName, + DesiredBalance desiredBalance ) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); final ShardGenerations shardGenerations = repositoryData.shardGenerations(); @@ -3058,7 +3087,8 @@ private static ImmutableOpenMap nodeIdRemovalPredicate + Predicate nodeIdRemovalPredicate, + DesiredBalance desiredBalance ) { ShardSnapshotStatus shardSnapshotStatus; if (primary == null || primary.assignedToNode() == false) { @@ -3097,7 +3128,12 @@ private static ShardSnapshotStatus initShardSnapshotStatus( "primary shard hasn't been started yet" ); } else { - shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); + final ShardAssignment assignment = 
desiredBalance.getAssignment(primary.shardId()); + if (assignment == null || assignment.nodeIds().contains(primary.currentNodeId())) { + shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration); + } else { + shardSnapshotStatus = ShardSnapshotStatus.waitForDesiredAllocation(shardRepoGeneration); + } } return shardSnapshotStatus; } @@ -3242,6 +3278,7 @@ static final class SnapshotShardsUpdateContext { // initial cluster state for update computation private final ClusterState initialState; + private final DesiredBalance desiredBalance; // tests whether node IDs are currently marked for removal private final Predicate nodeIdRemovalPredicate; @@ -3263,10 +3300,12 @@ static final class SnapshotShardsUpdateContext { */ SnapshotShardsUpdateContext( ClusterStateTaskExecutor.BatchExecutionContext batchExecutionContext, - ShardSnapshotUpdateCompletionHandler completionHandler + ShardSnapshotUpdateCompletionHandler completionHandler, + DesiredBalance desiredBalance ) { this.batchExecutionContext = batchExecutionContext; this.initialState = batchExecutionContext.initialState(); + this.desiredBalance = desiredBalance; this.nodeIdRemovalPredicate = SnapshotsInProgress.get(initialState)::isNodeIdForRemoval; this.completionHandler = completionHandler; @@ -3611,7 +3650,12 @@ private void startShardSnapshot(RepositoryShardId repoShardId, ShardGeneration g } else { shardRouting = indexRouting.shard(repoShardId.shardId()).primaryShard(); } - final ShardSnapshotStatus shardSnapshotStatus = initShardSnapshotStatus(generation, shardRouting, nodeIdRemovalPredicate); + final ShardSnapshotStatus shardSnapshotStatus = initShardSnapshotStatus( + generation, + shardRouting, + nodeIdRemovalPredicate, + desiredBalance + ); final ShardId routingShardId = shardRouting != null ? 
shardRouting.shardId() : new ShardId(index, repoShardId.shardId()); if (shardSnapshotStatus.isActive()) { startShardOperation(shardsBuilder(), routingShardId, shardSnapshotStatus); @@ -4087,7 +4131,8 @@ public ClusterState execute(BatchExecutionContext batchExecutionCo final ClusterState state = batchExecutionContext.initialState(); final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext( batchExecutionContext, - shardSnapshotUpdateCompletionHandler + shardSnapshotUpdateCompletionHandler, + reconciledDesiredBalance.get() ); final SnapshotsInProgress initialSnapshots = SnapshotsInProgress.get(state); @@ -4287,7 +4332,8 @@ private SnapshotsInProgress createSnapshot( indexIds.values(), useShardGenerations(version), repositoryData, - repositoryName + repositoryName, + reconciledDesiredBalance.get() ); if (request.partial() == false) { Set missing = new TreeSet<>(); // sorted for more usable message diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java index 999845fa73610..ec32055f575d8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java @@ -121,7 +121,8 @@ public DesiredBalance compute( computer, (state, action) -> state, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var allocationService = new MockAllocationService( randomAllocationDeciders(settings, clusterSettings), diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 2908bff995340..c59fc43437329 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -168,7 +168,8 @@ public Collection createAllocationDeciders(Settings settings, EmptySystemIndices.INSTANCE, TestProjectResolvers.alwaysThrow(), WriteLoadForecaster.DEFAULT, - TelemetryProvider.NOOP + TelemetryProvider.NOOP, + () -> (desiredBalance) -> {} ) ); assertEquals(e.getMessage(), "Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice"); @@ -187,7 +188,8 @@ public Collection createAllocationDeciders(Settings settings, EmptySystemIndices.INSTANCE, TestProjectResolvers.alwaysThrow(), WriteLoadForecaster.DEFAULT, - TelemetryProvider.NOOP + TelemetryProvider.NOOP, + () -> (desiredBalance) -> {} ); assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class))); } @@ -205,7 +207,8 @@ public Map> getShardsAllocators(Settings setti EmptySystemIndices.INSTANCE, TestProjectResolvers.alwaysThrow(), WriteLoadForecaster.DEFAULT, - TelemetryProvider.NOOP + TelemetryProvider.NOOP, + () -> (desiredBalance) -> {} ); } @@ -241,7 +244,8 @@ public void testUnknownShardsAllocator() { EmptySystemIndices.INSTANCE, TestProjectResolvers.alwaysThrow(), WriteLoadForecaster.DEFAULT, - TelemetryProvider.NOOP + TelemetryProvider.NOOP, + () -> (desiredBalance) -> {} ) ); assertEquals("Unknown ShardsAllocator [dne]", e.getMessage()); @@ -305,7 +309,8 @@ public void testRejectsReservedExistingShardsAllocatorName() { EmptySystemIndices.INSTANCE, TestProjectResolvers.alwaysThrow(), WriteLoadForecaster.DEFAULT, - TelemetryProvider.NOOP + TelemetryProvider.NOOP, + () -> (desiredBalance) -> {} ); expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator())); } @@ -321,7 +326,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { EmptySystemIndices.INSTANCE, TestProjectResolvers.alwaysThrow(), 
WriteLoadForecaster.DEFAULT, - TelemetryProvider.NOOP + TelemetryProvider.NOOP, + () -> (desiredBalance) -> {} ); expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator())); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 14e0aaa253749..6f6dbdced15f9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -182,7 +182,8 @@ public void testUndesiredShardCount() { clusterService, (innerState, strategy) -> innerState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ) { @Override public DesiredBalance getDesiredBalance() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 277521c5832a1..16c7505c3d414 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -490,7 +490,8 @@ private Map.Entry createNewAllocationSer (clusterState, routingAllocationAction) -> strategyRef.get() .executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction), TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ) { @Override public void allocate(RoutingAllocation allocation, ActionListener listener) { diff --git 
a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 21d547c1593b8..6878df0ca06d9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -174,7 +174,8 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo clusterService, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); assertValidStats(desiredBalanceShardsAllocator.getStats()); var allocationService = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator(allocateUnassigned)); @@ -302,7 +303,8 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo clusterService, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var allocationService = new AllocationService( new AllocationDeciders(List.of()), @@ -421,7 +423,8 @@ boolean hasEnoughIterations(int currentIteration) { }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -549,7 +552,8 @@ public DesiredBalance compute( }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -653,7 +657,8 
@@ public DesiredBalance compute( }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); @@ -746,7 +751,8 @@ public DesiredBalance compute( desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -800,7 +806,8 @@ public void testResetDesiredBalanceOnNoLongerMaster() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -850,7 +857,8 @@ public void testResetDesiredBalanceOnNodeShutdown() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ) { @Override public void resetDesiredBalance() { @@ -946,7 +954,8 @@ public DesiredBalance compute( }, (clusterState, rerouteStrategy) -> null, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ) { private ActionListener lastListener; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index a3f3647fdb45b..40f889abeaeab 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -25,6 +25,7 @@ import 
org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -745,7 +746,8 @@ private static ClusterState applyUpdates(ClusterState state, SnapshotsService.Sn final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState()); final var context = new SnapshotsService.SnapshotShardsUpdateContext( batchExecutionContext, - /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {} + /* on completion handler */ (shardSnapshotUpdateResult, newlyCompletedEntries, updatedRepositories) -> {}, + DesiredBalance.BECOME_MASTER_INITIAL ); final SnapshotsInProgress updated = context.computeUpdatedState(); context.setupSuccessfulPublicationCallbacks(updated); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 8a49db652374e..e07c9c33e4628 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -175,7 +175,8 @@ private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator clusterService, null, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + () -> (desiredBalance) -> {} ) { private RoutingAllocation lastAllocation;