Skip to content

Commit e837ff7

Browse files
authored
Extract ThrottledTaskRunner (#93436)
Generalizes `PrioritizedThrottledTaskRunner` slightly: - The throttling behaviour is also useful for tasks which do not complete synchronously. The new `ThrottledTaskRunner` passes a `Releasable` to each task, which until released will prevent spawning further tasks. - The only part that needs the tasks to be `Comparable<>` is the queue. Letting the caller specify the queue means that we can also use the throttling without the prioritisation.
1 parent acece61 commit e837ff7

File tree

6 files changed

+483
-120
lines changed

6 files changed

+483
-120
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.common.util.concurrent;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.core.Releasables;
16+
import org.elasticsearch.core.Strings;
17+
18+
import java.util.Queue;
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
/**
23+
* {@link AbstractThrottledTaskRunner} runs the enqueued tasks using the given executor, limiting the number of tasks that are submitted to
24+
* the executor at once.
25+
*/
26+
public class AbstractThrottledTaskRunner<T extends ActionListener<Releasable>> {
27+
private static final Logger logger = LogManager.getLogger(AbstractThrottledTaskRunner.class);
28+
29+
private final String taskRunnerName;
30+
// The max number of tasks that this runner will schedule to concurrently run on the executor.
31+
private final int maxRunningTasks;
32+
// As we fork off dequeued tasks to the given executor, technically the following counter represents
33+
// the number of the concurrent pollAndSpawn calls currently checking the queue for a task to run. This
34+
// doesn't necessarily correspond to currently running tasks, since a pollAndSpawn could return without
35+
// actually running a task when the queue is empty.
36+
private final AtomicInteger runningTasks = new AtomicInteger();
37+
private final Queue<T> tasks;
38+
private final Executor executor;
39+
40+
public AbstractThrottledTaskRunner(final String name, final int maxRunningTasks, final Executor executor, final Queue<T> taskQueue) {
41+
assert maxRunningTasks > 0;
42+
this.taskRunnerName = name;
43+
this.maxRunningTasks = maxRunningTasks;
44+
this.executor = executor;
45+
this.tasks = taskQueue;
46+
}
47+
48+
/**
49+
* Submits a task for execution. If there are fewer than {@code maxRunningTasks} tasks currently running then this task is immediately
50+
* submitted to the executor. Otherwise this task is enqueued and will be submitted to the executor in turn on completion of some other
51+
* task.
52+
*
53+
* Tasks are executed via their {@link ActionListener#onResponse} method, receiving a {@link Releasable} which must be closed on
54+
* completion of the task. Task which are rejected from their executor are notified via their {@link ActionListener#onFailure} method.
55+
* Neither of these methods may themselves throw exceptions.
56+
*/
57+
public void enqueueTask(final T task) {
58+
logger.trace("[{}] enqueuing task {}", taskRunnerName, task);
59+
tasks.add(task);
60+
// Try to run a task since now there is at least one in the queue. If the maxRunningTasks is
61+
// reached, the task is just enqueued.
62+
pollAndSpawn();
63+
}
64+
65+
/**
66+
* Allows certain tasks to force their execution, bypassing the queue-length limit on the executor. See also {@link
67+
* AbstractRunnable#isForceExecution()}.
68+
*/
69+
protected boolean isForceExecution(@SuppressWarnings("unused") /* TODO test this */ T task) {
70+
return false;
71+
}
72+
73+
private void pollAndSpawn() {
74+
// A pollAndSpawn attempts to run a new task. There could be many concurrent pollAndSpawn calls competing
75+
// to get a "free slot", since we attempt to run a new task on every enqueueTask call and every time an
76+
// existing task is finished.
77+
while (incrementRunningTasks()) {
78+
T task = tasks.poll();
79+
if (task == null) {
80+
logger.trace("[{}] task queue is empty", taskRunnerName);
81+
// We have taken up a "free slot", but there are no tasks in the queue! This could happen each time a worker
82+
// sees an empty queue after running a task. Decrement to give competing pollAndSpawn calls a chance!
83+
int decremented = runningTasks.decrementAndGet();
84+
assert decremented >= 0;
85+
// We might have blocked all competing pollAndSpawn calls. This could happen for example when
86+
// maxRunningTasks=1 and a task got enqueued just after checking the queue but before decrementing.
87+
// To be sure, return only if the queue is still empty. If the queue is not empty, this might be the
88+
// only pollAndSpawn call in progress, and returning without peeking would risk ending up with a
89+
// non-empty queue and no workers!
90+
if (tasks.peek() == null) break;
91+
} else {
92+
final boolean isForceExecution = isForceExecution(task);
93+
executor.execute(new AbstractRunnable() {
94+
private boolean rejected; // need not be volatile - if we're rejected then that happens-before calling onAfter
95+
96+
private final Releasable releasable = Releasables.releaseOnce(() -> {
97+
// To avoid missing to run tasks that are enqueued and waiting, we check the queue again once running
98+
// a task is finished.
99+
int decremented = runningTasks.decrementAndGet();
100+
assert decremented >= 0;
101+
102+
if (rejected == false) {
103+
pollAndSpawn();
104+
}
105+
});
106+
107+
@Override
108+
public boolean isForceExecution() {
109+
return isForceExecution;
110+
}
111+
112+
@Override
113+
public void onRejection(Exception e) {
114+
logger.trace("[{}] task {} rejected", taskRunnerName, task);
115+
rejected = true;
116+
try {
117+
task.onFailure(e);
118+
} finally {
119+
releasable.close();
120+
}
121+
}
122+
123+
@Override
124+
public void onFailure(Exception e) {
125+
// should not happen
126+
logger.error(() -> Strings.format("[%s] task %s failed", taskRunnerName, task), e);
127+
assert false : e;
128+
task.onFailure(e);
129+
}
130+
131+
@Override
132+
protected void doRun() {
133+
logger.trace("[{}] running task {}", taskRunnerName, task);
134+
task.onResponse(releasable);
135+
}
136+
137+
@Override
138+
public String toString() {
139+
return task.toString();
140+
}
141+
});
142+
}
143+
}
144+
}
145+
146+
// Each worker thread that runs a task, first needs to get a "free slot" in order to respect maxRunningTasks.
147+
private boolean incrementRunningTasks() {
148+
int preUpdateValue = runningTasks.getAndAccumulate(maxRunningTasks, (v, maxRunning) -> v < maxRunning ? v + 1 : v);
149+
assert preUpdateValue <= maxRunningTasks;
150+
return preUpdateValue < maxRunningTasks;
151+
}
152+
153+
// exposed for testing
154+
int runningTasks() {
155+
return runningTasks.get();
156+
}
157+
158+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedThrottledTaskRunner.java

Lines changed: 52 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -8,134 +8,82 @@
88

99
package org.elasticsearch.common.util.concurrent;
1010

11-
import org.apache.logging.log4j.LogManager;
12-
import org.apache.logging.log4j.Logger;
13-
import org.elasticsearch.core.Strings;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.core.Releasable;
1413

15-
import java.util.concurrent.BlockingQueue;
1614
import java.util.concurrent.Executor;
1715
import java.util.concurrent.PriorityBlockingQueue;
18-
import java.util.concurrent.atomic.AtomicInteger;
1916

2017
/**
2118
* {@link PrioritizedThrottledTaskRunner} performs the enqueued tasks in the order dictated by the
2219
* natural ordering of the tasks, limiting the max number of concurrently running tasks. Each new task
2320
* that is dequeued to be run, is forked off to the given executor.
2421
*/
2522
public class PrioritizedThrottledTaskRunner<T extends AbstractRunnable & Comparable<T>> {
26-
private static final Logger logger = LogManager.getLogger(PrioritizedThrottledTaskRunner.class);
27-
28-
private final String taskRunnerName;
29-
// The max number of tasks that this runner will schedule to concurrently run on the executor.
30-
private final int maxRunningTasks;
31-
// As we fork off dequeued tasks to the given executor, technically the following counter represents
32-
// the number of the concurrent pollAndSpawn calls currently checking the queue for a task to run. This
33-
// doesn't necessarily correspond to currently running tasks, since a pollAndSpawn could return without
34-
// actually running a task when the queue is empty.
35-
private final AtomicInteger runningTasks = new AtomicInteger();
36-
private final BlockingQueue<T> tasks = new PriorityBlockingQueue<>();
37-
private final Executor executor;
3823

39-
public PrioritizedThrottledTaskRunner(final String name, final int maxRunningTasks, final Executor executor) {
40-
assert maxRunningTasks > 0;
41-
this.taskRunnerName = name;
42-
this.maxRunningTasks = maxRunningTasks;
43-
this.executor = executor;
44-
}
24+
private final AbstractThrottledTaskRunner<TaskWrapper<T>> runner;
25+
private final PriorityBlockingQueue<TaskWrapper<T>> queue;
4526

46-
public void enqueueTask(final T task) {
47-
logger.trace("[{}] enqueuing task {}", taskRunnerName, task);
48-
tasks.add(task);
49-
// Try to run a task since now there is at least one in the queue. If the maxRunningTasks is
50-
// reached, the task is just enqueued.
51-
pollAndSpawn();
52-
}
27+
private static class TaskWrapper<T extends AbstractRunnable & Comparable<T>>
28+
implements
29+
ActionListener<Releasable>,
30+
Comparable<TaskWrapper<T>> {
31+
32+
private final T task;
33+
34+
TaskWrapper(T task) {
35+
this.task = task;
36+
}
37+
38+
@Override
39+
public int compareTo(TaskWrapper<T> o) {
40+
return task.compareTo(o.task);
41+
}
5342

54-
private void pollAndSpawn() {
55-
// A pollAndSpawn attempts to run a new task. There could be many concurrent pollAndSpawn calls competing
56-
// to get a "free slot", since we attempt to run a new task on every enqueueTask call and every time an
57-
// existing task is finished.
58-
while (incrementRunningTasks()) {
59-
T task = tasks.poll();
60-
if (task == null) {
61-
logger.trace("[{}] task queue is empty", taskRunnerName);
62-
// We have taken up a "free slot", but there are no tasks in the queue! This could happen each time a worker
63-
// sees an empty queue after running a task. Decrement to give competing pollAndSpawn calls a chance!
64-
int decremented = runningTasks.decrementAndGet();
65-
assert decremented >= 0;
66-
// We might have blocked all competing pollAndSpawn calls. This could happen for example when
67-
// maxRunningTasks=1 and a task got enqueued just after checking the queue but before decrementing.
68-
// To be sure, return only if the queue is still empty. If the queue is not empty, this might be the
69-
// only pollAndSpawn call in progress, and returning without peeking would risk ending up with a
70-
// non-empty queue and no workers!
71-
if (tasks.peek() == null) break;
72-
} else {
73-
executor.execute(new AbstractRunnable() {
74-
private boolean rejected; // need not be volatile - if we're rejected then that happens-before calling onAfter
75-
76-
@Override
77-
public boolean isForceExecution() {
78-
return task.isForceExecution();
79-
}
80-
81-
@Override
82-
public void onRejection(Exception e) {
83-
logger.trace("[{}] task {} rejected", taskRunnerName, task);
84-
rejected = true;
85-
task.onRejection(e);
86-
}
87-
88-
@Override
89-
public void onFailure(Exception e) {
90-
logger.trace(() -> Strings.format("[%s] task %s failed", taskRunnerName, task), e);
91-
task.onFailure(e);
92-
}
93-
94-
@Override
95-
protected void doRun() throws Exception {
96-
logger.trace("[{}] running task {}", taskRunnerName, task);
97-
task.doRun();
98-
}
99-
100-
@Override
101-
public void onAfter() {
102-
try {
103-
task.onAfter();
104-
} finally {
105-
// To avoid missing to run tasks that are enqueued and waiting, we check the queue again once running
106-
// a task is finished.
107-
int decremented = runningTasks.decrementAndGet();
108-
assert decremented >= 0;
109-
110-
if (rejected == false) {
111-
pollAndSpawn();
112-
}
113-
}
114-
}
115-
116-
@Override
117-
public String toString() {
118-
return task.toString();
119-
}
120-
});
43+
@Override
44+
public String toString() {
45+
return task.toString();
46+
}
47+
48+
@Override
49+
public void onResponse(Releasable releasable) {
50+
try (releasable) {
51+
task.run();
52+
}
53+
}
54+
55+
@Override
56+
public void onFailure(Exception e) {
57+
assert e instanceof EsRejectedExecutionException : e;
58+
try {
59+
task.onRejection(e);
60+
} finally {
61+
task.onAfter();
12162
}
12263
}
12364
}
12465

125-
// Each worker thread that runs a task, first needs to get a "free slot" in order to respect maxRunningTasks.
126-
private boolean incrementRunningTasks() {
127-
int preUpdateValue = runningTasks.getAndAccumulate(maxRunningTasks, (v, maxRunning) -> v < maxRunning ? v + 1 : v);
128-
assert preUpdateValue <= maxRunningTasks;
129-
return preUpdateValue < maxRunningTasks;
66+
public PrioritizedThrottledTaskRunner(final String name, final int maxRunningTasks, final Executor executor) {
67+
this.queue = new PriorityBlockingQueue<>();
68+
this.runner = new AbstractThrottledTaskRunner<>(name, maxRunningTasks, executor, queue);
69+
}
70+
71+
/**
72+
* Submits a task for execution. If there are fewer than {@code maxRunningTasks} tasks currently running then this task is immediately
73+
* submitted to the executor. Otherwise this task is enqueued and will be submitted to the executor in turn on completion of some other
74+
* task.
75+
*/
76+
public void enqueueTask(final T task) {
77+
runner.enqueueTask(new TaskWrapper<>(task));
13078
}
13179

13280
// Only use for testing
13381
public int runningTasks() {
134-
return runningTasks.get();
82+
return runner.runningTasks();
13583
}
13684

13785
// Only use for testing
13886
public int queueSize() {
139-
return tasks.size();
87+
return queue.size();
14088
}
14189
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.common.util.concurrent;
10+
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.core.Releasable;
13+
14+
import java.util.concurrent.Executor;
15+
16+
public class ThrottledTaskRunner extends AbstractThrottledTaskRunner<ActionListener<Releasable>> {
17+
// a simple AbstractThrottledTaskRunner which fixes the task type and uses a regular FIFO blocking queue.
18+
public ThrottledTaskRunner(String name, int maxRunningTasks, Executor executor) {
19+
super(name, maxRunningTasks, executor, ConcurrentCollections.newBlockingQueue());
20+
}
21+
}

0 commit comments

Comments
 (0)