Skip to content

Commit fd74362

Browse files
committed
Set source index to read-only
1 parent 894db68 commit fd74362

File tree

3 files changed

+52
-34
lines changed

3 files changed

+52
-34
lines changed

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.xcontent.XContentType;
5050
import org.elasticsearch.xpack.migrate.MigratePlugin;
5151
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
52+
import org.junit.After;
5253
import org.junit.Before;
5354

5455
import java.io.IOException;
@@ -68,12 +69,22 @@
6869

6970
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
7071

72+
private String sourceIndex;
73+
7174
@Before
7275
private void setup() throws Exception {
76+
sourceIndex = null;
7377
deletePipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME);
7478
assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); });
7579
}
7680

81+
@After
82+
private void cleanup() {
83+
if (sourceIndex != null) {
84+
cleanupMetadataBlocks(sourceIndex);
85+
}
86+
}
87+
7788
private static final String MAPPING = """
7889
{
7990
"_doc":{
@@ -110,7 +121,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
110121
""";
111122

112123
public void testTimestamp0AddedIfMissing() {
113-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
124+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
114125
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
115126

116127
// add doc without timestamp
@@ -135,7 +146,7 @@ public void testTimestamp0AddedIfMissing() {
135146

136147
public void testTimestampNotAddedIfExists() {
137148

138-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
149+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
139150
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
140151

141152
// add doc with timestamp
@@ -185,7 +196,7 @@ public void testCustomReindexPipeline() {
185196

186197
safeGet(clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest));
187198

188-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
199+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
189200
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
190201

191202
// add doc with timestamp
@@ -212,7 +223,7 @@ public void testCustomReindexPipeline() {
212223

213224
public void testDestIndexDeletedIfExists() throws Exception {
214225
// empty source index
215-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
226+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
216227
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
217228

218229
// dest index with docs
@@ -231,7 +242,7 @@ public void testDestIndexDeletedIfExists() throws Exception {
231242
}
232243

233244
public void testDestIndexNameSet_noDotPrefix() throws Exception {
234-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
245+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
235246
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
236247

237248
// call reindex
@@ -243,9 +254,8 @@ public void testDestIndexNameSet_noDotPrefix() throws Exception {
243254
assertEquals(expectedDestIndexName, response.getDestIndex());
244255
}
245256

246-
public void testDestIndexNameSet_withDotPrefix() throws Exception {
247-
248-
var sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
257+
public void testDestIndexNameSet_withDotPrefix() {
258+
sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
249259
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
250260

251261
// call reindex
@@ -257,10 +267,10 @@ public void testDestIndexNameSet_withDotPrefix() throws Exception {
257267
assertEquals(expectedDestIndexName, response.getDestIndex());
258268
}
259269

260-
public void testDestIndexContainsDocs() throws Exception {
270+
public void testDestIndexContainsDocs() {
261271
// source index with docs
262272
var numDocs = randomIntBetween(1, 100);
263-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
273+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
264274
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
265275
indexDocs(sourceIndex, numDocs);
266276

@@ -274,19 +284,19 @@ public void testDestIndexContainsDocs() throws Exception {
274284
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
275285
}
276286

277-
public void testSetSourceToBlockWrites() throws Exception {
278-
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;
287+
public void testSetSourceToReadOnly() throws Exception {
288+
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_READ_ONLY, true).build() : Settings.EMPTY;
279289

280290
// empty source index
281-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
291+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
282292
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
283293

284294
// call reindex
285295
safeGet(client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)));
286296

287297
// Assert that source index is now read-only but not verified read-only
288298
GetSettingsResponse getSettingsResponse = safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex)));
289-
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
299+
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_READ_ONLY)));
290300
assertFalse(
291301
parseBoolean(getSettingsResponse.getSetting(sourceIndex, MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()))
292302
);
@@ -309,7 +319,7 @@ public void testSettingsAddedBeforeReindex() {
309319
// start with a static setting
310320
var numShards = randomIntBetween(1, 10);
311321
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
312-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
322+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
313323
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)));
314324

315325
// update with a dynamic setting
@@ -334,7 +344,7 @@ public void testSettingsAddedBeforeReindex() {
334344
}
335345

336346
public void testMappingsAddedToDestIndex() {
337-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
347+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
338348
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)));
339349

340350
// call reindex
@@ -355,7 +365,7 @@ public void testMappingsAddedToDestIndex() {
355365
}
356366

357367
public void testFailIfMetadataBlockSet() {
358-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
368+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
359369
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
360370
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
361371

@@ -364,12 +374,10 @@ public void testFailIfMetadataBlockSet() {
364374
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
365375
);
366376
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
367-
368-
cleanupMetadataBlocks(sourceIndex);
369377
}
370378

371379
public void testFailIfReadBlockSet() {
372-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
380+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
373381
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
374382
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));
375383

@@ -378,12 +386,10 @@ public void testFailIfReadBlockSet() {
378386
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
379387
);
380388
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));
381-
382-
cleanupMetadataBlocks(sourceIndex);
383389
}
384390

385391
public void testReadOnlyBlocksNotAddedBack() {
386-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
392+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
387393
var settings = Settings.builder()
388394
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
389395
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, randomBoolean())
@@ -401,7 +407,6 @@ public void testReadOnlyBlocksNotAddedBack() {
401407
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
402408
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
403409

404-
cleanupMetadataBlocks(sourceIndex);
405410
cleanupMetadataBlocks(destIndex);
406411
}
407412

@@ -414,7 +419,7 @@ public void testUpdateSettingsDefaultsRestored() {
414419
indicesAdmin().execute(TransportDeleteIndexTemplateAction.TYPE, new DeleteIndexTemplateRequest("random_index_template"))
415420
);
416421

417-
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
422+
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
418423
assertAcked(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
419424

420425
// call reindex
@@ -453,7 +458,7 @@ public void testSettingsAndMappingsFromTemplate() throws IOException {
453458
request.indexTemplate(template);
454459
safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));
455460

456-
var sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
461+
sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
457462
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
458463

459464
{
@@ -588,9 +593,8 @@ private static void cleanupMetadataBlocks(String index) {
588593
var settings = Settings.builder()
589594
.putNull(IndexMetadata.SETTING_READ_ONLY)
590595
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
591-
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
592-
.build();
593-
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)));
596+
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA);
597+
updateIndexSettings(settings, index);
594598
}
595599

596600
private static void indexDocs(String index, int numDocs) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import java.util.Map;
6060
import java.util.Objects;
6161

62-
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
62+
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
6363

6464
public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
6565
ReindexDataStreamIndexAction.Request,
@@ -156,7 +156,7 @@ protected void doExecute(
156156
return;
157157
}
158158
final boolean wasClosed = isClosed(sourceIndex);
159-
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
159+
SubscribableListener.<AcknowledgedResponse>newForked(l -> setReadOnly(sourceIndexName, l, taskId))
160160
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
161161
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
162162
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
@@ -201,9 +201,9 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
201201
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
202202
}
203203

204-
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
205-
logger.debug("Setting write block on source index [{}]", sourceIndexName);
206-
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
204+
private void setReadOnly(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
205+
logger.debug("Setting read-only on source index [{}]", sourceIndexName);
206+
addBlockToIndex(READ_ONLY, sourceIndexName, new ActionListener<>() {
207207
@Override
208208
public void onResponse(AddIndexBlockResponse response) {
209209
if (response.isAcknowledged()) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
1717
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
18+
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
19+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1820
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1921
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
2022
import org.elasticsearch.action.support.CountDownActionListener;
@@ -23,8 +25,10 @@
2325
import org.elasticsearch.client.internal.Client;
2426
import org.elasticsearch.cluster.metadata.DataStream;
2527
import org.elasticsearch.cluster.metadata.DataStreamAction;
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2629
import org.elasticsearch.cluster.service.ClusterService;
2730
import org.elasticsearch.common.settings.Setting;
31+
import org.elasticsearch.common.settings.Settings;
2832
import org.elasticsearch.core.Nullable;
2933
import org.elasticsearch.core.TimeValue;
3034
import org.elasticsearch.index.Index;
@@ -218,6 +222,7 @@ private void maybeProcessNextIndex(
218222
.<AcknowledgedResponse>andThen(
219223
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
220224
)
225+
.<AcknowledgedResponse>andThen(l -> removeReadOnlyBlock(index.getName(), parentTaskId, l))
221226
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
222227
.addListener(ActionListener.wrap(unused -> {
223228
reindexDataStreamTask.reindexSucceeded(index.getName());
@@ -246,6 +251,15 @@ private void updateDataStream(
246251
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
247252
}
248253

254+
private void removeReadOnlyBlock(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
255+
var updateSettingsRequest = new UpdateSettingsRequest(
256+
Settings.builder().putNull(IndexMetadata.SETTING_READ_ONLY).build(),
257+
indexName
258+
);
259+
updateSettingsRequest.setParentTask(parentTaskId);
260+
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, listener);
261+
}
262+
249263
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
250264
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
251265
deleteIndexRequest.setParentTask(parentTaskId);

0 commit comments

Comments
 (0)