diff --git a/docs/changelog/124149.yaml b/docs/changelog/124149.yaml new file mode 100644 index 0000000000000..07c2f396efda3 --- /dev/null +++ b/docs/changelog/124149.yaml @@ -0,0 +1,5 @@ +pr: 124149 +summary: Retry ILM async action after reindexing data stream +area: Data streams +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 5c7cb711fdda7..574cc7d224a04 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -183,6 +183,7 @@ static TransportVersion def(int id) { public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00); public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00); public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_18 = def(8_840_0_01); + public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18 = def(8_840_0_02); public static final TransportVersion INITIAL_ELASTICSEARCH_8_19 = def(8_841_0_00); public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_841_0_01); public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_19 = def(8_841_0_02); @@ -190,6 +191,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS_BACKPORT_8_19 = def(8_841_0_04); public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED_BACKPORT_8_X = def(8_841_0_05); public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_19 = def(8_841_0_06); + public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19 = def(8_841_0_07); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/RetryActionRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/RetryActionRequest.java new file mode 100644 index 0000000000000..d7a6f46c532d6 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/RetryActionRequest.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ilm.action; + +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +public class RetryActionRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private String[] indices; + private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); + private boolean requireError = true; + + public RetryActionRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) { + super(masterNodeTimeout, ackTimeout); + this.indices = indices; + } + + public RetryActionRequest(StreamInput in) throws IOException { + super(in); + this.indices = in.readStringArray(); + this.indicesOptions = IndicesOptions.readIndicesOptions(in); + if (in.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19) + || in.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) { + this.requireError = in.readBoolean(); + } + } + + @Override + public RetryActionRequest indices(String... indices) { + this.indices = indices; + return this; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public RetryActionRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + public void requireError(boolean requireError) { + this.requireError = requireError; + } + + public boolean requireError() { + return requireError; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + if (out.getTransportVersion().onOrAfter(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_19) + || out.getTransportVersion().isPatchFrom(TransportVersions.RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR_8_18)) { + out.writeBoolean(requireError); + } + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(indices), indicesOptions, requireError); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + RetryActionRequest other = (RetryActionRequest) obj; + return Objects.deepEquals(indices, other.indices) + && Objects.equals(indicesOptions, other.indicesOptions) + && requireError == other.requireError; + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 43b1782cbc59b..3e465cf27ab83 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; +import org.elasticsearch.xpack.core.ilm.action.ILMActions; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; import org.elasticsearch.xpack.core.security.support.MetadataUtils; @@ -224,7 +225,8 @@ public class InternalUsers { TransportBulkAction.NAME, TransportIndexAction.NAME, TransportSearchScrollAction.TYPE.name(), - ModifyDataStreamsAction.NAME + ModifyDataStreamsAction.NAME, + ILMActions.RETRY.name() ) .allowRestrictedIndices(false) .build() }, diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index 85739dcd0dcfb..5ec48aee9dee9 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -322,7 +322,11 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetadata indexMetadata, logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", currentStepKey, index, policy); return; } - + if (expectedStepKey.phase() == null && expectedStepKey.name() == null && expectedStepKey.action() == null) { + // ILM is stopped, so do not try to run async action + logger.debug("expected step for index [{}] with policy [{}] is [{}], not running async action", index, policy, expectedStepKey); + return; + } logger.trace( "[{}] maybe running async action step ({}) with current step {}", index, diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java index 1000bd1e68249..6aea4fbf85091 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/RestRetryAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.ilm.action.ILMActions; +import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest; import java.util.List; @@ -37,7 +38,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { final var indices = Strings.splitStringByCommaToArray(restRequest.param("index")); - final var request = new TransportRetryAction.Request(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices); + final var request = new RetryActionRequest(getMasterNodeTimeout(restRequest), getAckTimeout(restRequest), indices); request.indices(indices); request.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen())); return channel -> client.execute(ILMActions.RETRY, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java index 0d3a51a613569..47c8f50717936 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java @@ -10,11 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; @@ -25,24 +21,18 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.action.ILMActions; +import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest; import org.elasticsearch.xpack.ilm.IndexLifecycleService; -import java.io.IOException; -import java.util.Arrays; -import java.util.Objects; - -public class TransportRetryAction extends TransportMasterNodeAction { +public class TransportRetryAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportRetryAction.class); @@ -62,7 +52,7 @@ public TransportRetryAction( clusterService, threadPool, actionFilters, - Request::new, + RetryActionRequest::new, AcknowledgedResponse::readFrom, EsExecutors.DIRECT_EXECUTOR_SERVICE ); @@ -70,7 +60,17 @@ public TransportRetryAction( } @Override - protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + protected void masterOperation( + Task task, + RetryActionRequest request, + ClusterState state, + ActionListener listener + ) { + if (request.requireError() == false) { + maybeRunAsyncAction(state, request.indices()); + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } submitUnbatchedTask("ilm-re-run", new AckedClusterStateUpdateTask(request, listener) { @Override public ClusterState execute(ClusterState currentState) { @@ -79,101 +79,33 @@ public ClusterState execute(ClusterState currentState) { @Override public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - for (String index : request.indices()) { - IndexMetadata idxMeta = newState.metadata().index(index); - LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState(); - StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step()); - if (idxMeta == null) { - // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case. - logger.debug( - "index [" - + index - + "] has been deleted after moving to step [" - + lifecycleState.step() - + "], skipping async action check" - ); - return; - } - indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep); - } + maybeRunAsyncAction(newState, request.indices()); } }); } + private void maybeRunAsyncAction(ClusterState state, String[] indices) { + for (String index : indices) { + IndexMetadata idxMeta = state.metadata().index(index); + if (idxMeta == null) { + // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case. + logger.debug("index [" + index + "] has been deleted, skipping async action check"); + return; + } + LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState(); + StepKey retryStep = new StepKey(lifecycleState.phase(), lifecycleState.action(), lifecycleState.step()); + indexLifecycleService.maybeRunAsyncAction(state, idxMeta, retryStep); + } + } + @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { clusterService.submitUnbatchedStateUpdateTask(source, task); } @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { + protected ClusterBlockException checkBlock(RetryActionRequest request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { - private String[] indices; - private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); - - public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) { - super(masterNodeTimeout, ackTimeout); - this.indices = indices; - } - - public Request(StreamInput in) throws IOException { - super(in); - this.indices = in.readStringArray(); - this.indicesOptions = IndicesOptions.readIndicesOptions(in); - } - - @Override - public Request indices(String... indices) { - this.indices = indices; - return this; - } - - @Override - public String[] indices() { - return indices; - } - - @Override - public IndicesOptions indicesOptions() { - return indicesOptions; - } - - public Request indicesOptions(IndicesOptions indicesOptions) { - this.indicesOptions = indicesOptions; - return this; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArray(indices); - indicesOptions.writeIndicesOptions(out); - } - - @Override - public int hashCode() { - return Objects.hash(Arrays.hashCode(indices), indicesOptions); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj.getClass() != getClass()) { - return false; - } - Request other = (Request) obj; - return Objects.deepEquals(indices, other.indices) && Objects.equals(indicesOptions, other.indicesOptions); - } - - } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/RetryRequestTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/RetryRequestTests.java index 4f053ddc2caa4..67fcc781f1edd 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/RetryRequestTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/RetryRequestTests.java @@ -11,14 +11,15 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest; import java.util.Arrays; -public class RetryRequestTests extends AbstractWireSerializingTestCase { +public class RetryRequestTests extends AbstractWireSerializingTestCase { @Override - protected TransportRetryAction.Request createTestInstance() { - final var request = new TransportRetryAction.Request( + protected RetryActionRequest createTestInstance() { + final var request = new RetryActionRequest( TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, randomBoolean() ? Strings.EMPTY_ARRAY : generateRandomStringArray(20, 20, false) @@ -36,19 +37,23 @@ protected TransportRetryAction.Request createTestInstance() { ); request.indicesOptions(indicesOptions); } + if (randomBoolean()) { + request.requireError(randomBoolean()); + } return request; } @Override - protected Writeable.Reader instanceReader() { - return TransportRetryAction.Request::new; + protected Writeable.Reader instanceReader() { + return RetryActionRequest::new; } @Override - protected TransportRetryAction.Request mutateInstance(TransportRetryAction.Request instance) { + protected RetryActionRequest mutateInstance(RetryActionRequest instance) { String[] indices = instance.indices(); IndicesOptions indicesOptions = instance.indicesOptions(); - switch (between(0, 1)) { + boolean requireError = instance.requireError(); + switch (between(0, 2)) { case 0 -> indices = randomValueOtherThanMany( i -> Arrays.equals(i, instance.indices()), () -> generateRandomStringArray(20, 10, false, true) @@ -66,10 +71,12 @@ protected TransportRetryAction.Request mutateInstance(TransportRetryAction.Reque randomBoolean() ) ); + case 2 -> requireError = requireError == false; default -> throw new AssertionError("Illegal randomisation branch"); } - final var newRequest = new TransportRetryAction.Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, indices); + final var newRequest = new RetryActionRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, indices); newRequest.indicesOptions(indicesOptions); + newRequest.requireError(requireError); return newRequest; } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index e410daea8910d..0cb4ecec83f0f 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -40,6 +40,8 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.action.ILMActions; +import org.elasticsearch.xpack.core.ilm.action.RetryActionRequest; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction; import java.util.ArrayList; @@ -264,13 +266,34 @@ private void copySettings(String oldIndex, String newIndex, ActionListener { + maybeRunILMAsyncAction(newIndex, delegate2, parentTaskId); + }) + ); } else { delegate.onResponse(null); } })); } + /** + * If ILM runs an async action on the source index shortly before reindexing, the results of the async action + * may not yet be in the source index. For example, if a force merge has just been started by ILM, the reindex + * will see the un-force-merged index. But the ILM state will be copied to destination index saying that an + * async action was started, and so ILM won't force merge the destination index. To be sure that the async + * action is run on the destination index, we force a retry on async actions after adding the ILM policy + * to the destination index. + */ + private void maybeRunILMAsyncAction(String newIndex, ActionListener listener, TaskId parentTaskId) { + var retryActionRequest = new RetryActionRequest(TimeValue.MAX_VALUE, TimeValue.MAX_VALUE, newIndex); + retryActionRequest.setParentTask(parentTaskId); + retryActionRequest.requireError(false); + client.execute(ILMActions.RETRY, retryActionRequest, listener); + } + private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener listener) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); deleteIndexRequest.setParentTask(parentTaskId); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java index 8a05eff36481e..dd9fa8c082fa3 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java @@ -194,15 +194,29 @@ public void testUpgradeDataStream() throws Exception { String dataStreamName = "reindex_test_data_stream"; String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream"; int numRollovers = randomIntBetween(0, 5); + boolean hasILMPolicy = minimumTransportVersion().before(TransportVersions.V_8_11_X) || randomBoolean(); + boolean ilmEnabled = hasILMPolicy && randomBoolean(); + + if (ilmEnabled) { + startILM(); + } else { + stopILM(); + } + if (CLUSTER_TYPE == ClusterType.OLD) { - createAndRolloverDataStream(dataStreamName, numRollovers); + createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled); createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices); } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { Map> oldIndicesMetadata = getIndicesMetadata(dataStreamName); - upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0); - upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0); + upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled); + upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled); Map> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName); - compareIndexMetadata(oldIndicesMetadata, upgradedIndicesMetadata); + + if (ilmEnabled) { + checkILMPhase(dataStreamName, upgradedIndicesMetadata); + } else { + compareIndexMetadata(oldIndicesMetadata, upgradedIndicesMetadata); + } } } @@ -232,6 +246,28 @@ private void compareIndexMetadata( } } + @SuppressWarnings("unchecked") + private void checkILMPhase(String dataStreamName, Map> upgradedIndicesMetadata) throws Exception { + var writeIndex = getWriteIndexFromDataStreamIndexMetadata(upgradedIndicesMetadata); + assertBusy(() -> { + + Request request = new Request("GET", dataStreamName + "/_ilm/explain"); + Response response = client().performRequest(request); + Map responseMap = XContentHelper.convertToMap( + JsonXContent.jsonXContent, + response.getEntity().getContent(), + false + ); + Map indices = (Map) responseMap.get("indices"); + for (var index : indices.keySet()) { + if (index.equals(writeIndex) == false) { + Map ilmInfo = (Map) indices.get(index); + assertThat("Index has not moved to cold ILM phase", ilmInfo.get("phase"), equalTo("cold")); + } + } + }, 30, TimeUnit.SECONDS); + } + private String getWriteIndexFromDataStreamIndexMetadata(Map> indexMetadataForDataStream) { return indexMetadataForDataStream.entrySet() .stream() @@ -241,6 +277,25 @@ private String getWriteIndexFromDataStreamIndexMetadata(Map indexMetadata) { return Long.parseLong( @@ -274,9 +329,9 @@ private Map getIndexSettingsFromIndexMetadata(Map) ((Map) indexMetadata.get("settings")).get("index"); } - private void createAndRolloverDataStream(String dataStreamName, int numRollovers) throws IOException { - boolean useIlm = minimumTransportVersion().before(TransportVersions.V_8_11_X) || randomBoolean(); - if (useIlm) { + private void createAndRolloverDataStream(String dataStreamName, int numRollovers, boolean hasILMPolicy, boolean ilmEnabled) + throws IOException { + if (hasILMPolicy) { createIlmPolicy(); } // We want to create a data stream and roll it over several times so that we have several indices to upgrade @@ -306,8 +361,7 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers "type": "date" }, "metricset": { - "type": "keyword", - "time_series_dimension": true + "type": "keyword" }, "k8s": { "properties": { @@ -334,7 +388,7 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers } } """; - if (useIlm) { + if (hasILMPolicy) { template = template.replace("$ILM_SETTING", """ "lifecycle.name": "test-lifecycle-policy", """); @@ -360,7 +414,7 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers bulkLoadData(dataStreamName); for (int i = 0; i < numRollovers; i++) { String oldIndexName = rollover(dataStreamName); - if (randomBoolean()) { + if (ilmEnabled == false && randomBoolean()) { closeOrFreezeIndex(oldIndexName); } bulkLoadData(dataStreamName); @@ -389,21 +443,18 @@ private static void createIlmPolicy() throws IOException { { "policy": { "phases": { - "hot": { + "warm": { + "min_age": "1s", "actions": { - "rollover": { - "max_primary_shard_size": "50kb" + "forcemerge": { + "max_num_segments": 1 } } }, - "warm": { - "min_age": "30d", + "cold": { "actions": { - "shrink": { - "number_of_shards": 1 - }, - "forcemerge": { - "max_num_segments": 1 + "set_priority" : { + "priority": 50 } } } @@ -557,13 +608,18 @@ private void createDataStreamFromNonDataStreamIndices(String dataStreamFromNonDa } @SuppressWarnings("unchecked") - private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster, int expectedSuccessesCount, int expectedErrorCount) - throws Exception { + private void upgradeDataStream( + String dataStreamName, + int numRolloversOnOldCluster, + int expectedSuccessesCount, + int expectedErrorCount, + boolean ilmEnabled + ) throws Exception { Set indicesNeedingUpgrade = getDataStreamIndices(dataStreamName); final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2); for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) { String oldIndexName = rollover(dataStreamName); - if (randomBoolean()) { + if (ilmEnabled == false && randomBoolean()) { closeIndex(oldIndexName); } }