Skip to content

Commit 0f6ab6c

Browse files
authored
Handling frozen indices in reindex data streams (#122279)
1 parent 67d2e25 commit 0f6ab6c

File tree

5 files changed

+113
-3
lines changed

5 files changed

+113
-3
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.action.search.TransportSearchScrollAction;
3030
import org.elasticsearch.index.reindex.ReindexAction;
3131
import org.elasticsearch.xpack.core.XPackPlugin;
32+
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
3233
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
3334
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
3435

@@ -215,6 +216,7 @@ public class InternalUsers {
215216
TransportUpdateSettingsAction.TYPE.name(),
216217
RefreshAction.NAME,
217218
ReindexAction.NAME,
219+
FreezeIndexAction.NAME,
218220
TransportSearchAction.NAME,
219221
TransportBulkAction.NAME,
220222
TransportIndexAction.NAME,

x-pack/plugin/migrate/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies {
1717
compileOnly project(path: xpackModule('core'))
1818
testImplementation(testArtifact(project(xpackModule('core'))))
1919
testImplementation project(xpackModule('ccr'))
20+
testImplementation project(xpackModule('frozen-indices'))
2021
testImplementation project(':modules:data-streams')
2122
testImplementation project(path: ':modules:reindex')
2223
testImplementation project(path: ':modules:ingest-common')

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.ResourceNotFoundException;
12+
import org.elasticsearch.action.ActionRequest;
13+
import org.elasticsearch.action.ActionResponse;
1214
import org.elasticsearch.action.DocWriteRequest;
1315
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1416
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -38,25 +40,34 @@
3840
import org.elasticsearch.cluster.metadata.Template;
3941
import org.elasticsearch.common.bytes.BytesArray;
4042
import org.elasticsearch.common.compress.CompressedXContent;
43+
import org.elasticsearch.common.settings.Setting;
4144
import org.elasticsearch.common.settings.Settings;
4245
import org.elasticsearch.common.time.DateFormatter;
4346
import org.elasticsearch.common.time.FormatNames;
4447
import org.elasticsearch.common.xcontent.support.XContentMapValues;
4548
import org.elasticsearch.datastreams.DataStreamsPlugin;
4649
import org.elasticsearch.index.IndexSettings;
50+
import org.elasticsearch.index.engine.frozen.FrozenEngine;
4751
import org.elasticsearch.index.mapper.DateFieldMapper;
4852
import org.elasticsearch.ingest.common.IngestCommonPlugin;
53+
import org.elasticsearch.plugins.ActionPlugin;
4954
import org.elasticsearch.plugins.Plugin;
55+
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
56+
import org.elasticsearch.protocol.xpack.frozen.FreezeResponse;
5057
import org.elasticsearch.reindex.ReindexPlugin;
5158
import org.elasticsearch.test.ESIntegTestCase;
5259
import org.elasticsearch.test.transport.MockTransportService;
5360
import org.elasticsearch.xcontent.XContentType;
61+
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
62+
import org.elasticsearch.xpack.frozen.action.TransportFreezeIndexAction;
5463
import org.elasticsearch.xpack.migrate.MigratePlugin;
5564
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5665
import org.junit.Before;
5766

5867
import java.io.IOException;
5968
import java.time.Instant;
69+
import java.util.ArrayList;
70+
import java.util.Arrays;
6071
import java.util.Collection;
6172
import java.util.List;
6273
import java.util.Locale;
@@ -69,6 +80,7 @@
6980
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
7081
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
7182
import static org.hamcrest.Matchers.equalTo;
83+
import static org.hamcrest.Matchers.not;
7284

7385
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
7486

@@ -112,7 +124,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
112124
ReindexPlugin.class,
113125
MockTransportService.TestPlugin.class,
114126
DataStreamsPlugin.class,
115-
IngestCommonPlugin.class
127+
IngestCommonPlugin.class,
128+
TestFrozenIndicesPlugin.class
116129
);
117130
}
118131

@@ -595,6 +608,43 @@ public void testTsdbStartEndSet() throws Exception {
595608
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
596609
}
597610

611+
public void testIndexUnfrozen() {
612+
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
613+
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
614+
615+
// add doc with timestamp
616+
String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
617+
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
618+
addDoc(sourceIndex, doc);
619+
FreezeRequest freezeRequest = new FreezeRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, sourceIndex);
620+
freezeRequest.setFreeze(true);
621+
FreezeResponse freezeResponse = safeGet(client().execute(FreezeIndexAction.INSTANCE, freezeRequest));
622+
assertAcked(freezeResponse);
623+
assertThat(
624+
safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex))).getIndexToSettings()
625+
.get(sourceIndex)
626+
.get(FrozenEngine.INDEX_FROZEN.getKey()),
627+
not(equalTo(null))
628+
);
629+
630+
String destIndex = safeGet(
631+
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
632+
).getDestIndex();
633+
634+
assertThat(
635+
safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex))).getIndexToSettings()
636+
.get(sourceIndex)
637+
.get(FrozenEngine.INDEX_FROZEN.getKey()),
638+
equalTo(null)
639+
);
640+
assertThat(
641+
safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(destIndex))).getIndexToSettings()
642+
.get(destIndex)
643+
.get(FrozenEngine.INDEX_FROZEN.getKey()),
644+
equalTo(null)
645+
);
646+
}
647+
598648
private static void cleanupMetadataBlocks(String index) {
599649
var settings = Settings.builder()
600650
.putNull(IndexMetadata.SETTING_READ_ONLY)
@@ -635,4 +685,22 @@ private void ensureHealth(String index) {
635685
ensureYellow(index);
636686
}
637687
}
688+
689+
/*
690+
* This takes the place of the real FrozenIndices plugin. We can't use that one because its EngineFactory conflicts with the one used
691+
* by this test. We only need the settings and the mapping of the FreezeIndexAction though.
692+
*/
693+
public static class TestFrozenIndicesPlugin extends Plugin implements ActionPlugin {
694+
@Override
695+
public List<Setting<?>> getSettings() {
696+
return Arrays.asList(FrozenEngine.INDEX_FROZEN);
697+
}
698+
699+
@Override
700+
public List<ActionPlugin.ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
701+
List<ActionPlugin.ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
702+
actions.add(new ActionPlugin.ActionHandler<>(FreezeIndexAction.INSTANCE, TransportFreezeIndexAction.class));
703+
return actions;
704+
}
705+
}
638706
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,21 @@
4242
import org.elasticsearch.core.Assertions;
4343
import org.elasticsearch.core.TimeValue;
4444
import org.elasticsearch.index.IndexSettings;
45+
import org.elasticsearch.index.engine.frozen.FrozenEngine;
4546
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4647
import org.elasticsearch.index.reindex.ReindexAction;
4748
import org.elasticsearch.index.reindex.ReindexRequest;
4849
import org.elasticsearch.index.reindex.ScrollableHitSource;
4950
import org.elasticsearch.injection.guice.Inject;
51+
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
52+
import org.elasticsearch.protocol.xpack.frozen.FreezeResponse;
5053
import org.elasticsearch.search.builder.SearchSourceBuilder;
5154
import org.elasticsearch.tasks.Task;
5255
import org.elasticsearch.tasks.TaskId;
5356
import org.elasticsearch.threadpool.ThreadPool;
5457
import org.elasticsearch.transport.TransportService;
5558
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
59+
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
5660
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
5761

5862
import java.util.Locale;
@@ -156,7 +160,8 @@ protected void doExecute(
156160
return;
157161
}
158162
final boolean wasClosed = isClosed(sourceIndex);
159-
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
163+
SubscribableListener.<FreezeResponse>newForked(l -> unfreezeIfFrozen(sourceIndexName, sourceIndex, l, taskId))
164+
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))
160165
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
161166
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
162167
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
@@ -200,6 +205,22 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
200205
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
201206
}
202207

208+
private void unfreezeIfFrozen(
209+
String sourceIndexName,
210+
IndexMetadata indexMetadata,
211+
ActionListener<FreezeResponse> listener,
212+
TaskId parentTaskId
213+
) {
214+
if (FrozenEngine.INDEX_FROZEN.get(indexMetadata.getSettings()).equals(Boolean.TRUE)) {
215+
logger.debug("Unfreezing source index [{}]", sourceIndexName);
216+
FreezeRequest freezeRequest = new FreezeRequest(TimeValue.MAX_VALUE, TimeValue.MAX_VALUE, sourceIndexName).setFreeze(false);
217+
freezeRequest.setParentTask(parentTaskId);
218+
client.execute(FreezeIndexAction.INSTANCE, freezeRequest, listener);
219+
} else {
220+
listener.onResponse(null);
221+
}
222+
}
223+
203224
private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
204225
logger.debug("Setting write block on source index [{}]", sourceIndexName);
205226
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.upgrades;
88

9+
import org.apache.http.client.methods.HttpPost;
910
import org.apache.http.util.EntityUtils;
1011
import org.elasticsearch.Build;
1112
import org.elasticsearch.TransportVersions;
@@ -342,12 +343,29 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers
342343
for (int i = 0; i < numRollovers; i++) {
343344
String oldIndexName = rollover(dataStreamName);
344345
if (randomBoolean()) {
345-
closeIndex(oldIndexName);
346+
closeOrFreezeIndex(oldIndexName);
346347
}
347348
bulkLoadData(dataStreamName);
348349
}
349350
}
350351

352+
// Randomly either closes or freezes the index. If the cluster does not support the _freeze API, then this always closes the index.
353+
private void closeOrFreezeIndex(String indexName) throws IOException {
354+
boolean canFreeze = minimumTransportVersion().before(TransportVersions.V_8_0_0);
355+
if (canFreeze && randomBoolean()) {
356+
final Request freezeRequest = new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_freeze");
357+
freezeRequest.setOptions(
358+
expectWarnings(
359+
"Frozen indices are deprecated because they provide no benefit given improvements in heap memory utilization. "
360+
+ "They will be removed in a future release."
361+
)
362+
);
363+
assertOK(client().performRequest(freezeRequest));
364+
} else {
365+
closeIndex(indexName);
366+
}
367+
}
368+
351369
private static void createIlmPolicy() throws IOException {
352370
String ilmPolicy = """
353371
{

0 commit comments

Comments
 (0)