@@ -2851,6 +2851,7 @@ public ClusterState execute(ClusterState currentState) {
28512851
28522852 ClusterState updatedState = res .v1 ();
28532853
2854+ // TODO: deduplicate the code for starting queued-with-gen shards across repos
28542855 final var snapshotsInProgress = SnapshotsInProgress .get (updatedState );
28552856 final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter (
28562857 shardSnapshotPerNodeLimit ,
@@ -2865,33 +2866,14 @@ public ClusterState execute(ClusterState currentState) {
28652866 if (repo .equals (repoForDeletedEntry )) {
28662867 continue ;
28672868 }
2868- final List <SnapshotsInProgress .Entry > oldEntries = snapshotsInProgress .forRepo (repo );
2869- if (oldEntries .isEmpty ()) {
2870- continue ;
2871- }
2872- final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
2873- for (SnapshotsInProgress .Entry entry : oldEntries ) {
2874- if (entry .isClone () == false && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
2875- final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
2876- maybeStartQueuedWithGenerationShardSnapshots (
2877- updatedState ,
2878- entry ,
2879- snapshotsInProgress ::isNodeIdForRemoval ,
2880- shardsBuilder ,
2881- perNodeShardSnapshotCounter ,
2882- () -> {},
2883- () -> {}
2884- );
2885- final var newEntry = entry .withShardStates (shardsBuilder .build ());
2886- newEntries .add (newEntry );
2887- } else {
2888- newEntries .add (entry );
2889- }
2890- }
2891- updatedSnapshotsInProgress = updatedSnapshotsInProgress .createCopyWithUpdatedEntriesForRepo (
2892- repo .projectId (),
2893- repo .name (),
2894- newEntries
2869+ updatedSnapshotsInProgress = maybeStartQueuedWithGenerationShardSnapshotsForRepo (
2870+ repo ,
2871+ updatedState ,
2872+ updatedSnapshotsInProgress ,
2873+ perNodeShardSnapshotCounter ,
2874+ ignore -> {},
2875+ () -> {},
2876+ () -> {}
28952877 );
28962878 }
28972879 if (updatedSnapshotsInProgress != snapshotsInProgress ) {
@@ -3119,6 +3101,44 @@ public String toString() {
31193101 }
31203102 }
31213103
3104+ private static SnapshotsInProgress maybeStartQueuedWithGenerationShardSnapshotsForRepo (
3105+ ProjectRepo projectRepo ,
3106+ ClusterState clusterState ,
3107+ SnapshotsInProgress snapshotsInProgress ,
3108+ PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
3109+ Consumer <SnapshotsInProgress .Entry > newEntryConsumer ,
3110+ Runnable changedCallback ,
3111+ Runnable startedCallback
3112+ ) {
3113+ final List <SnapshotsInProgress .Entry > oldEntries = snapshotsInProgress .forRepo (projectRepo );
3114+ if (oldEntries .isEmpty ()) {
3115+ return snapshotsInProgress ;
3116+ }
3117+ final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
3118+ for (SnapshotsInProgress .Entry entry : oldEntries ) {
3119+ if (entry .isClone () == false && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3120+ final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
3121+ maybeStartQueuedWithGenerationShardSnapshots (
3122+ clusterState ,
3123+ entry ,
3124+ snapshotsInProgress ::isNodeIdForRemoval ,
3125+ shardsBuilder ,
3126+ perNodeShardSnapshotCounter ,
3127+ changedCallback ,
3128+ startedCallback
3129+ );
3130+ final var newEntry = entry .withShardStates (shardsBuilder .build ());
3131+ newEntries .add (newEntry );
3132+ if (newEntry != entry ) {
3133+ newEntryConsumer .accept (newEntry );
3134+ }
3135+ } else {
3136+ newEntries .add (entry );
3137+ }
3138+ }
3139+ return snapshotsInProgress .createCopyWithUpdatedEntriesForRepo (projectRepo .projectId (), projectRepo .name (), newEntries );
3140+ }
3141+
31223142 private static void maybeStartQueuedWithGenerationShardSnapshots (
31233143 ClusterState clusterState ,
31243144 SnapshotsInProgress .Entry entry ,
@@ -3537,37 +3557,24 @@ SnapshotsInProgress computeUpdatedState() {
35373557 updated = updated .createCopyWithUpdatedEntriesForRepo (projectRepo .projectId (), projectRepo .name (), newEntries );
35383558 }
35393559
3560+ // TODO: deduplicate the code for starting queued-with-gen shards across repos
35403561 for (var notUpdatedRepo : Sets .difference (existing .repos (), updatesByRepo .keySet ())) {
35413562 if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode () == false ) {
35423563 break ;
35433564 }
3544- final List <SnapshotsInProgress .Entry > oldEntries = existing .forRepo (notUpdatedRepo );
3545- if (oldEntries .isEmpty ()) {
3546- continue ;
3547- }
3548- final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
3549- for (SnapshotsInProgress .Entry entry : oldEntries ) {
3550- if (entry .isClone () == false && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3551- final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
3552- maybeStartQueuedWithGenerationShardSnapshots (
3553- initialState ,
3554- entry ,
3555- nodeIdRemovalPredicate ,
3556- shardsBuilder ,
3557- perNodeShardSnapshotCounter ,
3558- () -> changedCount ++,
3559- () -> startedCount ++
3560- );
3561- final var newEntry = entry .withShardStates (shardsBuilder .build ());
3562- newEntries .add (newEntry );
3563- if (newEntry != entry && newEntry .state ().completed ()) {
3565+ updated = maybeStartQueuedWithGenerationShardSnapshotsForRepo (
3566+ notUpdatedRepo ,
3567+ initialState ,
3568+ updated ,
3569+ perNodeShardSnapshotCounter ,
3570+ newEntry -> {
3571+ if (newEntry .state ().completed ()) {
35643572 newlyCompletedEntries .add (newEntry );
35653573 }
3566- } else {
3567- newEntries .add (entry );
3568- }
3569- }
3570- updated = updated .createCopyWithUpdatedEntriesForRepo (notUpdatedRepo .projectId (), notUpdatedRepo .name (), newEntries );
3574+ },
3575+ () -> changedCount ++,
3576+ () -> startedCount ++
3577+ );
35713578 }
35723579
35733580 if (changedCount > 0 ) {
0 commit comments