@@ -3078,101 +3078,6 @@ public String toString() {
30783078 }
30793079 }
30803080
3081- private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo (
3082- ProjectRepo projectRepo ,
3083- ClusterState clusterState ,
3084- SnapshotsInProgress snapshotsInProgress ,
3085- PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
3086- TriConsumer <Snapshot , ShardId , ShardSnapshotStatus > shardStatusUpdateConsumer ,
3087- Consumer <SnapshotsInProgress .Entry > newEntryConsumer ,
3088- Runnable changedCallback ,
3089- Runnable startedCallback
3090- ) {
3091- assert perNodeShardSnapshotCounter .hasCapacityOnAnyNode () : "no capacity left on any node " + perNodeShardSnapshotCounter ;
3092- final List <SnapshotsInProgress .Entry > oldEntries = snapshotsInProgress .forRepo (projectRepo );
3093- if (oldEntries .isEmpty () || oldEntries .stream ().allMatch (entry -> entry .hasAssignedQueuedShards () == false )) {
3094- return snapshotsInProgress ;
3095- }
3096- final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
3097- for (SnapshotsInProgress .Entry entry : oldEntries ) {
3098- if (entry .hasAssignedQueuedShards () && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3099- final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
3100- final var changed = maybeStartAssignedQueuedShardSnapshots (
3101- clusterState ,
3102- entry ,
3103- snapshotsInProgress ::isNodeIdForRemoval ,
3104- shardsBuilder ,
3105- perNodeShardSnapshotCounter ,
3106- shardStatusUpdateConsumer ,
3107- changedCallback ,
3108- startedCallback
3109- );
3110- if (changed ) {
3111- final var newEntry = entry .withShardStates (shardsBuilder .build ());
3112- newEntries .add (newEntry );
3113- newEntryConsumer .accept (newEntry );
3114- } else {
3115- newEntries .add (entry );
3116- }
3117- } else {
3118- newEntries .add (entry );
3119- }
3120- }
3121- return snapshotsInProgress .createCopyWithUpdatedEntriesForRepo (projectRepo .projectId (), projectRepo .name (), newEntries );
3122- }
3123-
3124- private static boolean maybeStartAssignedQueuedShardSnapshots (
3125- ClusterState clusterState ,
3126- SnapshotsInProgress .Entry entry ,
3127- Predicate <String > nodeIdRemovalPredicate ,
3128- ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder ,
3129- PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
3130- TriConsumer <Snapshot , ShardId , ShardSnapshotStatus > shardStatusUpdateConsumer ,
3131- Runnable changedCallback ,
3132- Runnable startedCallback
3133- ) {
3134- assert entry .hasAssignedQueuedShards () : "entry has no assigned queued shards: " + entry ;
3135- assert perNodeShardSnapshotCounter .hasCapacityOnAnyNode () : "no capacity left on any node " + perNodeShardSnapshotCounter ;
3136- boolean changed = false ;
3137- for (var shardId : shardsBuilder .keys ()) {
3138- if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode () == false ) {
3139- return changed ;
3140- }
3141- final var existingShardSnapshotStatus = shardsBuilder .get (shardId );
3142- if (existingShardSnapshotStatus .isAssignedQueued () == false ) {
3143- continue ;
3144- }
3145- final IndexRoutingTable indexRouting = clusterState .routingTable (entry .projectId ()).index (shardId .getIndex ());
3146- final ShardRouting shardRouting ;
3147- if (indexRouting == null ) {
3148- shardRouting = null ;
3149- } else {
3150- shardRouting = indexRouting .shard (shardId .id ()).primaryShard ();
3151- }
3152- final var newShardSnapshotStatus = initShardSnapshotStatus (
3153- existingShardSnapshotStatus .generation (),
3154- shardRouting ,
3155- nodeIdRemovalPredicate ,
3156- perNodeShardSnapshotCounter
3157- );
3158- if (newShardSnapshotStatus .state ().completed ()) {
3159- // It can become complete if the shard is unassigned or deleted, i.e. state == MISSING.
3160- // We cannot directly update its status here because there maybe another snapshot for
3161- // the same shard that is QUEUED which must be updated as well, i.e. vertical update.
3162- // So we submit the status update to let it be processed in a future cluster state update.
3163- shardStatusUpdateConsumer .apply (entry .snapshot (), shardId , newShardSnapshotStatus );
3164- } else if (newShardSnapshotStatus .equals (existingShardSnapshotStatus ) == false ) {
3165- changedCallback .run ();
3166- if (newShardSnapshotStatus .state () == ShardState .INIT ) {
3167- startedCallback .run ();
3168- }
3169- shardsBuilder .put (shardId , newShardSnapshotStatus );
3170- changed = true ;
3171- }
3172- }
3173- return changed ;
3174- }
3175-
31763081 /**
31773082 * Shortcut to build new {@link ClusterState} from the current state and updated values of {@link SnapshotsInProgress} and
31783083 * {@link SnapshotDeletionsInProgress}.
@@ -3570,14 +3475,7 @@ SnapshotsInProgress computeUpdatedState() {
35703475 initialState ,
35713476 updated ,
35723477 perNodeShardSnapshotCounter ,
3573- shardStatusUpdateConsumer ,
3574- newEntry -> {
3575- if (newEntry .state ().completed ()) {
3576- newlyCompletedEntries .add (newEntry );
3577- }
3578- },
3579- () -> changedCount ++,
3580- () -> startedCount ++
3478+ shardStatusUpdateConsumer
35813479 );
35823480 }
35833481
@@ -3592,6 +3490,96 @@ SnapshotsInProgress computeUpdatedState() {
35923490 return existing ;
35933491 }
35943492
3493+ private SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo (
3494+ ProjectRepo projectRepo ,
3495+ ClusterState clusterState ,
3496+ SnapshotsInProgress snapshotsInProgress ,
3497+ PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
3498+ TriConsumer <Snapshot , ShardId , ShardSnapshotStatus > shardStatusUpdateConsumer
3499+ ) {
3500+ assert perNodeShardSnapshotCounter .hasCapacityOnAnyNode () : "no capacity left on any node " + perNodeShardSnapshotCounter ;
3501+ final List <SnapshotsInProgress .Entry > oldEntries = snapshotsInProgress .forRepo (projectRepo );
3502+ if (oldEntries .isEmpty () || oldEntries .stream ().allMatch (entry -> entry .hasAssignedQueuedShards () == false )) {
3503+ return snapshotsInProgress ;
3504+ }
3505+ final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
3506+ for (SnapshotsInProgress .Entry entry : oldEntries ) {
3507+ if (entry .hasAssignedQueuedShards () && perNodeShardSnapshotCounter .hasCapacityOnAnyNode ()) {
3508+ final var shardsBuilder = ImmutableOpenMap .builder (entry .shards ());
3509+ final var changed = maybeStartAssignedQueuedShardSnapshots (
3510+ clusterState ,
3511+ entry ,
3512+ snapshotsInProgress ::isNodeIdForRemoval ,
3513+ shardsBuilder ,
3514+ perNodeShardSnapshotCounter ,
3515+ shardStatusUpdateConsumer
3516+ );
3517+ if (changed ) {
3518+ final var newEntry = entry .withShardStates (shardsBuilder .build ());
3519+ newEntries .add (newEntry );
3520+ if (newEntry .state ().completed ()) {
3521+ newlyCompletedEntries .add (newEntry );
3522+ }
3523+ } else {
3524+ newEntries .add (entry );
3525+ }
3526+ } else {
3527+ newEntries .add (entry );
3528+ }
3529+ }
3530+ return snapshotsInProgress .createCopyWithUpdatedEntriesForRepo (projectRepo .projectId (), projectRepo .name (), newEntries );
3531+ }
3532+
3533+ private boolean maybeStartAssignedQueuedShardSnapshots (
3534+ ClusterState clusterState ,
3535+ SnapshotsInProgress .Entry entry ,
3536+ Predicate <String > nodeIdRemovalPredicate ,
3537+ ImmutableOpenMap .Builder <ShardId , ShardSnapshotStatus > shardsBuilder ,
3538+ PerNodeShardSnapshotCounter perNodeShardSnapshotCounter ,
3539+ TriConsumer <Snapshot , ShardId , ShardSnapshotStatus > shardStatusUpdateConsumer
3540+ ) {
3541+ assert entry .hasAssignedQueuedShards () : "entry has no assigned queued shards: " + entry ;
3542+ assert perNodeShardSnapshotCounter .hasCapacityOnAnyNode () : "no capacity left on any node " + perNodeShardSnapshotCounter ;
3543+ boolean changed = false ;
3544+ for (var shardId : shardsBuilder .keys ()) {
3545+ if (perNodeShardSnapshotCounter .hasCapacityOnAnyNode () == false ) {
3546+ return changed ;
3547+ }
3548+ final var existingShardSnapshotStatus = shardsBuilder .get (shardId );
3549+ if (existingShardSnapshotStatus .isAssignedQueued () == false ) {
3550+ continue ;
3551+ }
3552+ final IndexRoutingTable indexRouting = clusterState .routingTable (entry .projectId ()).index (shardId .getIndex ());
3553+ final ShardRouting shardRouting ;
3554+ if (indexRouting == null ) {
3555+ shardRouting = null ;
3556+ } else {
3557+ shardRouting = indexRouting .shard (shardId .id ()).primaryShard ();
3558+ }
3559+ final var newShardSnapshotStatus = initShardSnapshotStatus (
3560+ existingShardSnapshotStatus .generation (),
3561+ shardRouting ,
3562+ nodeIdRemovalPredicate ,
3563+ perNodeShardSnapshotCounter
3564+ );
3565+ if (newShardSnapshotStatus .state ().completed ()) {
3566+ // It can become complete if the shard is unassigned or deleted, i.e. state == MISSING.
3567+ // We cannot directly update its status here because there maybe another snapshot for
3568+ // the same shard that is QUEUED which must be updated as well, i.e. vertical update.
3569+ // So we submit the status update to let it be processed in a future cluster state update.
3570+ shardStatusUpdateConsumer .apply (entry .snapshot (), shardId , newShardSnapshotStatus );
3571+ } else if (newShardSnapshotStatus .equals (existingShardSnapshotStatus ) == false ) {
3572+ changedCount ++;
3573+ if (newShardSnapshotStatus .state () == ShardState .INIT ) {
3574+ startedCount ++;
3575+ }
3576+ shardsBuilder .put (shardId , newShardSnapshotStatus );
3577+ changed = true ;
3578+ }
3579+ }
3580+ return changed ;
3581+ }
3582+
35953583 /**
35963584 * Sets up the final callback {@link #completionHandler} to run after the {@link MasterService} successfully publishes the batched
35973585 * update {@link #batchExecutionContext}. Also sets up the callers of the tasks within the {@link #batchExecutionContext} to receive
@@ -3707,9 +3695,7 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
37073695 nodeIdRemovalPredicate ,
37083696 shardsBuilder ,
37093697 perNodeShardSnapshotCounter ,
3710- shardStatusUpdateConsumer ,
3711- () -> changedCount ++,
3712- () -> startedCount ++
3698+ shardStatusUpdateConsumer
37133699 );
37143700 }
37153701 return entry .withShardStates (shardsBuilder .build ());
0 commit comments