Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130300.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130300
summary: Waiting for desired allocation before starting shard snapshot
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,217 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardAssignment;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

public void testShardSnapshotWaitForDesiredAllocation() throws Exception {
createRepository("test-repo", "fs");
internalCluster().ensureAtLeastNumDataNodes(3);
final int numDataNodes = internalCluster().numDataNodes();
final var indexName = "index";
createIndex(indexName, indexSettings(numDataNodes, 0).build());
indexRandomDocs(indexName, between(42, 100));
ensureGreen(indexName);
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final Set<String> dataNodeIds = clusterService.state().nodes().getDataNodes().keySet();
logger.info("--> all data nodes: {}", dataNodeIds);

final Index index = clusterService().state().routingTable(ProjectId.DEFAULT).index(indexName).getIndex();
final var shardsAllocator = (DesiredBalanceShardsAllocator) internalCluster().getCurrentMasterNodeInstance(ShardsAllocator.class);

// Get all the node IDs that hosting at least one shard of the index
final var initialNodeIdsForAllocation = internalCluster().nodesInclude(indexName)
.stream()
.map(ESIntegTestCase::getNodeId)
.collect(Collectors.toUnmodifiableSet());

// Delay shard snapshot status updates so that we control when snapshot is completed
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
final Set<CheckedRunnable<Exception>> updateShardRunnables = ConcurrentCollections.newConcurrentSet();
final var allRunnablesReadyLatch = new AtomicReference<>(new CountDownLatch(1));
masterTransportService.addRequestHandlingBehavior(
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
(handler, request, channel, task) -> {
final var updateRequest = asInstanceOf(UpdateIndexShardSnapshotStatusRequest.class, request);
if (updateRequest.shardId().getIndex().equals(index)) {
updateShardRunnables.add(() -> handler.messageReceived(request, channel, task));
if (updateShardRunnables.size() == numDataNodes) {
allRunnablesReadyLatch.get().countDown();
}
} else {
handler.messageReceived(request, channel, task);
}
}
);

// Start snap-0 asynchronously and it will be delayed at shard snapshot status updates
safeGet(
client().admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "snap-0")
.setWaitForCompletion(false)
.setIndices(indexName)
.execute()
);
safeAwait(allRunnablesReadyLatch.get());

// Start snap-1 asynchronously and all shards will be queued because snap-0 is in progress
safeGet(
client().admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "snap-1")
.setWaitForCompletion(false)
.setIndices(indexName)
.execute()
);

// Ensure all shards of snap-1 are queued
safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
final var snapshotsInProgress = SnapshotsInProgress.get(clusterState);
final SnapshotsInProgress.Entry snapshotEntry = snapshotsInProgress.asStream()
.filter(entry -> entry.snapshot().getSnapshotId().getName().equals("snap-1"))
.findFirst()
.orElse(null);
if (snapshotEntry == null) {
return false;
}
final List<SnapshotsInProgress.ShardState> states = snapshotEntry.shards()
.values()
.stream()
.map(SnapshotsInProgress.ShardSnapshotStatus::state)
.toList();
if (states.size() != numDataNodes) {
return false;
}
return states.stream().allMatch(state -> state == SnapshotsInProgress.ShardState.QUEUED);
}));

// Create undesired allocation by allowing allocation on only one node
final String nodeIdAllowAllocation = randomFrom(dataNodeIds);
logger.info("--> update settings to allow allocation on node [{}]", nodeIdAllowAllocation);
updateIndexSettings(
Settings.builder()
.put(
"index.routing.allocation.exclude._id",
Strings.collectionToCommaDelimitedString(
dataNodeIds.stream().filter(nodeId -> nodeId.equals(nodeIdAllowAllocation) == false).toList()
)
)
);
ClusterRerouteUtils.reroute(client());
// Desired balance should be updated to allow allocation on nodeIdAllowAllocation
final DesiredBalance desiredBalance = shardsAllocator.getDesiredBalance();
for (int j = 0; j < numDataNodes; j++) {
final ShardAssignment assignment = desiredBalance.getAssignment(new ShardId(index, j));
assertThat(assignment.nodeIds(), contains(nodeIdAllowAllocation));
logger.info("--> after update settings shard [{}] Assignment {}", j, assignment);
}

// All shards are still not moved because they are locked down by snap-0
assertThat(
internalCluster().nodesInclude(indexName).stream().map(ESIntegTestCase::getNodeId).collect(Collectors.toUnmodifiableSet()),
equalTo(initialNodeIdsForAllocation)
);
final var hostingNodeIds = internalCluster().nodesInclude(indexName)
.stream()
.map(ESIntegTestCase::getNodeId)
.collect(Collectors.toUnmodifiableSet());

// Add a listener to ensure shards of snap-1 going through the state of waiting for desired allocation
final var waitingForDesiredAllocationListener = ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> {
final var snapshotsInProgress = SnapshotsInProgress.get(state);
final SnapshotsInProgress.Entry snapshotEntry = snapshotsInProgress.asStream()
.filter(entry -> entry.snapshot().getSnapshotId().getName().equals("snap-1"))
.findFirst()
.orElseThrow();
return snapshotEntry.shards().entrySet().stream().anyMatch(entry -> entry.getValue().isWaitingForDesiredAllocation());
});

// Complete snap-0 to allow relocation of shards
final var snap0UpdateShardRunnables = Set.copyOf(updateShardRunnables);
updateShardRunnables.clear();
allRunnablesReadyLatch.set(new CountDownLatch(1));
for (CheckedRunnable<Exception> runnable : snap0UpdateShardRunnables) {
runnable.run();
}

// We should see shards of snap-1 change to waiting for desired allocation as an intermediate state
safeAwait(waitingForDesiredAllocationListener);

// Wait for snap-0 to be ready to complete. This means relocations are completed.
safeAwait(allRunnablesReadyLatch.get());

// Shards should be relocated to the desired node before snap-1 is completed
assertThat(
internalCluster().nodesInclude(indexName).stream().map(ESIntegTestCase::getNodeId).collect(Collectors.toUnmodifiableSet()),
contains(nodeIdAllowAllocation)
);

// Let snap-1 complete
for (var runnable : updateShardRunnables) {
runnable.run();
}
safeAwait(ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> SnapshotsInProgress.get(state).isEmpty()));

// Both snapshots are completed successfully
final var getSnapshotsResponse = safeGet(
client().admin().cluster().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, "test-repo", "snap-*").execute()
);
assertThat(
getSnapshotsResponse.getSnapshots().stream().map(info -> info.snapshotId().getName()).toList(),
containsInAnyOrder("snap-0", "snap-1")
);
assertTrue(getSnapshotsResponse.getSnapshots().stream().map(SnapshotInfo::state).allMatch(state -> state == SnapshotState.SUCCESS));
}

public void testDeletingSnapshotsIsLoggedAfterClusterStateIsProcessed() throws Exception {
createRepository("test-repo", "fs");
createIndexWithRandomDocs("test-index", randomIntBetween(1, 42));
Expand Down
14 changes: 10 additions & 4 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
Expand Down Expand Up @@ -104,6 +105,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -144,7 +146,8 @@ public ClusterModule(
SystemIndices systemIndices,
ProjectResolver projectResolver,
WriteLoadForecaster writeLoadForecaster,
TelemetryProvider telemetryProvider
TelemetryProvider telemetryProvider,
Supplier<Consumer<DesiredBalance>> reconciledDesiredBalancerConsumer
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand All @@ -170,7 +173,8 @@ public ClusterModule(
this::reconcile,
writeLoadForecaster,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
reconciledDesiredBalancerConsumer
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
Expand Down Expand Up @@ -480,7 +484,8 @@ private static ShardsAllocator createShardsAllocator(
DesiredBalanceReconcilerAction reconciler,
WriteLoadForecaster writeLoadForecaster,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
Supplier<Consumer<DesiredBalance>> reconciledDesiredBalancerConsumer
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
Expand All @@ -496,7 +501,8 @@ private static ShardsAllocator createShardsAllocator(
clusterService,
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
reconciledDesiredBalancerConsumer
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ public record ShardSnapshotStatus(
"missing index"
);

public static final String WAITING_FOR_DESIRED_ALLOCATION = "waiting for desired allocation";

/**
* Initializes status with state {@link ShardState#INIT}.
*/
Expand All @@ -747,6 +749,14 @@ public static ShardSnapshotStatus success(String nodeId, ShardSnapshotResult sha
return new ShardSnapshotStatus(nodeId, ShardState.SUCCESS, shardSnapshotResult.getGeneration(), null, shardSnapshotResult);
}

public static ShardSnapshotStatus waitForDesiredAllocation(ShardGeneration shardGeneration) {
return new ShardSnapshotStatus(null, ShardState.WAITING, shardGeneration, WAITING_FOR_DESIRED_ALLOCATION);
}

public boolean isWaitingForDesiredAllocation() {
return nodeId == null && state == ShardState.WAITING && WAITING_FOR_DESIRED_ALLOCATION.equals(reason);
}

public ShardSnapshotStatus(
@Nullable String nodeId,
ShardState state,
Expand All @@ -765,8 +775,9 @@ public ShardSnapshotStatus(
private boolean assertConsistent() {
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
assert (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL)
|| nodeId != null : "Null node id for state [" + state + "]";
assert isWaitingForDesiredAllocation()
|| (state != ShardState.INIT && state != ShardState.WAITING && state != ShardState.PAUSED_FOR_NODE_REMOVAL)
|| nodeId != null : "Mis-match node id [" + nodeId + "] for state [" + state + "]";
assert state != ShardState.QUEUED || (nodeId == null && generation == null && reason == null)
: "Found unexpected non-null values for queued state shard nodeId[" + nodeId + "][" + generation + "][" + reason + "]";
assert state == ShardState.SUCCESS || shardSnapshotResult == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* A {@link ShardsAllocator} which asynchronously refreshes the desired balance held by the {@link DesiredBalanceComputer} and then takes
Expand Down Expand Up @@ -90,6 +92,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
private volatile boolean resetCurrentDesiredBalance = false;
private final Set<String> processedNodeShutdowns = new HashSet<>();
private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator;
private final Supplier<Consumer<DesiredBalance>> reconciledDesiredBalancerConsumer;
private final DesiredBalanceMetrics desiredBalanceMetrics;
/**
* Manages balancer round results in order to report on the balancer activity in a configurable manner.
Expand All @@ -116,7 +119,8 @@ public DesiredBalanceShardsAllocator(
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
Supplier<Consumer<DesiredBalance>> reconciledDesiredBalancerConsumer
) {
this(
delegateAllocator,
Expand All @@ -125,7 +129,8 @@ public DesiredBalanceShardsAllocator(
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
reconciledDesiredBalancerConsumer
);
}

Expand All @@ -136,10 +141,12 @@ public DesiredBalanceShardsAllocator(
DesiredBalanceComputer desiredBalanceComputer,
DesiredBalanceReconcilerAction reconciler,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
Supplier<Consumer<DesiredBalance>> reconciledDesiredBalancerConsumer
) {
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
this.reconciledDesiredBalancerConsumer = reconciledDesiredBalancerConsumer;
this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings());
this.delegateAllocator = delegateAllocator;
this.threadPool = threadPool;
Expand Down Expand Up @@ -473,6 +480,7 @@ public ClusterState execute(BatchExecutionContext<ReconcileDesiredBalanceTask> b
var latest = findLatest(batchExecutionContext.taskContexts());
var newState = applyBalance(batchExecutionContext, latest);
discardSupersededTasks(batchExecutionContext.taskContexts(), latest);
reconciledDesiredBalancerConsumer.get().accept(latest.getTask().desiredBalance);
return newState;
}

Expand Down
Loading
Loading