Merged: Process ILM cluster state updates on another thread (PR #123712)
docs/changelog/123712.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 123712
summary: Process ILM cluster state updates on another thread
area: ILM+SLM
type: enhancement
issues: []
IndexLifecycleService.java
@@ -60,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

@@ -90,8 +91,11 @@ public class IndexLifecycleService
private final IndexLifecycleRunner lifecycleRunner;
private final Settings settings;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;
/** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */
private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>();

@SuppressWarnings("this-escape")
public IndexLifecycleService(
@@ -108,6 +112,7 @@ public IndexLifecycleService(
super();
this.settings = settings;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
@@ -332,17 +337,51 @@ public void clusterChanged(ClusterChangedEvent event) {
// ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary.
final List<Index> indicesDeleted = event.indicesDeleted();
if (indicesDeleted.isEmpty() == false) {
- clusterService.getClusterApplierService().threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
+ threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
for (Index index : indicesDeleted) {
policyRegistry.delete(index);
}
});
}

- triggerPolicies(event.state(), true);
// Only start processing the new cluster state if we're not already processing one.
// Note that we might overwrite the last seen state with a new one, even if the previous one hasn't been processed yet.
// This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
// the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some
// cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine.
if (lastSeenState.getAndSet(event.state()) == null) {
processClusterState();
} else {
logger.trace("ILM state processor still running, not starting new thread");
}
}
}
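The hand-off above hinges on a single AtomicReference: getAndSet on the cluster applier thread decides whether a worker needs to be started, and (as shown further down) compareAndSet on the worker decides whether it may stop. Here is a self-contained sketch of the same pattern; the class and method names (LatestStateProcessor, onNewState, handle) are illustrative, not from the PR:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Stand-alone sketch of the "latest state wins" hand-off used in the PR. */
class LatestStateProcessor<T> {
    private final AtomicReference<T> lastSeen = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void onNewState(T state) {
        // getAndSet returns the previous value: null means no worker is active,
        // so this caller must start one. A non-null previous value means an
        // active worker will pick up the state we just stored (the previous,
        // unprocessed value is intentionally dropped).
        if (lastSeen.getAndSet(state) == null) {
            executor.execute(this::process);
        }
    }

    private void process() {
        final T current = lastSeen.get();
        if (current == null) {
            return; // defensive, mirroring the null check in the PR
        }
        handle(current);
        // If no newer state arrived while we were busy, clear the reference
        // and stop; otherwise schedule another round for the newest state.
        if (lastSeen.compareAndSet(current, null) == false) {
            executor.execute(this::process);
        }
    }

    void handle(T state) {
        System.out.println("processing " + state);
    }
}
```

The invariant is that lastSeen is non-null exactly while a worker is scheduled or running, so at most one worker is ever active and intermediate states can be skipped safely.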

/**
* Instead of processing cluster state updates on the cluster state applier thread, we fork to a different thread, where
* the time ILM takes to process a cluster state update does not affect the speed at which the cluster can apply new cluster states.
* That does not mean we don't need to optimize ILM's cluster state processing, as the overall amount of processing is generally
* unaffected by this forking approach (unless we skip some cluster states), but it does mean we're removing a significant amount
* of processing from the critical cluster state applier thread.
*/
private void processClusterState() {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
final ClusterState currentState = lastSeenState.get();
// This should never be null, but we're checking anyway to be sure.
if (currentState == null) {
return;
}
triggerPolicies(currentState, true);
Member: What happens if this throws? I'm worried that it could leave lastSeenState set while processing is not actually happening, meaning that we'll never trigger another thread to do future processing. Maybe we should do something with either try-catch or try-finally here?

Member: Also, are we okay with the fact that exceptions won't get propagated back to the ClusterApplierService? It looks like that would just log and discard the exception anyway, so I guess it's okay. But maybe we should also explicitly do the equivalent log-and-discard behaviour here, rather than relying on whatever the executor is going to do with unhandled exceptions?

Contributor: ++ Let's make this an AbstractRunnable and also make it force-execute. Otherwise, if we run into a rejection, we'll never do the work for a cluster state update and will implicitly rely on there being another update to get us out of the broken state.

Member: TIL about AbstractRunnable, thanks @original-brownbear!

Contributor Author: Good catch, @PeteGillinElastic! And thanks both! I changed the runnable into an AbstractRunnable to make it force-execute and to take care of failures.
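For illustration, a minimal sketch of what that AbstractRunnable variant could look like; it assumes the surrounding class's fields (threadPool, lastSeenState, logger) and is not necessarily the merged code:

```java
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
    @Override
    public boolean isForceExecution() {
        // Never reject this task: a rejection would leave lastSeenState set
        // with no thread scheduled to process it.
        return true;
    }

    @Override
    public void onFailure(Exception e) {
        // Log and discard, the equivalent of what ClusterApplierService
        // would have done with an exception thrown by an applier.
        logger.warn("unexpected failure while processing ILM cluster state", e);
    }

    @Override
    protected void doRun() {
        final ClusterState currentState = lastSeenState.get();
        if (currentState == null) {
            return;
        }
        try {
            triggerPolicies(currentState, true);
        } finally {
            // Runs even if triggerPolicies throws, so a failure cannot leave
            // lastSeenState set with no worker responsible for it.
            if (lastSeenState.compareAndSet(currentState, null) == false) {
                processClusterState();
            }
        }
    }
});
```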

// If the last seen state is unchanged, we set it to null to indicate that processing has finished and we return.
if (lastSeenState.compareAndSet(currentState, null)) {
return;
}
// If the last seen cluster state changed while this thread was running, it means a new cluster state came in and we need to
// process it. We do that by kicking off a new thread, which will pick up the new cluster state when the thread gets executed.
processClusterState();
Member: The use of recursion here is making me slightly nervous. I was initially worried about stack overflow scenarios, but I think we're safe here because the recursive call will just schedule the work on another thread and return immediately, right? (Assuming we never call this with a direct executor, or at least never outside tests!) Then I started worrying about things like deadlock, but again it's probably safe since nothing is blocking: the execute call should return immediately (unless we've managed to fill the thread pool's work queue, in which case it'll throw some rejected-task exception, and in that case the system is in bad trouble anyway). So maybe this is all okay?

An alternative would be to have a loop here instead of recursing. That would mean we'd start the next update on the same thread immediately, rather than putting it at the back of the pool's work queue. I don't know whether that matters; it depends what other work the pool is used for. Given that you've already done a bunch of benchmarking with the current approach, maybe it's better to stick with that.

Sorry, I realize that I'm kind of thinking aloud in this comment! I figured it's the kind of thing that's worth working through carefully...

Contributor Author: Yeah, I understand what you're saying. I intentionally decided not to use a loop, as I wanted to avoid hoarding a thread in the management pool for potentially long times. I preferred requesting a new thread at the end of the queue (as we're OK with skipping some cluster state updates in ILM). I tried to avoid deadlocks etc. by doing things in the right order, but I get that the recursion can seem confusing and that it's more prone to bugs, so I'm open to other suggestions.

Member: > as we're ok with skipping some cluster state updates in ILM

Just for the record, I think it would be possible to do this skipping with a loop. I wasn't thinking of putting the updates in a queue; I was thinking of having a do-while or something which gets the current state at the end and loops if it doesn't match the state it just processed.

However, I think that this:

> I intentionally decided not to use a loop, as I wanted to avoid hoarding a thread in the management pool for potentially long times

is a reasonable point. And I think I've convinced myself that your recursion approach is safe. So let's go with that.

});
}
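For comparison, here is a sketch of the loop alternative discussed in the thread above: a do-while on a single management thread that re-checks lastSeenState after each round. The method name is hypothetical and this is illustrative only, not what was merged:

```java
private void processClusterStateInLoop() {
    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
        ClusterState currentState;
        do {
            currentState = lastSeenState.get();
            if (currentState == null) {
                return; // nothing to process
            }
            triggerPolicies(currentState, true);
            // Loop again only if a newer state was stored while we were busy.
        } while (lastSeenState.compareAndSet(currentState, null) == false);
    });
}
```

As the author notes, this holds a management thread for as long as new states keep arriving, which is exactly the trade-off that motivated the recursive hand-off instead.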

@Override
public void applyClusterState(ClusterChangedEvent event) {
// only act if we are master, otherwise keep idle until elected