diff --git a/docs/changelog/123712.yaml b/docs/changelog/123712.yaml new file mode 100644 index 0000000000000..b3ae3c5a8fda4 --- /dev/null +++ b/docs/changelog/123712.yaml @@ -0,0 +1,5 @@ +pr: 123712 +summary: Process ILM cluster state updates on another thread +area: ILM+SLM +type: enhancement +issues: [] diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index f2a407bfd0ff1..5b5f692674723 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.scheduler.SchedulerEngine; import org.elasticsearch.common.scheduler.TimeValueSchedule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -60,6 +61,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -90,7 +93,12 @@ public class IndexLifecycleService private final IndexLifecycleRunner lifecycleRunner; private final Settings settings; private final ClusterService clusterService; + private final ThreadPool threadPool; private final LongSupplier nowSupplier; + private final ExecutorService managementExecutor; + /** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */ + private final AtomicReference lastSeenState = new AtomicReference<>(); + private SchedulerEngine.Job scheduledJob; @SuppressWarnings("this-escape") @@ -108,12 +116,14 @@ public IndexLifecycleService( super(); this.settings = settings; this.clusterService = clusterService; + this.threadPool = threadPool; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client, licenseState); this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); + this.managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); clusterService.addStateApplier(this); clusterService.addListener(this); clusterService.getClusterSettings() @@ -332,17 +342,76 @@ public void clusterChanged(ClusterChangedEvent event) { // ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary. final List indicesDeleted = event.indicesDeleted(); if (indicesDeleted.isEmpty() == false) { - clusterService.getClusterApplierService().threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(() -> { + managementExecutor.execute(() -> { for (Index index : indicesDeleted) { policyRegistry.delete(index); } }); } - triggerPolicies(event.state(), true); + // Only start processing the new cluster state if we're not already processing one. + // Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet. + // This means that when ILM's cluster state processing takes longer than the overall cluster state application or when + // the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some + // cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine. + if (lastSeenState.getAndSet(event.state()) == null) { + processClusterState(); + } else { + logger.trace("ILM state processor still running, not starting new thread"); + } } } + /** + * Instead of processing cluster state updates on the cluster state applier thread, we fork to a different thread where + * ILM's runtime of processing the cluster state update does not affect the speed at which the cluster can apply new cluster states. + * That does not mean we don't need to optimize ILM's cluster state processing, as the overall amount of processing is generally + * unaffected by this fork approach (unless we skip some cluster states), but it does mean we're saving a significant amount + * of processing on the critical cluster state applier thread. + */ + private void processClusterState() { + managementExecutor.execute(new AbstractRunnable() { + + private final SetOnce currentState = new SetOnce<>(); + + @Override + protected void doRun() throws Exception { + final ClusterState currentState = lastSeenState.get(); + // This should never be null, but we're checking anyway to be sure. + if (currentState == null) { + assert false : "Expected current state to non-null when processing cluster state in ILM"; + return; + } + this.currentState.set(currentState); + triggerPolicies(currentState, true); + } + + @Override + public void onFailure(Exception e) { + logger.warn("ILM failed to process cluster state", e); + } + + @Override + public void onAfter() { + // If the last seen state is unchanged, we set it to null to indicate that processing has finished and we return. + if (lastSeenState.compareAndSet(currentState.get(), null)) { + return; + } + // If the last seen cluster state changed while this thread was running, it means a new cluster state came in and we need to + // process it. We do that by kicking off a new thread, which will pick up the new cluster state when the thread gets + // executed. + processClusterState(); + } + + @Override + public boolean isForceExecution() { + // Without force execution, we risk ILM state processing being postponed arbitrarily long, which in turn could cause + // thundering herd issues if there's significant time between ILM runs. + return true; + } + }); + } + @Override public void applyClusterState(ClusterChangedEvent event) { // only act if we are master, otherwise keep idle until elected