23
23
import org .elasticsearch .action .support .ActionFilters ;
24
24
import org .elasticsearch .action .support .ContextPreservingActionListener ;
25
25
import org .elasticsearch .action .support .GroupedActionListener ;
26
+ import org .elasticsearch .action .support .RefCountingRunnable ;
26
27
import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
27
28
import org .elasticsearch .cluster .ClusterChangedEvent ;
28
29
import org .elasticsearch .cluster .ClusterState ;
70
71
import org .elasticsearch .common .util .Maps ;
71
72
import org .elasticsearch .common .util .concurrent .EsExecutors ;
72
73
import org .elasticsearch .common .util .concurrent .ListenableFuture ;
73
- import org .elasticsearch .common .util .concurrent .RunOnce ;
74
74
import org .elasticsearch .core .Nullable ;
75
75
import org .elasticsearch .core .SuppressForbidden ;
76
76
import org .elasticsearch .core .Tuple ;
@@ -188,6 +188,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
188
188
189
189
private final MasterServiceTaskQueue <SnapshotTask > masterServiceTaskQueue ;
190
190
191
+ private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler ;
192
+
191
193
/**
192
194
* Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the
193
195
* cluster state. The number of concurrent operations in a cluster state is defined as the sum of
@@ -238,6 +240,7 @@ public SnapshotsService(
238
240
this .systemIndices = systemIndices ;
239
241
240
242
this .masterServiceTaskQueue = clusterService .createTaskQueue ("snapshots-service" , Priority .NORMAL , new SnapshotTaskExecutor ());
243
+ this .shardSnapshotUpdateCompletionHandler = this ::handleShardSnapshotUpdateCompletion ;
241
244
}
242
245
243
246
/**
@@ -3084,16 +3087,19 @@ static final class SnapshotShardsUpdateContext {
3084
3087
// updates that were used to update an existing in-progress shard snapshot
3085
3088
private final Set <ShardSnapshotUpdate > executedUpdates = new HashSet <>();
3086
3089
3087
- // enqueues a reroute because some shard snapshots finished
3088
- private final Runnable rerouteRunnable ;
3090
+ // handles the completion of some shard-snapshot updates, performing the next possible actions
3091
+ private final ShardSnapshotUpdateCompletionHandler completionHandler ;
3092
+
3093
+ // entries that became complete due to this batch of updates
3094
+ private final List <SnapshotsInProgress .Entry > newlyCompletedEntries = new ArrayList <>();
3089
3095
3090
3096
SnapshotShardsUpdateContext (
3091
3097
ClusterStateTaskExecutor .BatchExecutionContext <SnapshotTask > batchExecutionContext ,
3092
- Runnable rerouteRunnable
3098
+ ShardSnapshotUpdateCompletionHandler completionHandler
3093
3099
) {
3094
3100
this .batchExecutionContext = batchExecutionContext ;
3095
3101
this .initialState = batchExecutionContext .initialState ();
3096
- this .rerouteRunnable = new RunOnce ( rerouteRunnable ); // RunOnce to avoid enqueueing O(#shards) listeners
3102
+ this .completionHandler = completionHandler ;
3097
3103
this .updatesByRepo = new HashMap <>();
3098
3104
for (final var taskContext : batchExecutionContext .taskContexts ()) {
3099
3105
if (taskContext .getTask () instanceof ShardSnapshotUpdate task ) {
@@ -3113,7 +3119,11 @@ SnapshotsInProgress computeUpdatedState() {
3113
3119
}
3114
3120
final List <SnapshotsInProgress .Entry > newEntries = new ArrayList <>(oldEntries .size ());
3115
3121
for (SnapshotsInProgress .Entry entry : oldEntries ) {
3116
- newEntries .add (applyToEntry (entry , updates .getValue ()));
3122
+ final var newEntry = applyToEntry (entry , updates .getValue ());
3123
+ newEntries .add (newEntry );
3124
+ if (newEntry != entry && newEntry .state ().completed ()) {
3125
+ newlyCompletedEntries .add (newEntry );
3126
+ }
3117
3127
}
3118
3128
updated = updated .withUpdatedEntriesForRepo (repoName , newEntries );
3119
3129
}
@@ -3132,12 +3142,20 @@ SnapshotsInProgress computeUpdatedState() {
3132
3142
void completeWithUpdatedState (SnapshotsInProgress snapshotsInProgress ) {
3133
3143
if (updatesByRepo .isEmpty () == false ) {
3134
3144
final var result = new ShardSnapshotUpdateResult (initialState .metadata (), snapshotsInProgress );
3135
- for (final var taskContext : batchExecutionContext .taskContexts ()) {
3136
- if (taskContext .getTask () instanceof ShardSnapshotUpdate task ) {
3137
- taskContext .success (() -> {
3138
- rerouteRunnable .run ();
3139
- task .listener .onResponse (result );
3140
- });
3145
+ try (
3146
+ var onCompletionRefs = new RefCountingRunnable (
3147
+ () -> completionHandler .handleCompletion (result , newlyCompletedEntries , updatesByRepo .keySet ())
3148
+ )
3149
+ ) {
3150
+ for (final var taskContext : batchExecutionContext .taskContexts ()) {
3151
+ if (taskContext .getTask () instanceof ShardSnapshotUpdate task ) {
3152
+ final var ref = onCompletionRefs .acquire ();
3153
+ taskContext .success (() -> {
3154
+ try (ref ) {
3155
+ task .listener .onResponse (result );
3156
+ }
3157
+ });
3158
+ }
3141
3159
}
3142
3160
}
3143
3161
}
@@ -3376,6 +3394,37 @@ private ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder() {
3376
3394
*/
3377
3395
record ShardSnapshotUpdateResult (Metadata metadata , SnapshotsInProgress snapshotsInProgress ) {}
3378
3396
3397
+ interface ShardSnapshotUpdateCompletionHandler {
3398
+ void handleCompletion (
3399
+ ShardSnapshotUpdateResult shardSnapshotUpdateResult ,
3400
+ List <SnapshotsInProgress .Entry > newlyCompletedEntries ,
3401
+ Set <String > updatedRepositories
3402
+ );
3403
+ }
3404
+
3405
+ private void handleShardSnapshotUpdateCompletion (
3406
+ ShardSnapshotUpdateResult shardSnapshotUpdateResult ,
3407
+ List <SnapshotsInProgress .Entry > newlyCompletedEntries ,
3408
+ Set <String > updatedRepositories
3409
+ ) {
3410
+ // Maybe this state update completed one or more snapshots. If we are not already ending them because of some earlier update, end
3411
+ // them now.
3412
+ final var snapshotsInProgress = shardSnapshotUpdateResult .snapshotsInProgress ();
3413
+ for (final var newlyCompletedEntry : newlyCompletedEntries ) {
3414
+ if (endingSnapshots .contains (newlyCompletedEntry .snapshot ()) == false ) {
3415
+ endSnapshot (newlyCompletedEntry , shardSnapshotUpdateResult .metadata , null );
3416
+ }
3417
+ }
3418
+ // Likewise this state update may enable some new shard clones on any affected repository, so check them all.
3419
+ for (final var updatedRepository : updatedRepositories ) {
3420
+ startExecutableClones (snapshotsInProgress , updatedRepository );
3421
+ }
3422
+ // Also shard snapshot completions may free up some shards to move to other nodes, so we must trigger a reroute.
3423
+ if (updatedRepositories .isEmpty () == false ) {
3424
+ rerouteService .reroute ("after shards snapshot update" , Priority .NORMAL , ActionListener .noop ());
3425
+ }
3426
+ }
3427
+
3379
3428
/**
3380
3429
* An update to the snapshot state of a shard.
3381
3430
*
@@ -3455,23 +3504,13 @@ private void innerUpdateSnapshotState(
3455
3504
ShardSnapshotStatus updatedState ,
3456
3505
ActionListener <Void > listener
3457
3506
) {
3458
- var update = new ShardSnapshotUpdate (snapshot , shardId , repoShardId , updatedState , listener .delegateFailure ((delegate , result ) -> {
3459
- try {
3460
- delegate .onResponse (null );
3461
- } finally {
3462
- // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
3463
- // state update we check if its state is completed and end it if it is.
3464
- final SnapshotsInProgress snapshotsInProgress = result .snapshotsInProgress ();
3465
- if (endingSnapshots .contains (snapshot ) == false ) {
3466
- final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (snapshot );
3467
- // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
3468
- if (updatedEntry != null && updatedEntry .state ().completed ()) {
3469
- endSnapshot (updatedEntry , result .metadata (), null );
3470
- }
3471
- }
3472
- startExecutableClones (snapshotsInProgress , snapshot .getRepository ());
3473
- }
3474
- }));
3507
+ var update = new ShardSnapshotUpdate (
3508
+ snapshot ,
3509
+ shardId ,
3510
+ repoShardId ,
3511
+ updatedState ,
3512
+ listener .delegateFailure ((delegate , result ) -> delegate .onResponse (null ))
3513
+ );
3475
3514
logger .trace ("received updated snapshot restore state [{}]" , update );
3476
3515
masterServiceTaskQueue .submitTask ("update snapshot state" , update , null );
3477
3516
}
@@ -3751,7 +3790,7 @@ public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionCo
3751
3790
final ClusterState state = batchExecutionContext .initialState ();
3752
3791
final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext (
3753
3792
batchExecutionContext ,
3754
- () -> rerouteService . reroute ( "after shards snapshot update" , Priority . NORMAL , ActionListener . noop ())
3793
+ shardSnapshotUpdateCompletionHandler
3755
3794
);
3756
3795
final SnapshotsInProgress initialSnapshots = SnapshotsInProgress .get (state );
3757
3796
SnapshotsInProgress snapshotsInProgress = shardsUpdateContext .computeUpdatedState ();
0 commit comments