Skip to content

Commit 9ce964c

Browse files
Add read-block to source index during data stream reindex (#122887) (#123180)
When reindexing a data stream, we currently add a write block to the source indices so that new documents cannot be added to the index while it is being reindexed. A write block still allows the index to be deleted and for the metadata to be updated. It is possible that ILM could delete a backing index or update a backing index's lifecycle metadata while it is being reindexed. To avoid this, this PR sets a read-only block on the source index. This block must be removed before source index can be deleted after it is replaced with the destination index. (cherry picked from commit 2cc86b3) # Conflicts: # x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
1 parent 93df6c1 commit 9ce964c

File tree

3 files changed

+46
-78
lines changed

3 files changed

+46
-78
lines changed

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 10 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.migrate.action;
99

10-
import org.elasticsearch.ElasticsearchException;
1110
import org.elasticsearch.ResourceNotFoundException;
1211
import org.elasticsearch.action.DocWriteRequest;
1312
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -16,7 +15,6 @@
1615
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1716
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1817
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
19-
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
2018
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
2119
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
2220
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
@@ -26,11 +24,9 @@
2624
import org.elasticsearch.action.index.IndexRequest;
2725
import org.elasticsearch.action.ingest.PutPipelineRequest;
2826
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
29-
import org.elasticsearch.cluster.block.ClusterBlockException;
3027
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
3128
import org.elasticsearch.cluster.metadata.IndexMetadata;
3229
import org.elasticsearch.cluster.metadata.MappingMetadata;
33-
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
3430
import org.elasticsearch.cluster.metadata.Template;
3531
import org.elasticsearch.common.bytes.BytesArray;
3632
import org.elasticsearch.common.compress.CompressedXContent;
@@ -63,7 +59,6 @@
6359
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6460
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
6561
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
66-
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
6762
import static org.hamcrest.Matchers.equalTo;
6863

6964
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
@@ -243,8 +238,7 @@ public void testDestIndexNameSet_noDotPrefix() throws Exception {
243238
assertEquals(expectedDestIndexName, response.getDestIndex());
244239
}
245240

246-
public void testDestIndexNameSet_withDotPrefix() throws Exception {
247-
241+
public void testDestIndexNameSet_withDotPrefix() {
248242
var sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
249243
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
250244

@@ -257,13 +251,19 @@ public void testDestIndexNameSet_withDotPrefix() throws Exception {
257251
assertEquals(expectedDestIndexName, response.getDestIndex());
258252
}
259253

260-
public void testDestIndexContainsDocs() throws Exception {
254+
public void testDestIndexContainsDocs() {
261255
// source index with docs
262256
var numDocs = randomIntBetween(1, 100);
263257
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
264258
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
265259
indexDocs(sourceIndex, numDocs);
266260

261+
var settings = Settings.builder()
262+
.put(IndexMetadata.SETTING_BLOCKS_METADATA, randomBoolean())
263+
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
264+
.build();
265+
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, sourceIndex)));
266+
267267
// call reindex
268268
var response = safeGet(
269269
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
@@ -274,29 +274,6 @@ public void testDestIndexContainsDocs() throws Exception {
274274
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
275275
}
276276

277-
public void testSetSourceToBlockWrites() throws Exception {
278-
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;
279-
280-
// empty source index
281-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
282-
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
283-
284-
// call reindex
285-
safeGet(client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)));
286-
287-
// Assert that source index is now read-only but not verified read-only
288-
GetSettingsResponse getSettingsResponse = safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex)));
289-
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
290-
assertFalse(
291-
parseBoolean(getSettingsResponse.getSetting(sourceIndex, MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()))
292-
);
293-
294-
// assert that write to source fails
295-
var indexReq = new IndexRequest(sourceIndex).source(jsonBuilder().startObject().field("field", "1").endObject());
296-
expectThrows(ClusterBlockException.class, client().index(indexReq));
297-
assertHitCount(prepareSearch(sourceIndex).setSize(0), 0);
298-
}
299-
300277
public void testMissingSourceIndex() {
301278
var nonExistentSourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
302279
expectThrows(
@@ -354,34 +331,6 @@ public void testMappingsAddedToDestIndex() {
354331
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
355332
}
356333

357-
public void testFailIfMetadataBlockSet() {
358-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
359-
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
360-
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
361-
362-
ElasticsearchException e = expectThrows(
363-
ElasticsearchException.class,
364-
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
365-
);
366-
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
367-
368-
cleanupMetadataBlocks(sourceIndex);
369-
}
370-
371-
public void testFailIfReadBlockSet() {
372-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
373-
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
374-
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
375-
376-
ElasticsearchException e = expectThrows(
377-
ElasticsearchException.class,
378-
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
379-
);
380-
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
381-
382-
cleanupMetadataBlocks(sourceIndex);
383-
}
384-
385334
public void testReadOnlyBlocksNotAddedBack() {
386335
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
387336
var settings = Settings.builder()
@@ -401,7 +350,6 @@ public void testReadOnlyBlocksNotAddedBack() {
401350
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
402351
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
403352

404-
cleanupMetadataBlocks(sourceIndex);
405353
cleanupMetadataBlocks(destIndex);
406354
}
407355

@@ -588,9 +536,8 @@ private static void cleanupMetadataBlocks(String index) {
588536
var settings = Settings.builder()
589537
.putNull(IndexMetadata.SETTING_READ_ONLY)
590538
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
591-
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
592-
.build();
593-
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)));
539+
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA);
540+
updateIndexSettings(settings, index);
594541
}
595542

596543
private static void indexDocs(String index, int numDocs) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
2525
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
2626
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
27+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
2728
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
2829
import org.elasticsearch.action.bulk.BulkItemResponse;
2930
import org.elasticsearch.action.search.SearchRequest;
@@ -55,11 +56,13 @@
5556
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
5657
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5758

59+
import java.util.Arrays;
5860
import java.util.Locale;
5961
import java.util.Map;
6062
import java.util.Objects;
6163

62-
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
64+
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
65+
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
6366

6467
public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
6568
ReindexDataStreamIndexAction.Request,
@@ -145,19 +148,10 @@ protected void doExecute(
145148
);
146149
}
147150

148-
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_READ, false)) {
149-
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a read block.", destIndexName);
150-
listener.onFailure(new ElasticsearchException(errorMessage));
151-
return;
152-
}
153-
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_METADATA, false)) {
154-
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a metadata block.", destIndexName);
155-
listener.onFailure(new ElasticsearchException(errorMessage));
156-
return;
157-
}
158151
final boolean wasClosed = isClosed(sourceIndex);
159-
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
152+
SubscribableListener.<AcknowledgedResponse>newForked(l -> removeMetadataBlocks(sourceIndexName, taskId, l))
160153
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
154+
.<AcknowledgedResponse>andThen(l -> setReadOnly(sourceIndexName, l, taskId))
161155
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
162156
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
163157
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
@@ -166,6 +160,7 @@ protected void doExecute(
166160
.<AcknowledgedResponse>andThen(l -> copyIndexMetadataToDest(sourceIndexName, destIndexName, l, taskId))
167161
.<AcknowledgedResponse>andThen(l -> sanityCheck(sourceIndexName, destIndexName, l, taskId))
168162
.<CloseIndexResponse>andThen(l -> closeIndexIfWasClosed(destIndexName, wasClosed, l, taskId))
163+
.<AcknowledgedResponse>andThen(l -> removeAPIBlocks(sourceIndexName, taskId, l, READ_ONLY))
169164
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
170165
.addListener(listener);
171166
}
@@ -201,9 +196,9 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
201196
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
202197
}
203198

204-
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
205-
logger.debug("Setting write block on source index [{}]", sourceIndexName);
206-
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
199+
private void setReadOnly(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
200+
logger.debug("Setting read-only on source index [{}]", sourceIndexName);
201+
addBlockToIndex(READ_ONLY, sourceIndexName, new ActionListener<>() {
207202
@Override
208203
public void onResponse(AddIndexBlockResponse response) {
209204
if (response.isAcknowledged()) {
@@ -397,6 +392,29 @@ private void addBlockToIndex(
397392
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
398393
}
399394

395+
/**
396+
* All metadata blocks need to be removed at the start for the following reasons:
397+
* 1) If the source index has a metadata only block, the read-only block can't be added.
398+
* 2) If the source index is read-only and closed, it can't be opened.
399+
*/
400+
private void removeMetadataBlocks(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
401+
logger.debug("Removing metadata blocks from index [{}]", indexName);
402+
removeAPIBlocks(indexName, parentTaskId, listener, METADATA, READ_ONLY);
403+
}
404+
405+
private void removeAPIBlocks(
406+
String indexName,
407+
TaskId parentTaskId,
408+
ActionListener<AcknowledgedResponse> listener,
409+
IndexMetadata.APIBlock... blocks
410+
) {
411+
Settings.Builder settings = Settings.builder();
412+
Arrays.stream(blocks).forEach(b -> settings.putNull(b.settingName()));
413+
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), indexName);
414+
updateSettingsRequest.setParentTask(parentTaskId);
415+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, listener);
416+
}
417+
400418
private void getIndexDocCount(String index, TaskId parentTaskId, ActionListener<Long> listener) {
401419
SearchRequest countRequest = new SearchRequest(index);
402420
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,9 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
548548
if (randomBoolean()) {
549549
closeIndex(oldIndexName);
550550
}
551+
if (randomBoolean()) {
552+
assertOK(client().performRequest(new Request("PUT", oldIndexName + "/_block/read_only")));
553+
}
551554
}
552555
Request reindexRequest = new Request("POST", "/_migration/reindex");
553556
reindexRequest.setJsonEntity(Strings.format("""

0 commit comments

Comments
 (0)