1515import org .elasticsearch .action .ActionListener ;
1616import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequest ;
1717import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequestBuilder ;
18+ import org .elasticsearch .action .admin .indices .alias .IndicesAliasesResponse ;
1819import org .elasticsearch .action .admin .indices .create .CreateIndexClusterStateUpdateRequest ;
20+ import org .elasticsearch .action .admin .indices .readonly .AddIndexBlockRequest ;
1921import org .elasticsearch .action .admin .indices .settings .put .UpdateSettingsClusterStateUpdateRequest ;
2022import org .elasticsearch .action .support .ActiveShardCount ;
2123import org .elasticsearch .action .support .master .AcknowledgedResponse ;
3234import org .elasticsearch .cluster .metadata .MetadataIndexTemplateService ;
3335import org .elasticsearch .cluster .metadata .MetadataUpdateSettingsService ;
3436import org .elasticsearch .cluster .service .ClusterService ;
35- import org .elasticsearch .common .CheckedBiConsumer ;
3637import org .elasticsearch .common .Strings ;
3738import org .elasticsearch .common .settings .IndexScopedSettings ;
3839import org .elasticsearch .common .settings .Settings ;
5960import java .util .stream .Collectors ;
6061
6162import static org .elasticsearch .action .admin .cluster .migration .TransportGetFeatureUpgradeStatusAction .NO_UPGRADE_REQUIRED_INDEX_VERSION ;
63+ import static org .elasticsearch .cluster .metadata .IndexMetadata .APIBlock .WRITE ;
6264import static org .elasticsearch .cluster .metadata .IndexMetadata .State .CLOSE ;
6365import static org .elasticsearch .core .Strings .format ;
6466
@@ -448,12 +450,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
448450 logAndThrowExceptionForFailures (bulkByScrollResponse )
449451 );
450452 } else {
451- // Successful completion of reindexing - remove read only and delete old index
452- setWriteBlock (
453- oldIndex ,
454- false ,
455- delegate2 .delegateFailureAndWrap (setAliasAndRemoveOldIndex (migrationInfo , bulkByScrollResponse ))
456- );
453+ // Successful completion of reindexing. Now we need to set the alias and remove the old index.
454+ setAliasAndRemoveOldIndex (migrationInfo , ActionListener .wrap (aliasesResponse -> {
455+ if (aliasesResponse .hasErrors ()) {
456+ var e = new ElasticsearchException ("Aliases request had errors" );
457+ for (var error : aliasesResponse .getErrors ()) {
458+ e .addSuppressed (error );
459+ }
460+ throw e ;
461+ }
462+ logger .info (
463+ "Successfully migrated old index [{}] to new index [{}] from feature [{}]" ,
464+ oldIndexName ,
465+ migrationInfo .getNextIndexName (),
466+ migrationInfo .getFeatureName ()
467+ );
468+ delegate2 .onResponse (bulkByScrollResponse );
469+ }, e -> {
470+ logger .error (
471+ () -> format (
472+ "An error occurred while changing aliases and removing the old index [%s] from feature [%s]" ,
473+ oldIndexName ,
474+ migrationInfo .getFeatureName ()
475+ ),
476+ e
477+ );
478+ removeReadOnlyBlockOnReindexFailure (oldIndex , delegate2 , e );
479+ }));
457480 }
458481 }, e -> {
459482 logger .error (
@@ -511,10 +534,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
511534 );
512535 }
513536
514- private CheckedBiConsumer <ActionListener <BulkByScrollResponse >, AcknowledgedResponse , Exception > setAliasAndRemoveOldIndex (
515- SystemIndexMigrationInfo migrationInfo ,
516- BulkByScrollResponse bulkByScrollResponse
517- ) {
537+ private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
518538 final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
519539 aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
520540 aliasesRequest .addAlias (migrationInfo .getNextIndexName (), migrationInfo .getCurrentIndexName ());
@@ -533,30 +553,42 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
533553 );
534554 });
535555
536- // Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
537- // while we're trying to migrate them.
538- return (listener , unsetReadOnlyResponse ) -> aliasesRequest .execute (
539- listener .delegateFailureAndWrap ((l , deleteIndexResponse ) -> l .onResponse (bulkByScrollResponse ))
540- );
556+ aliasesRequest .execute (listener );
541557 }
542558
543559 /**
544- * Makes the index readonly if it's not set as a readonly yet
560+ * Sets the write block on the index to the given value.
545561 */
546562 private void setWriteBlock (Index index , boolean readOnlyValue , ActionListener <AcknowledgedResponse > listener ) {
547- final Settings readOnlySettings = Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), readOnlyValue ).build ();
548-
549- metadataUpdateSettingsService .updateSettings (
550- new UpdateSettingsClusterStateUpdateRequest (
551- MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
552- TimeValue .ZERO ,
553- readOnlySettings ,
554- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
555- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REJECT ,
556- index
557- ),
558- listener
559- );
563+ if (readOnlyValue ) {
564+ // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
565+ // in-flight writes are completed before returning.
566+ baseClient .admin ()
567+ .indices ()
568+ .addBlock (
569+ new AddIndexBlockRequest (WRITE , index .getName ()).masterNodeTimeout (MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ),
570+ listener .delegateFailureAndWrap ((l , response ) -> {
571+ if (response .isAcknowledged () == false ) {
572+ throw new ElasticsearchException ("Failed to acknowledge read-only block index request" );
573+ }
574+ l .onResponse (response );
575+ })
576+ );
577+ } else {
578+ // The only way to remove a Block is via a settings update.
579+ final Settings readOnlySettings = Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), false ).build ();
580+ metadataUpdateSettingsService .updateSettings (
581+ new UpdateSettingsClusterStateUpdateRequest (
582+ MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
583+ TimeValue .ZERO ,
584+ readOnlySettings ,
585+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
586+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REJECT ,
587+ index
588+ ),
589+ listener
590+ );
591+ }
560592 }
561593
562594 private void reindex (SystemIndexMigrationInfo migrationInfo , ActionListener <BulkByScrollResponse > listener ) {
0 commit comments