@@ -22,7 +22,7 @@ public boolean isValidChange(OperationMode nextMode) {
},

/**
* this represents a state where only sensitive actions (like {@link ShrinkStep}) will be executed
* this represents a state where only sensitive actions (like {@link ResizeIndexStep}) will be executed
* until they finish, at which point the operation mode will move to <code>STOPPED</code>.
*/
STOPPING {
@@ -10,32 +10,46 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.util.Objects;

import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.getShrinkIndexName;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Shrinks an index, using a prefix prepended to the original index name for the name of the shrunken index.
* Resizes an index with the specified settings, using the name that was generated in a previous {@link GenerateUniqueIndexNameStep} step.
*/
public class ShrinkStep extends AsyncActionStep {
public static final String NAME = "shrink";
private static final Logger logger = LogManager.getLogger(ShrinkStep.class);
public class ResizeIndexStep extends AsyncActionStep {
private static final Logger logger = LogManager.getLogger(ResizeIndexStep.class);

private final Integer numberOfShards;
private final ResizeType resizeType;
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;
private final Function<IndexMetadata, Settings> targetIndexSettingsSupplier;
@Nullable
private final ByteSizeValue maxPrimaryShardSize;

public ShrinkStep(StepKey key, StepKey nextStepKey, Client client, Integer numberOfShards, ByteSizeValue maxPrimaryShardSize) {
public ResizeIndexStep(
StepKey key,
StepKey nextStepKey,
Client client,
ResizeType resizeType,
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
Function<IndexMetadata, Settings> targetIndexSettingsSupplier,
@Nullable ByteSizeValue maxPrimaryShardSize
) {
super(key, nextStepKey, client);
this.numberOfShards = numberOfShards;
this.resizeType = resizeType;
this.targetIndexNameSupplier = targetIndexNameSupplier;
this.targetIndexSettingsSupplier = targetIndexSettingsSupplier;
this.maxPrimaryShardSize = maxPrimaryShardSize;
}

@@ -44,14 +58,6 @@ public boolean isRetryable() {
return true;
}

public Integer getNumberOfShards() {
return numberOfShards;
}

public ByteSizeValue getMaxPrimaryShardSize() {
return maxPrimaryShardSize;
}

@Override
public void performAction(
IndexMetadata indexMetadata,
@@ -64,52 +70,58 @@ public void performAction(
throw new IllegalStateException("source index [" + indexMetadata.getIndex().getName() + "] is missing lifecycle date");
}

String shrunkenIndexName = getShrinkIndexName(indexMetadata.getIndex().getName(), lifecycleState);
if (currentState.metadata().index(shrunkenIndexName) != null) {
final String targetIndexName = targetIndexNameSupplier.apply(indexMetadata.getIndex().getName(), lifecycleState);
if (currentState.metadata().index(targetIndexName) != null) {
logger.warn(
"skipping [{}] step for index [{}] as part of policy [{}] as the shrunk index [{}] already exists",
ShrinkStep.NAME,
"skipping [{}] step for index [{}] as part of policy [{}] as the target index [{}] already exists",
getKey().name(),
indexMetadata.getIndex().getName(),
indexMetadata.getLifecyclePolicyName(),
shrunkenIndexName
targetIndexName
);
listener.onResponse(null);
return;
}

String policyName = indexMetadata.getLifecyclePolicyName();

Settings.Builder builder = Settings.builder();
// need to remove the single shard, allocation so replicas can be allocated
builder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, indexMetadata.getNumberOfReplicas())
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
Settings relevantTargetSettings = Settings.builder()
.put(targetIndexSettingsSupplier.apply(indexMetadata))
// We add the skip setting to prevent ILM from processing the shrunken index before the execution state has been copied - which
// could happen if the shards of the shrunken index take a long time to allocate.
.put(LifecycleSettings.LIFECYCLE_SKIP, true)
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null);
if (numberOfShards != null) {
builder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards);
}
Settings relevantTargetSettings = builder.build();
.build();

ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetadata.getIndex().getName()).masterNodeTimeout(
ResizeRequest resizeRequest = new ResizeRequest(targetIndexName, indexMetadata.getIndex().getName()).masterNodeTimeout(
TimeValue.MAX_VALUE
);
resizeRequest.setResizeType(resizeType);
resizeRequest.setMaxPrimaryShardSize(maxPrimaryShardSize);
resizeRequest.getTargetIndexRequest().settings(relevantTargetSettings);

// Hard coding this to true as the resize request was executed and the corresponding cluster change was committed, so the
// eventual retry will not be able to succeed anymore (shrunk index was created already)
// The next step in the ShrinkAction will wait for the shrunk index to be created and for the shards to be allocated.
getClient(currentState.projectId()).admin()
.indices()
.resizeIndex(resizeRequest, listener.delegateFailureAndWrap((l, response) -> l.onResponse(null)));

}

public ResizeType getResizeType() {
return resizeType;
}

public BiFunction<String, LifecycleExecutionState, String> getTargetIndexNameSupplier() {
return targetIndexNameSupplier;
}

public Function<IndexMetadata, Settings> getTargetIndexSettingsSupplier() {
return targetIndexSettingsSupplier;
}

public ByteSizeValue getMaxPrimaryShardSize() {
return maxPrimaryShardSize;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), numberOfShards, maxPrimaryShardSize);
return Objects.hash(super.hashCode(), resizeType, maxPrimaryShardSize);
}

@Override
@@ -120,9 +132,9 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass()) {
return false;
}
ShrinkStep other = (ShrinkStep) obj;
ResizeIndexStep other = (ResizeIndexStep) obj;
return super.equals(obj)
&& Objects.equals(numberOfShards, other.numberOfShards)
&& Objects.equals(resizeType, other.resizeType)
&& Objects.equals(maxPrimaryShardSize, other.maxPrimaryShardSize);
}

@@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.admin.indices.shrink.ResizeNumberOfShardsCalculator;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -46,6 +47,7 @@ public class ShrinkAction implements LifecycleAction {
public static final ParseField MAX_PRIMARY_SHARD_SIZE = new ParseField("max_primary_shard_size");
public static final ParseField ALLOW_WRITE_AFTER_SHRINK = new ParseField("allow_write_after_shrink");
public static final String CONDITIONAL_SKIP_SHRINK_STEP = BranchingStep.NAME + "-check-prerequisites";
public static final String SHRINK_STEP = "shrink";
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";

private static final ConstructingObjectParser<ShrinkAction, Void> PARSER = new ConstructingObjectParser<>(
@@ -172,7 +174,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
StepKey generateShrinkIndexNameKey = new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME);
StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME);
StepKey allocationRoutedKey = new StepKey(phase, NAME, CheckShrinkReadyStep.NAME);
StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME);
StepKey shrinkKey = new StepKey(phase, NAME, SHRINK_STEP);
StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME);
StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY);
@@ -265,7 +267,24 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
new CheckShrinkReadyStep(allocationRoutedKey, shrinkKey),
setSingleNodeKey
);
ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, maxPrimaryShardSize);
ResizeIndexStep shrink = new ResizeIndexStep(
shrinkKey,
enoughShardsKey,
client,
ResizeType.SHRINK,
ShrinkIndexNameSupplier::getShrinkIndexName,
indexMetadata -> {
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, indexMetadata.getNumberOfReplicas())
// We need to remove the single node allocation so replicas can be allocated.
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null);
if (numberOfShards != null) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards);
}
return settingsBuilder.build();
},
maxPrimaryShardSize
);
// wait until the shrunk index is recovered. we again wait until the configured threshold is breached and if the shrunk index has
// not successfully recovered until then, we rewind to the "cleanup-shrink-index" step to delete this unsuccessful shrunk index
// and retry the operation by generating a new shrink index name and attempting to shrink again
@@ -10,6 +10,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -29,60 +30,72 @@
import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
import static org.hamcrest.Matchers.equalTo;

public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
public class ResizeIndexStepTests extends AbstractStepTestCase<ResizeIndexStep> {

@Override
public ShrinkStep createRandomInstance() {
public ResizeIndexStep createRandomInstance() {
StepKey stepKey = randomStepKey();
StepKey nextStepKey = randomStepKey();
Integer numberOfShards = null;
ResizeType resizeType = randomFrom(ResizeType.values());
Settings.Builder settings = Settings.builder();
ByteSizeValue maxPrimaryShardSize = null;
if (randomBoolean()) {
numberOfShards = randomIntBetween(1, 20);
settings.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 20));
} else {
maxPrimaryShardSize = ByteSizeValue.ofBytes(between(1, 100));
}
return new ShrinkStep(stepKey, nextStepKey, client, numberOfShards, maxPrimaryShardSize);
return new ResizeIndexStep(
stepKey,
nextStepKey,
client,
resizeType,
(index, state) -> randomAlphaOfLength(5) + index,
indexMetadata -> settings.build(),
maxPrimaryShardSize
);
}

@Override
public ShrinkStep mutateInstance(ShrinkStep instance) {
public ResizeIndexStep mutateInstance(ResizeIndexStep instance) {
StepKey key = instance.getKey();
StepKey nextKey = instance.getNextStepKey();
Integer numberOfShards = instance.getNumberOfShards();
ResizeType resizeType = instance.getResizeType();
ByteSizeValue maxPrimaryShardSize = instance.getMaxPrimaryShardSize();

switch (between(0, 2)) {
case 0 -> key = new StepKey(key.phase(), key.action(), key.name() + randomAlphaOfLength(5));
case 1 -> nextKey = new StepKey(nextKey.phase(), nextKey.action(), nextKey.name() + randomAlphaOfLength(5));
case 2 -> {
if (numberOfShards != null) {
numberOfShards = numberOfShards + 1;
}
if (maxPrimaryShardSize != null) {
maxPrimaryShardSize = ByteSizeValue.ofBytes(maxPrimaryShardSize.getBytes() + 1);
}
}
case 2 -> resizeType = randomValueOtherThan(resizeType, () -> randomFrom(ResizeType.values()));
default -> throw new AssertionError("Illegal randomisation branch");
}

return new ShrinkStep(key, nextKey, instance.getClientWithoutProject(), numberOfShards, maxPrimaryShardSize);
return new ResizeIndexStep(
key,
nextKey,
instance.getClientWithoutProject(),
resizeType,
instance.getTargetIndexNameSupplier(),
instance.getTargetIndexSettingsSupplier(),
maxPrimaryShardSize
);
}

@Override
public ShrinkStep copyInstance(ShrinkStep instance) {
return new ShrinkStep(
public ResizeIndexStep copyInstance(ResizeIndexStep instance) {
return new ResizeIndexStep(
instance.getKey(),
instance.getNextStepKey(),
instance.getClientWithoutProject(),
instance.getNumberOfShards(),
instance.getResizeType(),
instance.getTargetIndexNameSupplier(),
instance.getTargetIndexSettingsSupplier(),
instance.getMaxPrimaryShardSize()
);
}

public void testPerformAction() throws Exception {
String lifecycleName = randomAlphaOfLength(5);
ShrinkStep step = createRandomInstance();
ResizeIndexStep step = createRandomInstance();
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
lifecycleState.setPhase(step.getKey().phase());
lifecycleState.setAction(step.getKey().action());
@@ -103,22 +116,12 @@ public void testPerformAction() throws Exception {
assertThat(request.getSourceIndex(), equalTo(sourceIndexMetadata.getIndex().getName()));
assertThat(request.getTargetIndexRequest().aliases(), equalTo(Set.of()));

Settings.Builder builder = Settings.builder();
builder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas())
.put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName)
Settings expectedSettings = Settings.builder()
.put(step.getTargetIndexSettingsSupplier().apply(null))
.put(LifecycleSettings.LIFECYCLE_SKIP, true)
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null);
if (step.getNumberOfShards() != null) {
builder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, step.getNumberOfShards());
}
assertThat(request.getTargetIndexRequest().settings(), equalTo(builder.build()));
if (step.getNumberOfShards() != null) {
assertThat(
request.getTargetIndexRequest().settings().getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, -1),
equalTo(step.getNumberOfShards())
);
}
request.setMaxPrimaryShardSize(step.getMaxPrimaryShardSize());
.build();
assertThat(request.getTargetIndexRequest().settings(), equalTo(expectedSettings));
assertThat(request.getMaxPrimaryShardSize(), equalTo(step.getMaxPrimaryShardSize()));
listener.onResponse(new CreateIndexResponse(true, true, sourceIndexMetadata.getIndex().getName()));
return null;
}).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());
@@ -136,7 +139,7 @@
public void testPerformActionShrunkenIndexExists() throws Exception {
String sourceIndexName = randomAlphaOfLength(10);
String lifecycleName = randomAlphaOfLength(5);
ShrinkStep step = createRandomInstance();
ResizeIndexStep step = createRandomInstance();
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
lifecycleState.setPhase(step.getKey().phase());
lifecycleState.setAction(step.getKey().action());
@@ -180,7 +183,7 @@ public void testPerformActionIsCompleteForUnAckedRequests() throws Exception {
.numberOfShards(randomIntBetween(1, 5))
.numberOfReplicas(randomIntBetween(0, 5))
.build();
ShrinkStep step = createRandomInstance();
ResizeIndexStep step = createRandomInstance();

Mockito.doAnswer(invocation -> {
@SuppressWarnings("unchecked")
@@ -209,7 +212,7 @@ public void testPerformActionFailure() throws Exception {
.numberOfReplicas(randomIntBetween(0, 5))
.build();
Exception exception = new RuntimeException();
ShrinkStep step = createRandomInstance();
ResizeIndexStep step = createRandomInstance();

Mockito.doAnswer(invocation -> {
@SuppressWarnings("unchecked")
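
The constructor added in this change takes the resize type together with suppliers for the target index name and target settings, so the same step can be wired up for resize operations other than shrink. A rough sketch of that reuse, assuming the imports already present in ResizeIndexStep (the step keys, "split-" naming scheme, and doubled shard count are invented for illustration and are not part of this PR):

// Illustrative sketch only, not from this change: the generalized step driving a
// hypothetical split operation.
ResizeIndexStep split = new ResizeIndexStep(
    splitKey,               // assumed StepKey for a hypothetical split step
    enoughShardsKey,        // assumed next step key
    client,
    ResizeType.SPLIT,
    (indexName, lifecycleState) -> "split-" + indexName,  // assumed naming scheme
    indexMetadata -> Settings.builder()
        // a split target needs a primary shard count that is a multiple of the source's
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, indexMetadata.getNumberOfShards() * 2)
        .build(),
    null                    // max_primary_shard_size is only meaningful for shrink
);

Keeping the naming and settings logic in the two suppliers is what lets the step stay shrink-agnostic, while ShrinkAction above passes ShrinkIndexNameSupplier::getShrinkIndexName and its own settings builder for the shrink case.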