@@ -273,100 +273,96 @@ public void createSnapshotLegacy(final CreateSnapshotRequest request, final Acti
273
273
final SnapshotId snapshotId = new SnapshotId (snapshotName , UUIDs .randomBase64UUID ()); // new UUID for the snapshot
274
274
Repository repository = repositoriesService .repository (request .repository ());
275
275
final Map <String , Object > userMeta = repository .adaptUserMetadata (request .userMetadata ());
276
- clusterService .submitStateUpdateTask (
277
- "create_snapshot [" + snapshotName + ']' ,
278
- new ClusterStateUpdateTask (request .masterNodeTimeout ()) {
279
-
280
- private List <String > indices ;
281
-
282
- private SnapshotsInProgress .Entry newEntry ;
276
+ repository .executeConsistentStateUpdate (repositoryData -> new ClusterStateUpdateTask (request .masterNodeTimeout ()) {
283
277
284
- @ Override
285
- public ClusterState execute (ClusterState currentState ) {
286
- ensureRepositoryExists (repositoryName , currentState );
287
- validate (repositoryName , snapshotName , currentState );
288
- SnapshotDeletionsInProgress deletionsInProgress = currentState .custom (SnapshotDeletionsInProgress .TYPE );
289
- if (deletionsInProgress != null && deletionsInProgress .hasDeletionsInProgress ()) {
290
- throw new ConcurrentSnapshotExecutionException (
291
- repositoryName ,
292
- snapshotName ,
293
- "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"
294
- );
295
- }
296
- ensureNoCleanupInProgress (currentState , repositoryName , snapshotName , "create snapshot" );
297
- SnapshotsInProgress snapshots = currentState .custom (SnapshotsInProgress .TYPE );
298
- // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from
299
- // a
300
- // previous master that we can simply ignore and remove from the cluster state because we would clean it up from the
301
- // cluster state anyway in #applyClusterState.
302
- if (snapshots != null
303
- && snapshots .asStream ()
304
- .anyMatch (
305
- entry -> (entry .state () == State .INIT && initializingSnapshots .contains (entry .snapshot ()) == false ) == false
306
- )) {
307
- throw new ConcurrentSnapshotExecutionException (repositoryName , snapshotName , " a snapshot is already running" );
308
- }
309
- // Store newSnapshot here to be processed in clusterStateProcessed
310
- indices = Arrays .asList (indexNameExpressionResolver .concreteIndexNames (currentState , request ));
278
+ private List <String > indices ;
311
279
312
- final List <String > dataStreams = indexNameExpressionResolver .dataStreamNames (
313
- currentState ,
314
- request .indicesOptions (),
315
- request .indices ()
316
- );
280
+ private SnapshotsInProgress .Entry newEntry ;
317
281
318
- logger .trace ("[{}][{}] creating snapshot for indices [{}]" , repositoryName , snapshotName , indices );
319
- newEntry = new SnapshotsInProgress .Entry (
320
- new Snapshot (repositoryName , snapshotId ),
321
- request .includeGlobalState (),
322
- request .partial (),
323
- State .INIT ,
324
- Collections .emptyMap (), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
325
- dataStreams ,
326
- threadPool .absoluteTimeInMillis (),
327
- RepositoryData .UNKNOWN_REPO_GEN ,
328
- ImmutableOpenMap .of (),
329
- userMeta ,
330
- Version .CURRENT
282
+ @ Override
283
+ public ClusterState execute (ClusterState currentState ) {
284
+ ensureRepositoryExists (repositoryName , currentState );
285
+ validate (repositoryName , snapshotName , currentState );
286
+ SnapshotDeletionsInProgress deletionsInProgress = currentState .custom (SnapshotDeletionsInProgress .TYPE );
287
+ if (deletionsInProgress != null && deletionsInProgress .hasDeletionsInProgress ()) {
288
+ throw new ConcurrentSnapshotExecutionException (
289
+ repositoryName ,
290
+ snapshotName ,
291
+ "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"
331
292
);
332
- initializingSnapshots .add (newEntry .snapshot ());
333
- snapshots = SnapshotsInProgress .EMPTY .withAddedEntry (newEntry );
334
- return ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE , snapshots ).build ();
335
293
}
294
+ ensureNoCleanupInProgress (currentState , repositoryName , snapshotName , "create snapshot" );
295
+ SnapshotsInProgress snapshots = currentState .custom (SnapshotsInProgress .TYPE );
296
+ // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a
297
+ // previous master that we can simply ignore and remove from the cluster state because we would clean it up from the
298
+ // cluster state anyway in #applyClusterState.
299
+ if (snapshots != null
300
+ && snapshots .asStream ()
301
+ .anyMatch (
302
+ entry -> (entry .state () == State .INIT && initializingSnapshots .contains (entry .snapshot ()) == false ) == false
303
+ )) {
304
+ throw new ConcurrentSnapshotExecutionException (repositoryName , snapshotName , " a snapshot is already running" );
305
+ }
306
+ // Store newSnapshot here to be processed in clusterStateProcessed
307
+ indices = Arrays .asList (indexNameExpressionResolver .concreteIndexNames (currentState , request ));
336
308
337
- @ Override
338
- public void onFailure (String source , Exception e ) {
339
- logger .warn (() -> new ParameterizedMessage ("[{}][{}] failed to create snapshot" , repositoryName , snapshotName ), e );
340
- if (newEntry != null ) {
341
- initializingSnapshots .remove (newEntry .snapshot ());
342
- }
343
- newEntry = null ;
344
- listener .onFailure (e );
309
+ final List <String > dataStreams = indexNameExpressionResolver .dataStreamNames (
310
+ currentState ,
311
+ request .indicesOptions (),
312
+ request .indices ()
313
+ );
314
+
315
+ logger .trace ("[{}][{}] creating snapshot for indices [{}]" , repositoryName , snapshotName , indices );
316
+ newEntry = new SnapshotsInProgress .Entry (
317
+ new Snapshot (repositoryName , snapshotId ),
318
+ request .includeGlobalState (),
319
+ request .partial (),
320
+ State .INIT ,
321
+ Collections .emptyMap (), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
322
+ dataStreams ,
323
+ threadPool .absoluteTimeInMillis (),
324
+ RepositoryData .UNKNOWN_REPO_GEN ,
325
+ ImmutableOpenMap .of (),
326
+ userMeta ,
327
+ minCompatibleVersion (currentState .nodes ().getMinNodeVersion (), repositoryData , null )
328
+ );
329
+ initializingSnapshots .add (newEntry .snapshot ());
330
+ snapshots = SnapshotsInProgress .EMPTY .withAddedEntry (newEntry );
331
+ return ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE , snapshots ).build ();
332
+ }
333
+
334
+ @ Override
335
+ public void onFailure (String source , Exception e ) {
336
+ logger .warn (() -> new ParameterizedMessage ("[{}][{}] failed to create snapshot" , repositoryName , snapshotName ), e );
337
+ if (newEntry != null ) {
338
+ initializingSnapshots .remove (newEntry .snapshot ());
345
339
}
340
+ newEntry = null ;
341
+ listener .onFailure (e );
342
+ }
346
343
347
- @ Override
348
- public void clusterStateProcessed (String source , ClusterState oldState , final ClusterState newState ) {
349
- if (newEntry != null ) {
350
- final Snapshot current = newEntry .snapshot ();
351
- assert initializingSnapshots .contains (current );
352
- assert indices != null ;
353
- beginSnapshot (newState , newEntry , request .partial (), indices , repository , new ActionListener <Snapshot >() {
354
- @ Override
355
- public void onResponse (final Snapshot snapshot ) {
356
- initializingSnapshots .remove (snapshot );
357
- listener .onResponse (snapshot );
358
- }
344
+ @ Override
345
+ public void clusterStateProcessed (String source , ClusterState oldState , final ClusterState newState ) {
346
+ if (newEntry != null ) {
347
+ final Snapshot current = newEntry .snapshot ();
348
+ assert initializingSnapshots .contains (current );
349
+ assert indices != null ;
350
+ beginSnapshot (newState , newEntry , request .partial (), indices , repository , new ActionListener <Snapshot >() {
351
+ @ Override
352
+ public void onResponse (final Snapshot snapshot ) {
353
+ initializingSnapshots .remove (snapshot );
354
+ listener .onResponse (snapshot );
355
+ }
359
356
360
- @ Override
361
- public void onFailure (final Exception e ) {
362
- initializingSnapshots .remove (current );
363
- listener .onFailure (e );
364
- }
365
- });
366
- }
357
+ @ Override
358
+ public void onFailure (final Exception e ) {
359
+ initializingSnapshots .remove (current );
360
+ listener .onFailure (e );
361
+ }
362
+ });
367
363
}
368
364
}
369
- );
365
+ }, "create_snapshot [" + snapshotName + ']' , listener :: onFailure );
370
366
}
371
367
372
368
/**
0 commit comments