15
15
import org .elasticsearch .action .ActionListener ;
16
16
import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequest ;
17
17
import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequestBuilder ;
18
+ import org .elasticsearch .action .admin .indices .alias .IndicesAliasesResponse ;
18
19
import org .elasticsearch .action .admin .indices .create .CreateIndexClusterStateUpdateRequest ;
20
+ import org .elasticsearch .action .admin .indices .readonly .AddIndexBlockRequest ;
19
21
import org .elasticsearch .action .admin .indices .settings .put .UpdateSettingsClusterStateUpdateRequest ;
20
22
import org .elasticsearch .action .support .ActiveShardCount ;
21
23
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
32
34
import org .elasticsearch .cluster .metadata .MetadataIndexTemplateService ;
33
35
import org .elasticsearch .cluster .metadata .MetadataUpdateSettingsService ;
34
36
import org .elasticsearch .cluster .service .ClusterService ;
35
- import org .elasticsearch .common .CheckedBiConsumer ;
36
37
import org .elasticsearch .common .Strings ;
37
38
import org .elasticsearch .common .settings .IndexScopedSettings ;
38
39
import org .elasticsearch .common .settings .Settings ;
59
60
import java .util .stream .Collectors ;
60
61
61
62
import static org .elasticsearch .action .admin .cluster .migration .TransportGetFeatureUpgradeStatusAction .NO_UPGRADE_REQUIRED_INDEX_VERSION ;
63
+ import static org .elasticsearch .cluster .metadata .IndexMetadata .APIBlock .WRITE ;
62
64
import static org .elasticsearch .cluster .metadata .IndexMetadata .State .CLOSE ;
63
65
import static org .elasticsearch .core .Strings .format ;
64
66
@@ -448,12 +450,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
448
450
logAndThrowExceptionForFailures (bulkByScrollResponse )
449
451
);
450
452
} else {
451
- // Successful completion of reindexing - remove read only and delete old index
452
- setWriteBlock (
453
- oldIndex ,
454
- false ,
455
- delegate2 .delegateFailureAndWrap (setAliasAndRemoveOldIndex (migrationInfo , bulkByScrollResponse ))
456
- );
453
+ // Successful completion of reindexing. Now we need to set the alias and remove the old index.
454
+ setAliasAndRemoveOldIndex (migrationInfo , ActionListener .wrap (aliasesResponse -> {
455
+ if (aliasesResponse .hasErrors ()) {
456
+ var e = new ElasticsearchException ("Aliases request had errors" );
457
+ for (var error : aliasesResponse .getErrors ()) {
458
+ e .addSuppressed (error );
459
+ }
460
+ throw e ;
461
+ }
462
+ logger .info (
463
+ "Successfully migrated old index [{}] to new index [{}] from feature [{}]" ,
464
+ oldIndexName ,
465
+ migrationInfo .getNextIndexName (),
466
+ migrationInfo .getFeatureName ()
467
+ );
468
+ delegate2 .onResponse (bulkByScrollResponse );
469
+ }, e -> {
470
+ logger .error (
471
+ () -> format (
472
+ "An error occurred while changing aliases and removing the old index [%s] from feature [%s]" ,
473
+ oldIndexName ,
474
+ migrationInfo .getFeatureName ()
475
+ ),
476
+ e
477
+ );
478
+ removeReadOnlyBlockOnReindexFailure (oldIndex , delegate2 , e );
479
+ }));
457
480
}
458
481
}, e -> {
459
482
logger .error (
@@ -511,10 +534,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
511
534
);
512
535
}
513
536
514
- private CheckedBiConsumer <ActionListener <BulkByScrollResponse >, AcknowledgedResponse , Exception > setAliasAndRemoveOldIndex (
515
- SystemIndexMigrationInfo migrationInfo ,
516
- BulkByScrollResponse bulkByScrollResponse
517
- ) {
537
+ private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
518
538
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
519
539
aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
520
540
aliasesRequest .addAlias (migrationInfo .getNextIndexName (), migrationInfo .getCurrentIndexName ());
@@ -533,30 +553,42 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
533
553
);
534
554
});
535
555
536
- // Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
537
- // while we're trying to migrate them.
538
- return (listener , unsetReadOnlyResponse ) -> aliasesRequest .execute (
539
- listener .delegateFailureAndWrap ((l , deleteIndexResponse ) -> l .onResponse (bulkByScrollResponse ))
540
- );
556
+ aliasesRequest .execute (listener );
541
557
}
542
558
543
559
/**
544
- * Makes the index readonly if it's not set as a readonly yet
560
+ * Sets the write block on the index to the given value.
545
561
*/
546
562
private void setWriteBlock (Index index , boolean readOnlyValue , ActionListener <AcknowledgedResponse > listener ) {
547
- final Settings readOnlySettings = Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), readOnlyValue ).build ();
548
-
549
- metadataUpdateSettingsService .updateSettings (
550
- new UpdateSettingsClusterStateUpdateRequest (
551
- MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
552
- TimeValue .ZERO ,
553
- readOnlySettings ,
554
- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
555
- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REJECT ,
556
- index
557
- ),
558
- listener
559
- );
563
+ if (readOnlyValue ) {
564
+ // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
565
+ // in-flight writes are completed before returning.
566
+ baseClient .admin ()
567
+ .indices ()
568
+ .addBlock (
569
+ new AddIndexBlockRequest (WRITE , index .getName ()).masterNodeTimeout (MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ),
570
+ listener .delegateFailureAndWrap ((l , response ) -> {
571
+ if (response .isAcknowledged () == false ) {
572
+ throw new ElasticsearchException ("Failed to acknowledge read-only block index request" );
573
+ }
574
+ l .onResponse (response );
575
+ })
576
+ );
577
+ } else {
578
+ // The only way to remove a Block is via a settings update.
579
+ final Settings readOnlySettings = Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), false ).build ();
580
+ metadataUpdateSettingsService .updateSettings (
581
+ new UpdateSettingsClusterStateUpdateRequest (
582
+ MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
583
+ TimeValue .ZERO ,
584
+ readOnlySettings ,
585
+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
586
+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REJECT ,
587
+ index
588
+ ),
589
+ listener
590
+ );
591
+ }
560
592
}
561
593
562
594
private void reindex (SystemIndexMigrationInfo migrationInfo , ActionListener <BulkByScrollResponse > listener ) {
0 commit comments