Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123712.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123712
summary: Process ILM cluster state updates on another thread
area: ILM+SLM
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.scheduler.SchedulerEngine;
import org.elasticsearch.common.scheduler.TimeValueSchedule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -60,6 +61,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -90,7 +93,12 @@ public class IndexLifecycleService
private final IndexLifecycleRunner lifecycleRunner;
private final Settings settings;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LongSupplier nowSupplier;
private final ExecutorService managementExecutor;
/** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */
private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>();

private SchedulerEngine.Job scheduledJob;

@SuppressWarnings("this-escape")
Expand All @@ -108,12 +116,14 @@ public IndexLifecycleService(
super();
this.settings = settings;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client, licenseState);
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
this.managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
clusterService.addStateApplier(this);
clusterService.addListener(this);
clusterService.getClusterSettings()
Expand Down Expand Up @@ -332,17 +342,76 @@ public void clusterChanged(ClusterChangedEvent event) {
// ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary.
final List<Index> indicesDeleted = event.indicesDeleted();
if (indicesDeleted.isEmpty() == false) {
clusterService.getClusterApplierService().threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
managementExecutor.execute(() -> {
for (Index index : indicesDeleted) {
policyRegistry.delete(index);
}
});
}

triggerPolicies(event.state(), true);
// Only start processing the new cluster state if we're not already processing one.
// Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet.
// This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
// the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some
// cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine.
if (lastSeenState.getAndSet(event.state()) == null) {
processClusterState();
} else {
logger.trace("ILM state processor still running, not starting new thread");
}
}
}

/**
* Instead of processing cluster state updates on the cluster state applier thread, we fork to a different thread where
* ILM's runtime of processing the cluster state update does not affect the speed at which the cluster can apply new cluster states.
* That does not mean we don't need to optimize ILM's cluster state processing, as the overall amount of processing is generally
* unaffected by this fork approach (unless we skip some cluster states), but it does mean we're saving a significant amount
* of processing on the critical cluster state applier thread.
*/
private void processClusterState() {
managementExecutor.execute(new AbstractRunnable() {

private final SetOnce<ClusterState> currentState = new SetOnce<>();

@Override
protected void doRun() throws Exception {
final ClusterState currentState = lastSeenState.get();
// This should never be null, but we're checking anyway to be sure.
if (currentState == null) {
assert false : "Expected current state to non-null when processing cluster state in ILM";
return;
}
this.currentState.set(currentState);
triggerPolicies(currentState, true);
}

@Override
public void onFailure(Exception e) {
logger.warn("ILM failed to process cluster state", e);
}

@Override
public void onAfter() {
// If the last seen state is unchanged, we set it to null to indicate that processing has finished and we return.
if (lastSeenState.compareAndSet(currentState.get(), null)) {
return;
}
// If the last seen cluster state changed while this thread was running, it means a new cluster state came in and we need to
// process it. We do that by kicking off a new thread, which will pick up the new cluster state when the thread gets
// executed.
processClusterState();
}

@Override
public boolean isForceExecution() {
// Without force execution, we risk ILM state processing being postponed arbitrarily long, which in turn could cause
// thundering herd issues if there's significant time between ILM runs.
return true;
}
});
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
// only act if we are master, otherwise keep idle until elected
Expand Down
Loading