Skip to content

Conversation

@albertzaharovits
Copy link
Contributor

@albertzaharovits albertzaharovits commented Jan 23, 2025

Compared to #120293 , this scheduler implementation:

  • does not use any static variables
  • sorts merges across all schedulers (across all shards) by size (smaller ones first)

This design reorders merges every time a new one comes in, or an existing one (from any scheduler) finishes, synchronizing on the ThreadPoolMergeExecutor. This complication is mainly a consequence of supporting the "max merge thread" setting, per scheduler (per shard).

@albertzaharovits albertzaharovits force-pushed the threadpool-merge-scheduler-sort-all-merges branch from f4d9595 to 5554bc2 Compare January 24, 2025 10:12
Comment on lines +239 to +245
private void update(Runnable updater) {
threadPoolMergeExecutor.updateMergeScheduler(this, (ignored) -> {
synchronized (this) {
updater.run();
}
});
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any update to any merge scheduler first takes the mutex of the single ThreadPoolMergeExecutor instance, and then of the merge scheduler itself.

Comment on lines +84 to +107
public synchronized void updateMergeScheduler(
ThreadPoolMergeScheduler threadPoolMergeScheduler,
Consumer<ThreadPoolMergeScheduler> updater
) {
boolean removed = registeredMergeSchedulers.remove(threadPoolMergeScheduler);
if (false == removed) {
throw new IllegalStateException("Cannot update a merge scheduler that is not registered");
}
currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
currentlyActiveIOThrottledMergesCount -= getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
updater.accept(threadPoolMergeScheduler);
boolean added = registeredMergeSchedulers.add(threadPoolMergeScheduler);
if (false == added) {
throw new IllegalStateException("Found duplicate registered merge scheduler");
}
currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
currentlyActiveIOThrottledMergesCount += getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
if (newTargetMBPerSec != targetMBPerSec) {
targetMBPerSec = newTargetMBPerSec;
threadPoolMergeScheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
}
maybeExecuteNextMerges();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An to a threadpool has the potential to reorder it in the sorted set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also has the potential to change the stats related to IO throttling, because that is done "globally" i.e. across all shards on the node.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a quick read, left some comments/notes.

);
result.put(
ThreadPool.Names.MERGE,
new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we throttle anyway, I'd be inclined to allow processors number of threads here.

MergeTask mergeTask2 = tpms2.peekMergeTaskToExecute();
if (mergeTask1 == null && mergeTask2 == null) {
// arbitrary order between schedulers that cannot run any merge right now
return System.identityHashCode(mergeTask1) - System.identityHashCode(mergeTask2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this always returns 0 and wonder if you wanted to compare the tpmses instead?

}
}

public synchronized void updateMergeScheduler(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some perhaps unqualified anxiety around this central mutex across the node. Probably not a real issue but noting it anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is an issue, but I don't see a better way in the current design.

double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
if (newTargetMBPerSec != targetMBPerSec) {
targetMBPerSec = newTargetMBPerSec;
threadPoolMergeScheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not set it for all merge schedulers or running tasks?

I wonder if we should keep a list of the running tasks in this class so we can easily set it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right it should be set for all merge schedulers, and, as you say, it would better be set for the running tasks only (and set it when they start).

@albertzaharovits albertzaharovits changed the title Threadpool merge scheduler sort all merges WIP Threadpool merge scheduler sort all merges Jan 26, 2025
@albertzaharovits
Copy link
Contributor Author

We've ultimately merged a different alternative: #120869

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants