Skip to content

Commit a29ba9b

Browse files
committed
Fix propagation for clones
1 parent 8bef3f0 commit a29ba9b

File tree

3 files changed

+168
-65
lines changed

3 files changed

+168
-65
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,8 +1291,14 @@ public Entry abort(Map<ShardId, ShardSnapshotStatus> completedAssignedQueuedShar
12911291
} else {
12921292
// Record the deletion of the assigned-queued shard snapshot so that we can kick off the first QUEUED one
12931293
// in later snapshots.
1294-
final var old = completedAssignedQueuedShards.put(shardEntry.getKey(), status);
1295-
assert old == null : old;
1294+
final ShardId shardId = shardEntry.getKey();
1295+
final var old = completedAssignedQueuedShards.put(shardId, status);
1296+
assert old == null
1297+
: shardId
1298+
+ " has unexpected previous completed shard snapshot status: "
1299+
+ old
1300+
+ " that conflicts with the one from "
1301+
+ this;
12961302
assert isClone() == false
12971303
: "The state queued with generation should not be possible for a clone entry [" + this + "]";
12981304
final String reason = "assigned-queued aborted by snapshot deletion";

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 140 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1630,13 +1630,22 @@ public ClusterState execute(ClusterState currentState) {
16301630
}
16311631
return abortedEntry;
16321632
} else {
1633-
final var shardsBuilder = maybeUpdateWithCompletedAssignedQueuedShards(
1634-
existing,
1635-
completedAssignedQueuedShards,
1636-
true,
1637-
() -> ImmutableOpenMap.builder(existing.shards())
1638-
);
1639-
return shardsBuilder == null ? existing : existing.withShardStates(shardsBuilder.build());
1633+
if (existing.isClone()) {
1634+
final var clonesBuilder = maybeUpdateCloneWithCompletedAssignedQueuedShards(
1635+
existing,
1636+
completedAssignedQueuedShards,
1637+
currentState.nodes().getLocalNodeId(),
1638+
() -> ImmutableOpenMap.builder(existing.shardSnapshotStatusByRepoShardId())
1639+
);
1640+
return clonesBuilder == null ? existing : existing.withClones(clonesBuilder.build());
1641+
} else {
1642+
final var shardsBuilder = maybeUpdateSnapshotWithCompletedAssignedQueuedShards(
1643+
existing,
1644+
completedAssignedQueuedShards,
1645+
() -> ImmutableOpenMap.builder(existing.shards())
1646+
);
1647+
return shardsBuilder == null ? existing : existing.withShardStates(shardsBuilder.build());
1648+
}
16401649
}
16411650
}).filter(Objects::nonNull).toList()
16421651
);
@@ -1751,43 +1760,86 @@ public String toString() {
17511760
}
17521761

17531762
/**
1754-
* Update the snapshot entry with the completed assigned queue shard snapshots accumulated from processing earlier snapshots.
1763+
* Update a snapshot entry with the completed assigned queue shard snapshots accumulated from processing earlier snapshots.
17551764
* These shard snapshots are completed because either their snapshots are deleted or the associated shards become unassigned.
17561765
* In both cases, the shard snapshots for the same shards must be updated to propagate the changes.
17571766
* @param entry The snapshot entry to update.
17581767
* @param completedAssignedQueuedShards Completed assigned queued shard snapshots accumulated from processing earlier snapshots.
1759-
* @param removeAfterUpdate If true, the completed assigned queued shards will be removed from completedAssignedQueuedShards
1760-
* after updating the entry.
1768+
* An entry is removed if it is not MISSING state so that it only updates to the first match.
17611769
* @param shardsBuilderSupplier Supplier to get a builder for new shard snapshots if any update is applicable.
17621770
* @return The updated map of shard snapshots for the snapshot entry, or null if no updates were made.
17631771
*/
1764-
static ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> maybeUpdateWithCompletedAssignedQueuedShards(
1772+
static ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> maybeUpdateSnapshotWithCompletedAssignedQueuedShards(
17651773
SnapshotsInProgress.Entry entry,
17661774
Map<ShardId, ShardSnapshotStatus> completedAssignedQueuedShards,
1767-
boolean removeAfterUpdate,
17681775
Supplier<ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus>> shardsBuilderSupplier
17691776
) {
1770-
if (entry.isClone() == false && completedAssignedQueuedShards.isEmpty() == false) {
1771-
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = null;
1772-
final var iterator = completedAssignedQueuedShards.keySet().iterator();
1773-
while (iterator.hasNext()) {
1774-
final var shardId = iterator.next();
1775-
final var shardSnapshotStatus = entry.shards().get(shardId);
1776-
if (shardSnapshotStatus != null) {
1777-
assert shardSnapshotStatus.isUnassignedQueued() : shardSnapshotStatus;
1778-
if (shardsBuilder == null) {
1779-
shardsBuilder = shardsBuilderSupplier.get();
1780-
}
1781-
shardsBuilder.put(shardId, completedAssignedQueuedShards.get(shardId));
1782-
if (removeAfterUpdate) {
1783-
iterator.remove();
1784-
}
1777+
assert entry.isClone() == false : "unexpected clone " + entry;
1778+
if (entry.state().completed() || completedAssignedQueuedShards.isEmpty()) {
1779+
return null;
1780+
}
1781+
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = null;
1782+
final var iterator = completedAssignedQueuedShards.keySet().iterator();
1783+
while (iterator.hasNext()) {
1784+
final var shardId = iterator.next();
1785+
final var shardSnapshotStatus = entry.shards().get(shardId);
1786+
if (shardSnapshotStatus != null) {
1787+
assert shardSnapshotStatus.isUnassignedQueued() : shardSnapshotStatus;
1788+
if (shardsBuilder == null) {
1789+
shardsBuilder = shardsBuilderSupplier.get();
1790+
}
1791+
final ShardSnapshotStatus completedStatus = completedAssignedQueuedShards.get(shardId);
1792+
if (completedStatus.state() == ShardState.MISSING) {
1793+
shardsBuilder.put(shardId, completedStatus);
1794+
} else {
1795+
shardsBuilder.put(shardId, ShardSnapshotStatus.assignedQueued(completedStatus.nodeId(), completedStatus.generation()));
1796+
iterator.remove();
17851797
}
17861798
}
1787-
return shardsBuilder;
1788-
} else {
1799+
}
1800+
return shardsBuilder;
1801+
}
1802+
1803+
/**
1804+
* Update a clone entry with the completed assigned queue shard snapshots accumulated from processing earlier snapshots.
1805+
* These shard snapshots are completed because either their snapshots are deleted or the associated shards become unassigned.
1806+
* In both cases, previous queued clone shard snapshots for the same shards can now start.
1807+
* @param entry The snapshot entry to update.
1808+
* @param completedAssignedQueuedShards Completed assigned queued shard snapshots accumulated from processing earlier snapshots.
1809+
* Once an entry is applied for update, it is removed from the list.
1810+
* @param clonesBuilderSupplier Supplier to get a builder for new clone shard snapshots if any update is applicable.
1811+
* @return The updated map of clone shard snapshots for the snapshot entry, or null if no updates were made.
1812+
*/
1813+
static ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> maybeUpdateCloneWithCompletedAssignedQueuedShards(
1814+
SnapshotsInProgress.Entry entry,
1815+
Map<ShardId, ShardSnapshotStatus> completedAssignedQueuedShards,
1816+
String localNodeId,
1817+
Supplier<ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus>> clonesBuilderSupplier
1818+
) {
1819+
assert entry.isClone() : "expected a clone, but got " + entry;
1820+
if (entry.state().completed() || completedAssignedQueuedShards.isEmpty()) {
17891821
return null;
17901822
}
1823+
ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder = null;
1824+
final var iterator = completedAssignedQueuedShards.keySet().iterator();
1825+
while (iterator.hasNext()) {
1826+
final var shardId = iterator.next();
1827+
final IndexId indexId = entry.indices().get(shardId.getIndexName());
1828+
if (indexId != null) {
1829+
final RepositoryShardId repoShardId = new RepositoryShardId(indexId, shardId.id());
1830+
if (SnapshotsServiceUtils.isQueued(entry.shardSnapshotStatusByRepoShardId().get(repoShardId))) {
1831+
if (clonesBuilder == null) {
1832+
clonesBuilder = clonesBuilderSupplier.get();
1833+
}
1834+
clonesBuilder.put(
1835+
repoShardId,
1836+
new ShardSnapshotStatus(localNodeId, completedAssignedQueuedShards.get(shardId).generation())
1837+
);
1838+
iterator.remove();
1839+
}
1840+
}
1841+
}
1842+
return clonesBuilder;
17911843
}
17921844

17931845
private void addDeleteListener(String deleteUUID, ActionListener<Void> listener) {
@@ -2520,36 +2572,49 @@ private SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo(
25202572
final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(oldEntries.size());
25212573
final Map<ShardId, ShardSnapshotStatus> completedAssignedQueuedShards = new HashMap<>();
25222574
for (SnapshotsInProgress.Entry entry : oldEntries) {
2523-
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = maybeUpdateWithCompletedAssignedQueuedShards(
2524-
entry,
2525-
completedAssignedQueuedShards,
2526-
false,
2527-
() -> ImmutableOpenMap.builder(entry.shards())
2528-
);
2529-
boolean changed = shardsBuilder != null;
2530-
2531-
if (entry.hasAssignedQueuedShards() && perNodeShardSnapshotCounter.hasCapacityOnAnyNode()) {
2532-
if (shardsBuilder == null) {
2533-
shardsBuilder = ImmutableOpenMap.builder(entry.shards());
2534-
}
2535-
changed |= maybeStartAssignedQueuedShardSnapshots(
2536-
clusterState,
2575+
if (entry.isClone()) {
2576+
final var clonesBuilder = maybeUpdateCloneWithCompletedAssignedQueuedShards(
25372577
entry,
2538-
snapshotsInProgress::isNodeIdForRemoval,
2539-
shardsBuilder,
2540-
perNodeShardSnapshotCounter,
2541-
completedAssignedQueuedShards
2578+
completedAssignedQueuedShards,
2579+
initialState.nodes().getLocalNodeId(),
2580+
() -> ImmutableOpenMap.builder(entry.shardSnapshotStatusByRepoShardId())
25422581
);
2543-
}
2544-
2545-
if (changed) {
2546-
final var newEntry = entry.withShardStates(shardsBuilder.build());
2547-
newEntries.add(newEntry);
2548-
if (newEntry.state().completed()) {
2549-
newlyCompletedEntries.add(newEntry);
2582+
if (clonesBuilder != null) {
2583+
newEntries.add(entry.withClones(clonesBuilder.build()));
2584+
} else {
2585+
newEntries.add(entry);
25502586
}
25512587
} else {
2552-
newEntries.add(entry);
2588+
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder =
2589+
maybeUpdateSnapshotWithCompletedAssignedQueuedShards(
2590+
entry,
2591+
completedAssignedQueuedShards,
2592+
() -> ImmutableOpenMap.builder(entry.shards())
2593+
);
2594+
boolean changed = shardsBuilder != null;
2595+
2596+
if (entry.hasAssignedQueuedShards() && perNodeShardSnapshotCounter.hasCapacityOnAnyNode()) {
2597+
if (shardsBuilder == null) {
2598+
shardsBuilder = ImmutableOpenMap.builder(entry.shards());
2599+
}
2600+
changed |= maybeStartAssignedQueuedShardSnapshots(
2601+
clusterState,
2602+
entry,
2603+
snapshotsInProgress::isNodeIdForRemoval,
2604+
shardsBuilder,
2605+
perNodeShardSnapshotCounter,
2606+
completedAssignedQueuedShards
2607+
);
2608+
}
2609+
if (changed) {
2610+
final var newEntry = entry.withShardStates(shardsBuilder.build());
2611+
newEntries.add(newEntry);
2612+
if (newEntry.state().completed()) {
2613+
newlyCompletedEntries.add(newEntry);
2614+
}
2615+
} else {
2616+
newEntries.add(entry);
2617+
}
25532618
}
25542619
}
25552620
return snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectRepo.projectId(), projectRepo.name(), newEntries);
@@ -2589,10 +2654,17 @@ private boolean maybeStartAssignedQueuedShardSnapshots(
25892654
);
25902655
if (newShardSnapshotStatus.equals(existingShardSnapshotStatus) == false) {
25912656
if (newShardSnapshotStatus.state().completed()) {
2592-
assert newShardSnapshotStatus.state() == ShardState.MISSING : newShardSnapshotStatus;
2657+
assert newShardSnapshotStatus.state() == ShardState.MISSING
2658+
: shardId + " has unexpected completion state " + newShardSnapshotStatus + " in snapshot entry " + entry;
25932659
// There might be QUEUED shard snapshots of this shard in later snapshots and we need
25942660
// to propagate the completion to them.
2595-
completedAssignedQueuedShards.put(shardId, newShardSnapshotStatus);
2661+
final var old = completedAssignedQueuedShards.put(shardId, newShardSnapshotStatus);
2662+
assert old == null
2663+
: shardId
2664+
+ " has unexpected previous completed shard snapshot status: "
2665+
+ old
2666+
+ " that conflicts with the one from "
2667+
+ entry;
25962668
}
25972669
changedCount++;
25982670
if (newShardSnapshotStatus.state() == ShardState.INIT) {
@@ -2649,7 +2721,7 @@ private SnapshotsInProgress.Entry applyUpdatesToEntry(
26492721
) {
26502722
// Completed snapshots do not require any updates so we just add them to the output list and keep going.
26512723
// Also we short circuit if there are no more unconsumed updates to apply.
2652-
if (entry.state().completed() || shardUpdates.isEmpty()) {
2724+
if (entry.state().completed() || (shardUpdates.isEmpty() && completedAssignedQueuedShards.isEmpty())) {
26532725
return entry;
26542726
}
26552727
return new EntryContext(entry, shardUpdates, completedAssignedQueuedShards).computeUpdatedSnapshotEntryFromShardUpdates();
@@ -2717,7 +2789,16 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
27172789
}
27182790
}
27192791

2720-
maybeUpdateWithCompletedAssignedQueuedShards(entry, completedAssignedQueuedShards, false, this::shardsBuilder);
2792+
if (entry.isClone()) {
2793+
maybeUpdateCloneWithCompletedAssignedQueuedShards(
2794+
entry,
2795+
completedAssignedQueuedShards,
2796+
initialState.nodes().getLocalNodeId(),
2797+
this::clonesBuilder
2798+
);
2799+
} else {
2800+
maybeUpdateSnapshotWithCompletedAssignedQueuedShards(entry, completedAssignedQueuedShards, this::shardsBuilder);
2801+
}
27212802

27222803
if (shardsBuilder != null) {
27232804
assert clonesBuilder == null

server/src/main/java/org/elasticsearch/snapshots/SnapshotsServiceUtils.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ public static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus>
413413
.entrySet()) {
414414
SnapshotsInProgress.ShardSnapshotStatus shardStatus = shardSnapshotEntry.getValue();
415415
ShardId shardId = snapshotEntry.shardId(shardSnapshotEntry.getKey());
416-
if (shardStatus.equals(SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)) {
416+
if (shardStatus.isUnassignedQueued() || shardStatus.isAssignedQueued()) {
417417
// this shard snapshot is waiting for a previous snapshot to finish execution for this shard
418418
final SnapshotsInProgress.ShardSnapshotStatus knownFailure = knownFailures.get(shardSnapshotEntry.getKey());
419419
if (knownFailure == null) {
@@ -422,14 +422,30 @@ public static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus>
422422
// shard became unassigned while queued after a delete or clone operation so we can fail as missing here
423423
assert snapshotEntry.partial();
424424
snapshotChanged = true;
425-
logger.debug("failing snapshot of shard [{}] because index got deleted", shardId);
426-
shards.put(shardId, SnapshotsInProgress.ShardSnapshotStatus.MISSING);
427-
knownFailures.put(shardSnapshotEntry.getKey(), SnapshotsInProgress.ShardSnapshotStatus.MISSING);
425+
logger.debug("failing snapshot of shard [{}] with status [{}] because index got deleted", shardId, shardStatus);
426+
final SnapshotsInProgress.ShardSnapshotStatus newShardStatus = shardStatus.isAssignedQueued()
427+
? new SnapshotsInProgress.ShardSnapshotStatus(
428+
shardStatus.nodeId(),
429+
SnapshotsInProgress.ShardState.FAILED,
430+
shardStatus.generation(),
431+
"shard is deleted"
432+
)
433+
: SnapshotsInProgress.ShardSnapshotStatus.MISSING;
434+
shards.put(shardId, newShardStatus);
435+
knownFailures.put(shardSnapshotEntry.getKey(), newShardStatus);
428436
} else {
429437
// if no failure is known for the shard we keep waiting
430438
shards.put(shardId, shardStatus);
431439
}
432440
} else {
441+
assert shardStatus.isAssignedQueued() == false
442+
: shardId
443+
+ " with status "
444+
+ shardStatus
445+
+ " has unexpected known failure "
446+
+ knownFailure
447+
+ " in snapshot entry "
448+
+ snapshotEntry;
433449
// If a failure is known for an execution we waited on for this shard then we fail with the same exception here
434450
// as well
435451
snapshotChanged = true;

0 commit comments

Comments
 (0)