@@ -1630,22 +1630,13 @@ public ClusterState execute(ClusterState currentState) {
16301630 }
16311631 return abortedEntry ;
16321632 } else {
1633- if (existing .isClone ()) {
1634- final var clonesBuilder = maybeUpdateCloneWithCompletedAssignedQueuedShards (
1635- existing ,
1636- completedAssignedQueuedShards ,
1637- currentState .nodes ().getLocalNodeId (),
1638- () -> ImmutableOpenMap .builder (existing .shardSnapshotStatusByRepoShardId ())
1639- );
1640- return clonesBuilder == null ? existing : existing .withClones (clonesBuilder .build ());
1641- } else {
1642- final var shardsBuilder = maybeUpdateSnapshotWithCompletedAssignedQueuedShards (
1643- existing ,
1644- completedAssignedQueuedShards ,
1645- () -> ImmutableOpenMap .builder (existing .shards ())
1646- );
1647- return shardsBuilder == null ? existing : existing .withShardStates (shardsBuilder .build ());
1648- }
1633+ final var shardsBuilder = maybeUpdateWithCompletedAssignedQueuedShards (
1634+ existing ,
1635+ completedAssignedQueuedShards ,
1636+ true ,
1637+ () -> ImmutableOpenMap .builder (existing .shards ())
1638+ );
1639+ return shardsBuilder == null ? existing : existing .withShardStates (shardsBuilder .build ());
16491640 }
16501641 }).filter (Objects ::nonNull ).toList ()
16511642 );
@@ -1760,86 +1751,43 @@ public String toString() {
17601751 }
17611752
17621753 /**
1763- * Update a snapshot entry with the completed assigned queue shard snapshots accumulated from processing earlier snapshots.
1754+ * Update the snapshot entry with the completed assigned queue shard snapshots accumulated from processing earlier snapshots.
17641755 * These shard snapshots are completed because either their snapshots are deleted or the associated shards become unassigned.
17651756 * In both cases, the shard snapshots for the same shards must be updated to propagate the changes.
17661757 * @param entry The snapshot entry to update.
17671758 * @param completedAssignedQueuedShards Completed assigned queued shard snapshots accumulated from processing earlier snapshots.
1768- * An entry is removed if it is not MISSING state so that it only updates to the first match.
1759+ * @param removeAfterUpdate If true, the completed assigned queued shards will be removed from completedAssignedQueuedShards
1760+ * after updating the entry.
17691761 * @param shardsBuilderSupplier Supplier to get a builder for new shard snapshots if any update is applicable.
17701762 * @return The updated map of shard snapshots for the snapshot entry, or null if no updates were made.
17711763 */
1772- static ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > maybeUpdateSnapshotWithCompletedAssignedQueuedShards (
1764+ static ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > maybeUpdateWithCompletedAssignedQueuedShards (
17731765 SnapshotsInProgress .Entry entry ,
17741766 Map <ShardId , ShardSnapshotStatus > completedAssignedQueuedShards ,
1767+ boolean removeAfterUpdate ,
17751768 Supplier <ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus >> shardsBuilderSupplier
17761769 ) {
1777- assert entry .isClone () == false : "unexpected clone " + entry ;
1778- if (entry .state ().completed () || completedAssignedQueuedShards .isEmpty ()) {
1779- return null ;
1780- }
1781- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder = null ;
1782- final var iterator = completedAssignedQueuedShards .keySet ().iterator ();
1783- while (iterator .hasNext ()) {
1784- final var shardId = iterator .next ();
1785- final var shardSnapshotStatus = entry .shards ().get (shardId );
1786- if (shardSnapshotStatus != null ) {
1787- assert shardSnapshotStatus .isUnassignedQueued () : shardSnapshotStatus ;
1788- if (shardsBuilder == null ) {
1789- shardsBuilder = shardsBuilderSupplier .get ();
1790- }
1791- final ShardSnapshotStatus completedStatus = completedAssignedQueuedShards .get (shardId );
1792- if (completedStatus .state () == ShardState .MISSING ) {
1793- shardsBuilder .put (shardId , completedStatus );
1794- } else {
1795- shardsBuilder .put (shardId , ShardSnapshotStatus .assignedQueued (completedStatus .nodeId (), completedStatus .generation ()));
1796- iterator .remove ();
1797- }
1798- }
1799- }
1800- return shardsBuilder ;
1801- }
1802-
1803- /**
1804- * Update a clone entry with the completed assigned queue shard snapshots accumulated from processing earlier snapshots.
1805- * These shard snapshots are completed because either their snapshots are deleted or the associated shards become unassigned.
1806- * In both cases, previous queued clone shard snapshots for the same shards can now start.
1807- * @param entry The snapshot entry to update.
1808- * @param completedAssignedQueuedShards Completed assigned queued shard snapshots accumulated from processing earlier snapshots.
1809- * Once an entry is applied for update, it is removed from the list.
1810- * @param clonesBuilderSupplier Supplier to get a builder for new clone shard snapshots if any update is applicable.
1811- * @return The updated map of clone shard snapshots for the snapshot entry, or null if no updates were made.
1812- */
1813- static ImmutableOpenMap .Builder <RepositoryShardId , ShardSnapshotStatus > maybeUpdateCloneWithCompletedAssignedQueuedShards (
1814- SnapshotsInProgress .Entry entry ,
1815- Map <ShardId , ShardSnapshotStatus > completedAssignedQueuedShards ,
1816- String localNodeId ,
1817- Supplier <ImmutableOpenMap .Builder <RepositoryShardId , ShardSnapshotStatus >> clonesBuilderSupplier
1818- ) {
1819- assert entry .isClone () : "expected a clone, but got " + entry ;
1820- if (entry .state ().completed () || completedAssignedQueuedShards .isEmpty ()) {
1821- return null ;
1822- }
1823- ImmutableOpenMap .Builder <RepositoryShardId , ShardSnapshotStatus > clonesBuilder = null ;
1824- final var iterator = completedAssignedQueuedShards .keySet ().iterator ();
1825- while (iterator .hasNext ()) {
1826- final var shardId = iterator .next ();
1827- final IndexId indexId = entry .indices ().get (shardId .getIndexName ());
1828- if (indexId != null ) {
1829- final RepositoryShardId repoShardId = new RepositoryShardId (indexId , shardId .id ());
1830- if (SnapshotsServiceUtils .isQueued (entry .shardSnapshotStatusByRepoShardId ().get (repoShardId ))) {
1831- if (clonesBuilder == null ) {
1832- clonesBuilder = clonesBuilderSupplier .get ();
1770+ if (entry .isClone () == false && completedAssignedQueuedShards .isEmpty () == false ) {
1771+ ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder = null ;
1772+ final var iterator = completedAssignedQueuedShards .keySet ().iterator ();
1773+ while (iterator .hasNext ()) {
1774+ final var shardId = iterator .next ();
1775+ final var shardSnapshotStatus = entry .shards ().get (shardId );
1776+ if (shardSnapshotStatus != null ) {
1777+ assert shardSnapshotStatus .isUnassignedQueued () : shardSnapshotStatus ;
1778+ if (shardsBuilder == null ) {
1779+ shardsBuilder = shardsBuilderSupplier .get ();
1780+ }
1781+ shardsBuilder .put (shardId , completedAssignedQueuedShards .get (shardId ));
1782+ if (removeAfterUpdate ) {
1783+ iterator .remove ();
18331784 }
1834- clonesBuilder .put (
1835- repoShardId ,
1836- new ShardSnapshotStatus (localNodeId , completedAssignedQueuedShards .get (shardId ).generation ())
1837- );
1838- iterator .remove ();
18391785 }
18401786 }
1787+ return shardsBuilder ;
1788+ } else {
1789+ return null ;
18411790 }
1842- return clonesBuilder ;
18431791 }
18441792
18451793 private void addDeleteListener (String deleteUUID , ActionListener <Void > listener ) {
@@ -2572,49 +2520,36 @@ private SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo(
25722520 final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
25732521 final Map <ShardId , ShardSnapshotStatus > completedAssignedQueuedShards = new HashMap <>();
25742522 for (SnapshotsInProgress .Entry entry : oldEntries ) {
2575- if (entry .isClone ()) {
2576- final var clonesBuilder = maybeUpdateCloneWithCompletedAssignedQueuedShards (
2523+ ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder = maybeUpdateWithCompletedAssignedQueuedShards (
2524+ entry ,
2525+ completedAssignedQueuedShards ,
2526+ false ,
2527+ () -> ImmutableOpenMap .builder (entry .shards ())
2528+ );
2529+ boolean changed = shardsBuilder != null ;
2530+
2531+ if (entry .hasAssignedQueuedShards () && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
2532+ if (shardsBuilder == null ) {
2533+ shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
2534+ }
2535+ changed |= maybeStartAssignedQueuedShardSnapshots (
2536+ clusterState ,
25772537 entry ,
2578- completedAssignedQueuedShards ,
2579- initialState .nodes ().getLocalNodeId (),
2580- () -> ImmutableOpenMap .builder (entry .shardSnapshotStatusByRepoShardId ())
2538+ snapshotsInProgress ::isNodeIdForRemoval ,
2539+ shardsBuilder ,
2540+ perNodeShardSnapshotCounter ,
2541+ completedAssignedQueuedShards
25812542 );
2582- if (clonesBuilder != null ) {
2583- newEntries .add (entry .withClones (clonesBuilder .build ()));
2584- } else {
2585- newEntries .add (entry );
2586- }
2587- } else {
2588- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder =
2589- maybeUpdateSnapshotWithCompletedAssignedQueuedShards (
2590- entry ,
2591- completedAssignedQueuedShards ,
2592- () -> ImmutableOpenMap .builder (entry .shards ())
2593- );
2594- boolean changed = shardsBuilder != null ;
2543+ }
25952544
2596- if (entry .hasAssignedQueuedShards () && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
2597- if (shardsBuilder == null ) {
2598- shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
2599- }
2600- changed |= maybeStartAssignedQueuedShardSnapshots (
2601- clusterState ,
2602- entry ,
2603- snapshotsInProgress ::isNodeIdForRemoval ,
2604- shardsBuilder ,
2605- perNodeShardSnapshotCounter ,
2606- completedAssignedQueuedShards
2607- );
2608- }
2609- if (changed ) {
2610- final var newEntry = entry .withShardStates (shardsBuilder .build ());
2611- newEntries .add (newEntry );
2612- if (newEntry .state ().completed ()) {
2613- newlyCompletedEntries .add (newEntry );
2614- }
2615- } else {
2616- newEntries .add (entry );
2545+ if (changed ) {
2546+ final var newEntry = entry .withShardStates (shardsBuilder .build ());
2547+ newEntries .add (newEntry );
2548+ if (newEntry .state ().completed ()) {
2549+ newlyCompletedEntries .add (newEntry );
26172550 }
2551+ } else {
2552+ newEntries .add (entry );
26182553 }
26192554 }
26202555 return snapshotsInProgress .createCopyWithUpdatedEntriesForRepo (projectRepo .projectId (), projectRepo .name (), newEntries );
@@ -2654,17 +2589,10 @@ private boolean maybeStartAssignedQueuedShardSnapshots(
26542589 );
26552590 if (newShardSnapshotStatus .equals (existingShardSnapshotStatus ) == false ) {
26562591 if (newShardSnapshotStatus .state ().completed ()) {
2657- assert newShardSnapshotStatus .state () == ShardState .MISSING
2658- : shardId + " has unexpected completion state " + newShardSnapshotStatus + " in snapshot entry " + entry ;
2592+ assert newShardSnapshotStatus .state () == ShardState .MISSING : newShardSnapshotStatus ;
26592593 // There might be QUEUED shard snapshots of this shard in later snapshots and we need
26602594 // to propagate the completion to them.
2661- final var old = completedAssignedQueuedShards .put (shardId , newShardSnapshotStatus );
2662- assert old == null
2663- : shardId
2664- + " has unexpected previous completed shard snapshot status: "
2665- + old
2666- + " that conflicts with the one from "
2667- + entry ;
2595+ completedAssignedQueuedShards .put (shardId , newShardSnapshotStatus );
26682596 }
26692597 changedCount ++;
26702598 if (newShardSnapshotStatus .state () == ShardState .INIT ) {
@@ -2721,7 +2649,7 @@ private SnapshotsInProgress.Entry applyUpdatesToEntry(
27212649 ) {
27222650 // Completed snapshots do not require any updates so we just add them to the output list and keep going.
27232651 // Also we short circuit if there are no more unconsumed updates to apply.
2724- if (entry .state ().completed () || ( shardUpdates .isEmpty () && completedAssignedQueuedShards . isEmpty () )) {
2652+ if (entry .state ().completed () || shardUpdates .isEmpty ()) {
27252653 return entry ;
27262654 }
27272655 return new EntryContext (entry , shardUpdates , completedAssignedQueuedShards ).computeUpdatedSnapshotEntryFromShardUpdates ();
@@ -2789,16 +2717,7 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
27892717 }
27902718 }
27912719
2792- if (entry .isClone ()) {
2793- maybeUpdateCloneWithCompletedAssignedQueuedShards (
2794- entry ,
2795- completedAssignedQueuedShards ,
2796- initialState .nodes ().getLocalNodeId (),
2797- this ::clonesBuilder
2798- );
2799- } else {
2800- maybeUpdateSnapshotWithCompletedAssignedQueuedShards (entry , completedAssignedQueuedShards , this ::shardsBuilder );
2801- }
2720+ maybeUpdateWithCompletedAssignedQueuedShards (entry , completedAssignedQueuedShards , false , this ::shardsBuilder );
28022721
28032722 if (shardsBuilder != null ) {
28042723 assert clonesBuilder == null
0 commit comments