Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand All @@ -29,12 +30,17 @@ public AsyncWaitStep(StepKey key, StepKey nextStepKey, Client client) {
this.client = client;
}

// For testing only
@Nullable
protected Client getClient() {
Client getClient() {
return client;
}

public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout);
protected Client getClient(ProjectId projectId) {
return client.projectClient(projectId);
}

public abstract void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout);

public interface Listener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -56,42 +56,44 @@ public int getMaxNumSegments() {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
IndexSegments idxSegments = response.getIndices().get(index.getName());
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
logger.info(
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
index.getName(),
response.getFailedShards(),
failures == null
? "n/a"
: Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).toList(), ",")
);
listener.onResponse(true, new Info(-1));
} else {
List<ShardSegments> unmergedShards = idxSegments.getShards()
.values()
.stream()
.flatMap(iss -> Arrays.stream(iss.shards()))
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
.toList();
if (unmergedShards.size() > 0) {
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
getClient(state.projectId()).admin()
.indices()
.segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
IndexSegments idxSegments = response.getIndices().get(index.getName());
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
logger.info(
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
"[{}] retrieval of segment counts after force merge did not succeed, there were {} shard failures. failures: {}",
index.getName(),
maxNumSegments,
unmergedShards.size(),
unmergedShardCounts
response.getFailedShards(),
failures == null
? "n/a"
: Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).toList(), ",")
);
listener.onResponse(true, new Info(-1));
} else {
List<ShardSegments> unmergedShards = idxSegments.getShards()
.values()
.stream()
.flatMap(iss -> Arrays.stream(iss.shards()))
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
.toList();
if (unmergedShards.size() > 0) {
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
logger.info(
"[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
index.getName(),
maxNumSegments,
unmergedShards.size(),
unmergedShardCounts
);
}
// Force merging is best effort, so always return true that the condition has been met.
listener.onResponse(true, new Info(unmergedShards.size()));
}
// Force merging is best effort, so always return true that the condition has been met.
listener.onResponse(true, new Info(unmergedShards.size()));
}
}, listener::onFailure));
}, listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -39,8 +39,8 @@ public boolean isRetryable() {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = metadata.getProject().index(index);
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = state.metadata().index(index);
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
listener.onResponse(true, null);
Expand All @@ -49,7 +49,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,

FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
request.setIndices(new String[] { index.getName() });
getClient().execute(
getClient(state.projectId()).execute(
FollowStatsAction.INSTANCE,
request,
ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
Expand Down Expand Up @@ -52,26 +53,26 @@ public boolean isRetryable() {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
XPackInfoRequest xPackInfoRequest = new XPackInfoRequest();
xPackInfoRequest.setCategories(EnumSet.of(XPackInfoRequest.Category.FEATURES));
getClient().execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
getClient(state.projectId()).execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
XPackInfoResponse.FeatureSetsInfo.FeatureSet featureSet = xPackInfoResponse.getInfo();
if (featureSet != null && featureSet.enabled() == false) {
listener.onResponse(true, null);
return;
}
leaderIndexCheck(metadata, index, listener, masterTimeout);
leaderIndexCheck(state.projectId(), index, listener);
}, listener::onFailure));
}

private void leaderIndexCheck(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
private void leaderIndexCheck(ProjectId projectId, Index index, Listener listener) {
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
String indexName = index.getName();
request.indices(indexName);

getClient().admin().indices().stats(request, ActionListener.wrap((response) -> {
getClient(projectId).admin().indices().stats(request, ActionListener.wrap((response) -> {
IndexStats indexStats = response.getIndex(indexName);
if (indexStats == null) {
// Index was probably deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.support.IndexComponentSelector;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -80,8 +80,8 @@ public boolean isRetryable() {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(index.getName());
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(index.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
final String rolloverTarget;
final boolean targetFailureStore;
Expand All @@ -95,14 +95,14 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
index.getName(),
targetFailureStore ? "failure store " : "",
dataStream.getName(),
metadata.getProject().index(index).getLifecyclePolicyName()
state.metadata().index(index).getLifecyclePolicyName()
);
listener.onResponse(true, EmptyInfo.INSTANCE);
return;
}
rolloverTarget = dataStream.getName();
} else {
IndexMetadata indexMetadata = metadata.getProject().index(index);
IndexMetadata indexMetadata = state.metadata().index(index);
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());

if (Strings.isNullOrEmpty(rolloverAlias)) {
Expand Down Expand Up @@ -200,15 +200,17 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,

// if we should only rollover if not empty, *and* if neither an explicit min_docs nor an explicit min_primary_shard_docs
// has been specified on this policy, then inject a default min_docs: 1 condition so that we do not rollover empty indices
boolean rolloverOnlyIfHasDocuments = LifecycleSettings.LIFECYCLE_ROLLOVER_ONLY_IF_HAS_DOCUMENTS_SETTING.get(metadata.settings());
boolean rolloverOnlyIfHasDocuments = LifecycleSettings.LIFECYCLE_ROLLOVER_ONLY_IF_HAS_DOCUMENTS_SETTING.get(
state.cluster().metadata().settings()
);
RolloverRequest rolloverRequest = createRolloverRequest(
rolloverTarget,
masterTimeout,
rolloverOnlyIfHasDocuments,
targetFailureStore
);

getClient().admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> {
getClient(state.projectId()).admin().indices().rolloverIndex(rolloverRequest, ActionListener.wrap(response -> {
final var conditionStatus = response.getConditionStatus();
final var conditionsMet = rolloverRequest.getConditions().areConditionsMet(conditionStatus);
if (conditionsMet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -54,8 +54,8 @@ public class WaitForSnapshotStep extends AsyncWaitStep {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = metadata.getProject().index(index);
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = state.metadata().index(index);
if (indexMetadata == null) {
listener.onFailure(error(NO_INDEX_METADATA_MESSAGE, index.getName()));
return;
Expand All @@ -68,7 +68,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
return;
}

SnapshotLifecycleMetadata snapMeta = metadata.getProject().custom(SnapshotLifecycleMetadata.TYPE);
SnapshotLifecycleMetadata snapMeta = state.metadata().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta == null || snapMeta.getSnapshotConfigurations().containsKey(policy) == false) {
listener.onFailure(error(POLICY_NOT_FOUND_MESSAGE, policy));
return;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
.snapshots(new String[] { snapshotName })
.includeIndexNames(true)
.verbose(false);
getClient().admin().cluster().getSnapshots(request, ActionListener.wrap(response -> {
getClient(state.projectId()).admin().cluster().getSnapshots(request, ActionListener.wrap(response -> {
if (response.getSnapshots().size() != 1) {
listener.onFailure(error(UNEXPECTED_SNAPSHOT_STATE_MESSAGE, repositoryName, snapshotName, response.getSnapshots().size()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -66,12 +66,12 @@ public boolean equals(Object obj) {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = metadata.getProject().index(index);
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = state.metadata().index(index);
assert indexMetadata != null
: "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";

final LifecycleExecutionState executionState = metadata.getProject().index(index.getName()).getLifecycleExecutionState();
final LifecycleExecutionState executionState = state.metadata().index(index.getName()).getLifecycleExecutionState();
assert executionState != null
: "the lifecycle execution state for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -43,8 +43,8 @@ public boolean isRetryable() {
}

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = metadata.getProject().index(index);
public void evaluateCondition(ProjectState state, Index index, Listener listener, TimeValue masterTimeout) {
IndexMetadata indexMetadata = state.metadata().index(index);
assert indexMetadata != null
: "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void setupClient() {
adminClient = Mockito.mock(AdminClient.class);
indicesClient = Mockito.mock(IndicesAdminClient.class);

Mockito.when(client.projectClient(Mockito.any())).thenReturn(client);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
}
Expand Down
Loading