Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public boolean isValidChange(OperationMode nextMode) {
},

/**
* this represents a state where only sensitive actions (like {@link ShrinkStep}) will be executed
* this represents a state where only sensitive actions (like {@link ResizeIndexStep}) will be executed
* until they finish, at which point the operation mode will move to <code>STOPPED</code>.
*/
STOPPING {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Resizes an index with the specified settings, using the name that was generated in a previous {@link GenerateUniqueIndexNameStep} step.
*/
/**
 * Resizes an index with the specified settings, using the name that was generated in a previous {@link GenerateUniqueIndexNameStep} step.
 */
public class ResizeIndexStep extends AsyncActionStep {

    private static final Logger logger = LogManager.getLogger(ResizeIndexStep.class);

    /** Step name used when this step performs a shrink operation. */
    public static final String SHRINK = "shrink";
    /** Step name used when this step performs a clone operation. */
    public static final String CLONE = "clone";

    /** The kind of resize (shrink or clone) this step performs. */
    private final ResizeType resizeType;
    /** Derives the target index name from the source index name and its lifecycle execution state. */
    private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;
    /** A supplier that takes the index metadata of the <i>original</i> index and returns settings for the target index . */
    private final Function<IndexMetadata, Settings> targetIndexSettingsSupplier;
    /** Optional cap on the size of each primary shard in the target index; only valid for shrink operations. */
    @Nullable
    private final ByteSizeValue maxPrimaryShardSize;

    public ResizeIndexStep(
        StepKey key,
        StepKey nextStepKey,
        Client client,
        ResizeType resizeType,
        BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
        Function<IndexMetadata, Settings> targetIndexSettingsSupplier,
        @Nullable ByteSizeValue maxPrimaryShardSize
    ) {
        super(key, nextStepKey, client);
        assert resizeType == ResizeType.SHRINK || maxPrimaryShardSize == null : "maxPrimaryShardSize can only be set for shrink operations";
        this.resizeType = resizeType;
        this.targetIndexNameSupplier = targetIndexNameSupplier;
        this.targetIndexSettingsSupplier = targetIndexSettingsSupplier;
        this.maxPrimaryShardSize = maxPrimaryShardSize;
    }

    @Override
    public boolean isRetryable() {
        // A failed resize request can safely be issued again, so this step may be retried.
        return true;
    }

    @Override
    public void performAction(
        IndexMetadata indexMetadata,
        ProjectState currentState,
        ClusterStateObserver observer,
        ActionListener<Void> listener
    ) {
        final String sourceIndexName = indexMetadata.getIndex().getName();
        final LifecycleExecutionState executionState = indexMetadata.getLifecycleExecutionState();
        if (executionState.lifecycleDate() == null) {
            throw new IllegalStateException("source index [" + sourceIndexName + "] is missing lifecycle date");
        }

        final String targetIndexName = targetIndexNameSupplier.apply(sourceIndexName, executionState);
        if (currentState.metadata().index(targetIndexName) != null) {
            // The target index already exists, so there is nothing left for this step to do.
            logger.warn(
                "skipping [{}] step for index [{}] as part of policy [{}] as the target index [{}] already exists",
                getKey().name(),
                sourceIndexName,
                indexMetadata.getLifecyclePolicyName(),
                targetIndexName
            );
            listener.onResponse(null);
            return;
        }

        // We add the skip setting to prevent ILM from processing the shrunken index before the execution state has been copied - which
        // could happen if the shards of the shrunken index take a long time to allocate.
        final Settings targetSettings = Settings.builder()
            .put(targetIndexSettingsSupplier.apply(indexMetadata))
            .put(LifecycleSettings.LIFECYCLE_SKIP, true)
            .build();

        final ResizeRequest resizeRequest = new ResizeRequest(targetIndexName, sourceIndexName);
        resizeRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
        resizeRequest.setResizeType(resizeType);
        resizeRequest.getTargetIndexRequest().settings(targetSettings);
        if (resizeType == ResizeType.SHRINK) {
            resizeRequest.setMaxPrimaryShardSize(maxPrimaryShardSize);
        }

        // This request does not wait for (successful) completion of the resize operation - it fires-and-forgets.
        // It's up to a subsequent step to check for the existence of the target index and wait for it to be green.
        getClient(currentState.projectId()).admin()
            .indices()
            .resizeIndex(resizeRequest, listener.delegateFailureAndWrap((l, response) -> l.onResponse(null)));
    }

    public ResizeType getResizeType() {
        return resizeType;
    }

    public BiFunction<String, LifecycleExecutionState, String> getTargetIndexNameSupplier() {
        return targetIndexNameSupplier;
    }

    public Function<IndexMetadata, Settings> getTargetIndexSettingsSupplier() {
        return targetIndexSettingsSupplier;
    }

    public ByteSizeValue getMaxPrimaryShardSize() {
        return maxPrimaryShardSize;
    }

    // Note: the two functional suppliers are not part of equality/hashing; only the resize type
    // and the max primary shard size (plus the superclass state) participate.
    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), resizeType, maxPrimaryShardSize);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        ResizeIndexStep other = (ResizeIndexStep) obj;
        return super.equals(obj) && resizeType == other.resizeType && Objects.equals(maxPrimaryShardSize, other.maxPrimaryShardSize);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.admin.indices.shrink.ResizeNumberOfShardsCalculator;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
Expand Down Expand Up @@ -173,7 +174,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
StepKey generateShrinkIndexNameKey = new StepKey(phase, NAME, GenerateUniqueIndexNameStep.NAME);
StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME);
StepKey allocationRoutedKey = new StepKey(phase, NAME, CheckShrinkReadyStep.NAME);
StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME);
StepKey shrinkKey = new StepKey(phase, NAME, ResizeIndexStep.SHRINK);
StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME);
StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY);
Expand Down Expand Up @@ -267,7 +268,24 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
new CheckShrinkReadyStep(allocationRoutedKey, shrinkKey),
setSingleNodeKey
);
ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, maxPrimaryShardSize);
ResizeIndexStep shrink = new ResizeIndexStep(
shrinkKey,
enoughShardsKey,
client,
ResizeType.SHRINK,
ShrinkIndexNameSupplier::getShrinkIndexName,
indexMetadata -> {
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, indexMetadata.getNumberOfReplicas())
// We need to remove the single node allocation so replicas can be allocated.
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null);
if (numberOfShards != null) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards);
}
return settingsBuilder.build();
},
maxPrimaryShardSize
);
// wait until the shrunk index is recovered. we again wait until the configured threshold is breached and if the shrunk index has
// not successfully recovered until then, we rewind to the "cleanup-shrink-index" step to delete this unsuccessful shrunk index
// and retry the operation by generating a new shrink index name and attempting to shrink again
Expand Down

This file was deleted.

Loading