1515import org .elasticsearch .action .ActionListener ;
1616import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequest ;
1717import org .elasticsearch .action .admin .indices .alias .IndicesAliasesRequestBuilder ;
18+ import org .elasticsearch .action .admin .indices .alias .IndicesAliasesResponse ;
1819import org .elasticsearch .action .admin .indices .create .CreateIndexClusterStateUpdateRequest ;
20+ import org .elasticsearch .action .admin .indices .readonly .AddIndexBlockRequest ;
1921import org .elasticsearch .action .admin .indices .settings .put .UpdateSettingsClusterStateUpdateRequest ;
2022import org .elasticsearch .action .support .ActiveShardCount ;
2123import org .elasticsearch .action .support .master .AcknowledgedResponse ;
3133import org .elasticsearch .cluster .metadata .MetadataIndexTemplateService ;
3234import org .elasticsearch .cluster .metadata .MetadataUpdateSettingsService ;
3335import org .elasticsearch .cluster .service .ClusterService ;
34- import org .elasticsearch .common .CheckedBiConsumer ;
3536import org .elasticsearch .common .Strings ;
3637import org .elasticsearch .common .settings .IndexScopedSettings ;
3738import org .elasticsearch .common .settings .Settings ;
5859import java .util .stream .Collectors ;
5960
6061import static org .elasticsearch .action .admin .cluster .migration .TransportGetFeatureUpgradeStatusAction .NO_UPGRADE_REQUIRED_INDEX_VERSION ;
62+ import static org .elasticsearch .cluster .metadata .IndexMetadata .APIBlock .WRITE ;
6163import static org .elasticsearch .cluster .metadata .IndexMetadata .State .CLOSE ;
6264import static org .elasticsearch .core .Strings .format ;
6365
@@ -447,12 +449,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
447449 logAndThrowExceptionForFailures (bulkByScrollResponse )
448450 );
449451 } else {
450- // Successful completion of reindexing - remove read only and delete old index
451- setWriteBlock (
452- oldIndex ,
453- false ,
454- delegate2 .delegateFailureAndWrap (setAliasAndRemoveOldIndex (migrationInfo , bulkByScrollResponse ))
455- );
452+ // Successful completion of reindexing. Now we need to set the alias and remove the old index.
453+ setAliasAndRemoveOldIndex (migrationInfo , ActionListener .wrap (aliasesResponse -> {
454+ if (aliasesResponse .hasErrors ()) {
455+ var e = new ElasticsearchException ("Aliases request had errors" );
456+ for (var error : aliasesResponse .getErrors ()) {
457+ e .addSuppressed (error );
458+ }
459+ throw e ;
460+ }
461+ logger .info (
462+ "Successfully migrated old index [{}] to new index [{}] from feature [{}]" ,
463+ oldIndexName ,
464+ migrationInfo .getNextIndexName (),
465+ migrationInfo .getFeatureName ()
466+ );
467+ delegate2 .onResponse (bulkByScrollResponse );
468+ }, e -> {
469+ logger .error (
470+ () -> format (
471+ "An error occurred while changing aliases and removing the old index [%s] from feature [%s]" ,
472+ oldIndexName ,
473+ migrationInfo .getFeatureName ()
474+ ),
475+ e
476+ );
477+ removeReadOnlyBlockOnReindexFailure (oldIndex , delegate2 , e );
478+ }));
456479 }
457480 }, e -> {
458481 logger .error (
@@ -504,10 +527,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
504527 metadataCreateIndexService .createIndex (TimeValue .MINUS_ONE , TimeValue .ZERO , null , createRequest , listener );
505528 }
506529
507- private CheckedBiConsumer <ActionListener <BulkByScrollResponse >, AcknowledgedResponse , Exception > setAliasAndRemoveOldIndex (
508- SystemIndexMigrationInfo migrationInfo ,
509- BulkByScrollResponse bulkByScrollResponse
510- ) {
530+ private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
511531 final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
512532 aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
513533 aliasesRequest .addAlias (migrationInfo .getNextIndexName (), migrationInfo .getCurrentIndexName ());
@@ -526,30 +546,42 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
526546 );
527547 });
528548
529- // Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
530- // while we're trying to migrate them.
531- return (listener , unsetReadOnlyResponse ) -> aliasesRequest .execute (
532- listener .delegateFailureAndWrap ((l , deleteIndexResponse ) -> l .onResponse (bulkByScrollResponse ))
533- );
549+ aliasesRequest .execute (listener );
534550 }
535551
536552 /**
537- * Makes the index readonly if it's not set as a readonly yet
553+ * Sets the write block on the index to the given value.
538554 */
539555 private void setWriteBlock (Index index , boolean readOnlyValue , ActionListener <AcknowledgedResponse > listener ) {
540- final Settings readOnlySettings = Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), readOnlyValue ).build ();
541-
542- metadataUpdateSettingsService .updateSettings (
543- new UpdateSettingsClusterStateUpdateRequest (
544- TimeValue .MINUS_ONE ,
545- TimeValue .ZERO ,
546- readOnlySettings ,
547- UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
548- UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REJECT ,
549- index
550- ),
551- listener
552- );
556+ if (readOnlyValue ) {
557+ // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
558+ // in-flight writes are completed before returning.
559+ baseClient .admin ()
560+ .indices ()
561+ .addBlock (
562+ new AddIndexBlockRequest (WRITE , index .getName ()).masterNodeTimeout (TimeValue .MINUS_ONE ),
563+ listener .delegateFailureAndWrap ((l , response ) -> {
564+ if (response .isAcknowledged () == false ) {
565+ throw new ElasticsearchException ("Failed to acknowledge read-only block index request" );
566+ }
567+ l .onResponse (response );
568+ })
569+ );
570+ } else {
571+ // The only way to remove a Block is via a settings update.
572+ final Settings readOnlySettings = Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), false ).build ();
573+ metadataUpdateSettingsService .updateSettings (
574+ new UpdateSettingsClusterStateUpdateRequest (
575+ TimeValue .MINUS_ONE ,
576+ TimeValue .ZERO ,
577+ readOnlySettings ,
578+ UpdateSettingsClusterStateUpdateRequest .OnExisting .OVERWRITE ,
579+ UpdateSettingsClusterStateUpdateRequest .OnStaticSetting .REJECT ,
580+ index
581+ ),
582+ listener
583+ );
584+ }
553585 }
554586
555587 private void reindex (SystemIndexMigrationInfo migrationInfo , ActionListener <BulkByScrollResponse > listener ) {
0 commit comments