Skip to content

Commit a803462

Browse files
[8.x] Add read-block to source index during data stream reindex (#122887) (#123179)
* Add read-block to source index during data stream reindex (#122887) When reindexing a data stream, we currently add a write block to the source indices so that new documents cannot be added to the index while it is being reindexed. A write block still allows the index to be deleted and for the metadata to be updated. It is possible that ILM could delete a backing index or update a backing index's lifecycle metadata while it is being reindexed. To avoid this, this PR sets a read-only block on the source index. This block must be removed before source index can be deleted after it is replaced with the destination index. (cherry picked from commit 2cc86b3) # Conflicts: # x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java # x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java * fix broken test
1 parent f2d5918 commit a803462

File tree

2 files changed

+45
-79
lines changed

2 files changed

+45
-79
lines changed

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 10 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.migrate.action;
99

10-
import org.elasticsearch.ElasticsearchException;
1110
import org.elasticsearch.ResourceNotFoundException;
1211
import org.elasticsearch.action.ActionRequest;
1312
import org.elasticsearch.action.ActionResponse;
@@ -18,7 +17,6 @@
1817
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1918
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
2019
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
21-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
2220
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
2321
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
2422
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
@@ -36,11 +34,9 @@
3634
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
3735
import org.elasticsearch.action.support.IndicesOptions;
3836
import org.elasticsearch.action.support.master.AcknowledgedRequest;
39-
import org.elasticsearch.cluster.block.ClusterBlockException;
4037
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
4138
import org.elasticsearch.cluster.metadata.IndexMetadata;
4239
import org.elasticsearch.cluster.metadata.MappingMetadata;
43-
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
4440
import org.elasticsearch.cluster.metadata.Template;
4541
import org.elasticsearch.common.bytes.BytesArray;
4642
import org.elasticsearch.common.compress.CompressedXContent;
@@ -97,7 +93,6 @@
9793
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
9894
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
9995
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
100-
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
10196
import static org.hamcrest.Matchers.equalTo;
10297
import static org.hamcrest.Matchers.not;
10398

@@ -304,8 +299,7 @@ public void testDestIndexNameSet_noDotPrefix() throws Exception {
304299
assertEquals(expectedDestIndexName, response.getDestIndex());
305300
}
306301

307-
public void testDestIndexNameSet_withDotPrefix() throws Exception {
308-
302+
public void testDestIndexNameSet_withDotPrefix() {
309303
var sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
310304
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
311305

@@ -318,13 +312,19 @@ public void testDestIndexNameSet_withDotPrefix() throws Exception {
318312
assertEquals(expectedDestIndexName, response.getDestIndex());
319313
}
320314

321-
public void testDestIndexContainsDocs() throws Exception {
315+
public void testDestIndexContainsDocs() {
322316
// source index with docs
323317
var numDocs = randomIntBetween(1, 100);
324318
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
325319
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
326320
indexDocs(sourceIndex, numDocs);
327321

322+
var settings = Settings.builder()
323+
.put(IndexMetadata.SETTING_BLOCKS_METADATA, randomBoolean())
324+
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
325+
.build();
326+
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, sourceIndex)));
327+
328328
// call reindex
329329
var response = safeGet(
330330
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
@@ -335,29 +335,6 @@ public void testDestIndexContainsDocs() throws Exception {
335335
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
336336
}
337337

338-
public void testSetSourceToBlockWrites() throws Exception {
339-
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;
340-
341-
// empty source index
342-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
343-
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
344-
345-
// call reindex
346-
safeGet(client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)));
347-
348-
// Assert that source index is now read-only but not verified read-only
349-
GetSettingsResponse getSettingsResponse = safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex)));
350-
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
351-
assertFalse(
352-
parseBoolean(getSettingsResponse.getSetting(sourceIndex, MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()))
353-
);
354-
355-
// assert that write to source fails
356-
var indexReq = new IndexRequest(sourceIndex).source(jsonBuilder().startObject().field("field", "1").endObject());
357-
expectThrows(ClusterBlockException.class, client().index(indexReq));
358-
assertHitCount(prepareSearch(sourceIndex).setSize(0), 0);
359-
}
360-
361338
public void testMissingSourceIndex() {
362339
var nonExistentSourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
363340
expectThrows(
@@ -413,34 +390,6 @@ public void testMappingsAddedToDestIndex() {
413390
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
414391
}
415392

416-
public void testFailIfMetadataBlockSet() {
417-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
418-
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
419-
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
420-
421-
ElasticsearchException e = expectThrows(
422-
ElasticsearchException.class,
423-
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
424-
);
425-
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
426-
427-
cleanupMetadataBlocks(sourceIndex);
428-
}
429-
430-
public void testFailIfReadBlockSet() {
431-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
432-
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
433-
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
434-
435-
ElasticsearchException e = expectThrows(
436-
ElasticsearchException.class,
437-
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
438-
);
439-
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
440-
441-
cleanupMetadataBlocks(sourceIndex);
442-
}
443-
444393
public void testReadOnlyBlocksNotAddedBack() {
445394
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
446395
var settings = Settings.builder()
@@ -460,7 +409,6 @@ public void testReadOnlyBlocksNotAddedBack() {
460409
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
461410
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
462411

463-
cleanupMetadataBlocks(sourceIndex);
464412
cleanupMetadataBlocks(destIndex);
465413
}
466414

@@ -807,9 +755,8 @@ private static void cleanupMetadataBlocks(String index) {
807755
var settings = Settings.builder()
808756
.putNull(IndexMetadata.SETTING_READ_ONLY)
809757
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
810-
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
811-
.build();
812-
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)));
758+
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA);
759+
updateIndexSettings(settings, index);
813760
}
814761

815762
private static void indexDocs(String index, int numDocs) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
2525
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
2626
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
27+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
2728
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
2829
import org.elasticsearch.action.bulk.BulkItemResponse;
2930
import org.elasticsearch.action.search.SearchRequest;
@@ -59,11 +60,13 @@
5960
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
6061
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
6162

63+
import java.util.Arrays;
6264
import java.util.Locale;
6365
import java.util.Map;
6466
import java.util.Objects;
6567

66-
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
68+
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
69+
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
6770

6871
public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
6972
ReindexDataStreamIndexAction.Request,
@@ -149,20 +152,12 @@ protected void doExecute(
149152
);
150153
}
151154

152-
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_READ, false)) {
153-
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a read block.", destIndexName);
154-
listener.onFailure(new ElasticsearchException(errorMessage));
155-
return;
156-
}
157-
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_METADATA, false)) {
158-
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a metadata block.", destIndexName);
159-
listener.onFailure(new ElasticsearchException(errorMessage));
160-
return;
161-
}
162155
final boolean wasClosed = isClosed(sourceIndex);
163-
SubscribableListener.<FreezeResponse>newForked(l -> unfreezeIfFrozen(sourceIndexName, sourceIndex, l, taskId))
164-
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))
156+
157+
SubscribableListener.<AcknowledgedResponse>newForked(l -> removeMetadataBlocks(sourceIndexName, taskId, l))
158+
.<FreezeResponse>andThen(l -> unfreezeIfFrozen(sourceIndexName, sourceIndex, l, taskId))
165159
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
160+
.<AcknowledgedResponse>andThen(l -> setReadOnly(sourceIndexName, l, taskId))
166161
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
167162
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
168163
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
@@ -171,6 +166,7 @@ protected void doExecute(
171166
.<AcknowledgedResponse>andThen(l -> copyIndexMetadataToDest(sourceIndexName, destIndexName, l, taskId))
172167
.<AcknowledgedResponse>andThen(l -> sanityCheck(sourceIndexName, destIndexName, l, taskId))
173168
.<CloseIndexResponse>andThen(l -> closeIndexIfWasClosed(destIndexName, wasClosed, l, taskId))
169+
.<AcknowledgedResponse>andThen(l -> removeAPIBlocks(sourceIndexName, taskId, l, READ_ONLY))
174170
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
175171
.addListener(listener);
176172
}
@@ -222,9 +218,9 @@ private void unfreezeIfFrozen(
222218
}
223219
}
224220

225-
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
226-
logger.debug("Setting write block on source index [{}]", sourceIndexName);
227-
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
221+
private void setReadOnly(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
222+
logger.debug("Setting read-only on source index [{}]", sourceIndexName);
223+
addBlockToIndex(READ_ONLY, sourceIndexName, new ActionListener<>() {
228224
@Override
229225
public void onResponse(AddIndexBlockResponse response) {
230226
if (response.isAcknowledged()) {
@@ -420,6 +416,29 @@ private void addBlockToIndex(
420416
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
421417
}
422418

419+
/**
420+
* All metadata blocks need to be removed at the start for the following reasons:
421+
* 1) If the source index has a metadata only block, the read-only block can't be added.
422+
* 2) If the source index is read-only and closed, it can't be opened.
423+
*/
424+
private void removeMetadataBlocks(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
425+
logger.debug("Removing metadata blocks from index [{}]", indexName);
426+
removeAPIBlocks(indexName, parentTaskId, listener, METADATA, READ_ONLY);
427+
}
428+
429+
private void removeAPIBlocks(
430+
String indexName,
431+
TaskId parentTaskId,
432+
ActionListener<AcknowledgedResponse> listener,
433+
IndexMetadata.APIBlock... blocks
434+
) {
435+
Settings.Builder settings = Settings.builder();
436+
Arrays.stream(blocks).forEach(b -> settings.putNull(b.settingName()));
437+
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), indexName);
438+
updateSettingsRequest.setParentTask(parentTaskId);
439+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, listener);
440+
}
441+
423442
private void getIndexDocCount(String index, TaskId parentTaskId, ActionListener<Long> listener) {
424443
SearchRequest countRequest = new SearchRequest(index);
425444
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);

0 commit comments

Comments
 (0)