Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand All @@ -16,7 +15,6 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
Expand All @@ -30,11 +28,9 @@
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -82,7 +78,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;

public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
Expand Down Expand Up @@ -274,8 +269,7 @@ public void testDestIndexNameSet_noDotPrefix() throws Exception {
assertEquals(expectedDestIndexName, response.getDestIndex());
}

public void testDestIndexNameSet_withDotPrefix() throws Exception {

public void testDestIndexNameSet_withDotPrefix() {
var sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

Expand All @@ -288,13 +282,19 @@ public void testDestIndexNameSet_withDotPrefix() throws Exception {
assertEquals(expectedDestIndexName, response.getDestIndex());
}

public void testDestIndexContainsDocs() throws Exception {
public void testDestIndexContainsDocs() {
// source index with docs
var numDocs = randomIntBetween(1, 100);
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
indexDocs(sourceIndex, numDocs);

var settings = Settings.builder()
.put(IndexMetadata.SETTING_BLOCKS_METADATA, randomBoolean())
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
.build();
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, sourceIndex)));

// call reindex
var response = safeGet(
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
Expand All @@ -305,31 +305,6 @@ public void testDestIndexContainsDocs() throws Exception {
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
}

public void testSetSourceToBlockWrites() throws Exception {
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;

// empty source index
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

// call reindex
safeGet(client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)));

// Assert that source index is now read-only but not verified read-only
GetSettingsResponse getSettingsResponse = safeGet(
admin().indices().getSettings(new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(sourceIndex))
);
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
assertFalse(
parseBoolean(getSettingsResponse.getSetting(sourceIndex, MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()))
);

// assert that write to source fails
var indexReq = new IndexRequest(sourceIndex).source(jsonBuilder().startObject().field("field", "1").endObject());
expectThrows(ClusterBlockException.class, client().index(indexReq));
assertHitCount(prepareSearch(sourceIndex).setSize(0), 0);
}

public void testMissingSourceIndex() {
var nonExistentSourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
expectThrows(
Expand Down Expand Up @@ -387,34 +362,6 @@ public void testMappingsAddedToDestIndex() {
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
}

public void testFailIfMetadataBlockSet() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

ElasticsearchException e = expectThrows(
ElasticsearchException.class,
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
);
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));

cleanupMetadataBlocks(sourceIndex);
}

public void testFailIfReadBlockSet() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

ElasticsearchException e = expectThrows(
ElasticsearchException.class,
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
);
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));

cleanupMetadataBlocks(sourceIndex);
}

public void testReadOnlyBlocksNotAddedBack() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder()
Expand All @@ -434,7 +381,6 @@ public void testReadOnlyBlocksNotAddedBack() {
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));

cleanupMetadataBlocks(sourceIndex);
cleanupMetadataBlocks(destIndex);
}

Expand Down Expand Up @@ -752,9 +698,8 @@ private static void cleanupMetadataBlocks(String index) {
var settings = Settings.builder()
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
.build();
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)));
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA);
updateIndexSettings(settings, index);
}

private static void indexDocs(String index, int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -55,11 +56,13 @@
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;

public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
ReindexDataStreamIndexAction.Request,
Expand Down Expand Up @@ -145,19 +148,10 @@ protected void doExecute(
);
}

if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_READ, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a read block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_METADATA, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a metadata block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}
final boolean wasClosed = isClosed(sourceIndex);
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
SubscribableListener.<AcknowledgedResponse>newForked(l -> removeMetadataBlocks(sourceIndexName, taskId, l))
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
.<AcknowledgedResponse>andThen(l -> setReadOnly(sourceIndexName, l, taskId))
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
Expand All @@ -166,6 +160,7 @@ protected void doExecute(
.<AcknowledgedResponse>andThen(l -> copyIndexMetadataToDest(sourceIndexName, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> sanityCheck(sourceIndexName, destIndexName, l, taskId))
.<CloseIndexResponse>andThen(l -> closeIndexIfWasClosed(destIndexName, wasClosed, l, taskId))
.<AcknowledgedResponse>andThen(l -> removeAPIBlocks(sourceIndexName, taskId, l, READ_ONLY))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
.addListener(listener);
}
Expand Down Expand Up @@ -201,9 +196,9 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
}

private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting write block on source index [{}]", sourceIndexName);
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
private void setReadOnly(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting read-only on source index [{}]", sourceIndexName);
addBlockToIndex(READ_ONLY, sourceIndexName, new ActionListener<>() {
@Override
public void onResponse(AddIndexBlockResponse response) {
if (response.isAcknowledged()) {
Expand Down Expand Up @@ -399,6 +394,29 @@ private void addBlockToIndex(
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
}

/**
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kind of a weird choice to make this one it's own function, but I wanted to have a comment describing why we need to remove both metadata and read_only

* All metadata blocks need to be removed at the start for the following reasons:
* 1) If the source index has a metadata only block, the read-only block can't be added.
* 2) If the source index is read-only and closed, it can't be opened.
*/
private void removeMetadataBlocks(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
logger.debug("Removing metadata blocks from index [{}]", indexName);
removeAPIBlocks(indexName, parentTaskId, listener, METADATA, READ_ONLY);
}

private void removeAPIBlocks(
String indexName,
TaskId parentTaskId,
ActionListener<AcknowledgedResponse> listener,
IndexMetadata.APIBlock... blocks
) {
Settings.Builder settings = Settings.builder();
Arrays.stream(blocks).forEach(b -> settings.putNull(b.settingName()));
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), indexName);
updateSettingsRequest.setParentTask(parentTaskId);
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, listener);
}

private void getIndexDocCount(String index, TaskId parentTaskId, ActionListener<Long> listener) {
SearchRequest countRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
if (randomBoolean()) {
closeIndex(oldIndexName);
}
if (randomBoolean()) {
assertOK(client().performRequest(new Request("PUT", oldIndexName + "/_block/read_only")));
}
}
Request reindexRequest = new Request("POST", "/_migration/reindex");
reindexRequest.setJsonEntity(Strings.format("""
Expand Down