Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123712.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123712
summary: Process ILM cluster state updates on another thread
area: ILM+SLM
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.scheduler.SchedulerEngine;
import org.elasticsearch.common.scheduler.TimeValueSchedule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -60,6 +61,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -90,7 +93,12 @@ public class IndexLifecycleService
private final IndexLifecycleRunner lifecycleRunner;
private final Settings settings;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LongSupplier nowSupplier;
/** Executor that ILM work is forked onto; obtained in the constructor via {@code threadPool.executor(ThreadPool.Names.MANAGEMENT)}. */
private final ExecutorService managementExecutor;
/**
 * A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state.
 * It is swapped in by {@code clusterChanged} and reset to {@code null} by the processing task once it finishes
 * without a newer state having arrived in the meantime.
 */
private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>();

// Not final: assigned null in the constructor.
private SchedulerEngine.Job scheduledJob;

@SuppressWarnings("this-escape")
Expand All @@ -108,12 +116,14 @@ public IndexLifecycleService(
super();
this.settings = settings;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client, licenseState);
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
this.managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
clusterService.addStateApplier(this);
clusterService.addListener(this);
clusterService.getClusterSettings()
Expand Down Expand Up @@ -332,17 +342,75 @@ public void clusterChanged(ClusterChangedEvent event) {
// ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary.
final List<Index> indicesDeleted = event.indicesDeleted();
if (indicesDeleted.isEmpty() == false) {
clusterService.getClusterApplierService().threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
managementExecutor.execute(() -> {
for (Index index : indicesDeleted) {
policyRegistry.delete(index);
}
});
}

triggerPolicies(event.state(), true);
// Only start processing the new cluster state if we're not already processing one.
// Note that we might overwrite the last seen state with a new one, even if the previous one hasn't been processed yet.
// This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
// the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some
// cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine.
if (lastSeenState.getAndSet(event.state()) == null) {
processClusterState();
} else {
logger.trace("ILM state processor still running, not starting new thread");
}
}
}

/**
 * Dispatches processing of the most recently seen cluster state to the management executor, so that it does not run on the
 * cluster state applier thread and therefore does not slow down the application of new cluster states.
 * <p>
 * Forking does not reduce the total amount of work ILM has to do (unless intermediate cluster states end up being skipped), so
 * optimizing ILM's cluster state processing remains worthwhile; the gain is that this work is kept off the critical applier thread.
 * <p>
 * Each dispatched task reads {@code lastSeenState}, runs {@code triggerPolicies} on it, and afterwards either clears
 * {@code lastSeenState} (nothing newer arrived) or dispatches another task to handle the newer state.
 */
private void processClusterState() {
    managementExecutor.execute(new AbstractRunnable() {

        /** The cluster state this task picked up in {@link #doRun()}; remains unset if {@code doRun} found no state. */
        private final SetOnce<ClusterState> processedState = new SetOnce<>();

        @Override
        public boolean isForceExecution() {
            // Force execution so that ILM state processing cannot be postponed arbitrarily long; long gaps between ILM runs
            // could otherwise cause thundering herd issues.
            return true;
        }

        @Override
        protected void doRun() throws Exception {
            final ClusterState state = lastSeenState.get();
            // A null state is not expected here, but we guard against it regardless.
            if (state != null) {
                processedState.set(state);
                triggerPolicies(state, true);
            }
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("ILM failed to process cluster state", e);
        }

        @Override
        public void onAfter() {
            // Clear the last seen state only if it is still the one we processed; a successful swap marks processing as finished.
            // NOTE(review): if this hook can run while processedState is unset and lastSeenState is non-null (e.g. should the
            // executor reject the task before doRun), the swap fails and we re-dispatch - confirm how rejection is handled.
            final boolean noNewerState = lastSeenState.compareAndSet(processedState.get(), null);
            if (noNewerState == false) {
                // A newer cluster state arrived while this task was running. Dispatch a fresh task, which will pick up that
                // newer state once it gets executed.
                processClusterState();
            }
        }
    });
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
// only act if we are master, otherwise keep idle until elected
Expand Down
Loading