Skip to content

Commit ae0d7bd

Browse files
committed
Rework the TaskQueue implementation as well as implementing a close state.
Motivation: The TaskQueue does not implement closeability, since now such queue can hold many suspended virtual thread tasks, we should provide a way to deal those threads (interrupt) when the context holding the queue is closed, e.g. undeploying a verticle or closing a vertx instance. Changes: Implement a TaskQueue close method that returns the list of current thread being suspended. The context holding the queue can close the queue when the context close future is destroyed.
1 parent 76a9f52 commit ae0d7bd

File tree

8 files changed

+572
-164
lines changed

8 files changed

+572
-164
lines changed

src/main/java/io/vertx/core/Future.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.function.Function;
2728
import java.util.function.Supplier;
@@ -668,13 +669,14 @@ static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Con
668669
*/
669670
static <T> T await(Future<T> future) {
670671
io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor();
671-
io.vertx.core.impl.WorkerExecutor.TaskController cont = executor.current();
672-
future.onComplete(ar -> cont.resume());
673-
try {
674-
cont.suspendAndAwaitResume();
675-
} catch (InterruptedException e) {
676-
Utils.throwAsUnchecked(e);
677-
return null;
672+
CountDownLatch latch = executor.suspend(cont -> future.onComplete(ar -> cont.resume()));
673+
if (latch != null) {
674+
try {
675+
latch.await();
676+
} catch (InterruptedException e) {
677+
Utils.throwAsUnchecked(e);
678+
return null;
679+
}
678680
}
679681
if (future.succeeded()) {
680682
return future.result();

src/main/java/io/vertx/core/impl/TaskQueue.java

Lines changed: 186 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import io.vertx.core.impl.logging.Logger;
1515
import io.vertx.core.impl.logging.LoggerFactory;
1616

17-
import java.util.LinkedList;
17+
import java.util.*;
1818
import java.util.concurrent.CountDownLatch;
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.RejectedExecutionException;
21-
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Consumer;
2222

2323
/**
2424
* A task queue that always run all tasks in order. The executor to run the tasks is passed
@@ -38,6 +38,8 @@ public class TaskQueue {
3838

3939
// @protectedby tasks
4040
private final LinkedList<Task> tasks = new LinkedList<>();
41+
private final Set<ContinuationTask> suspendedTasks = new HashSet<>();
42+
private boolean closed;
4143
private Executor currentExecutor;
4244
private Thread currentThread;
4345

@@ -51,13 +53,16 @@ private void run() {
5153
for (; ; ) {
5254
final ExecuteTask execute;
5355
synchronized (tasks) {
56+
if (closed) {
57+
return;
58+
}
5459
Task task = tasks.poll();
5560
if (task == null) {
5661
currentExecutor = null;
5762
return;
5863
}
59-
if (task instanceof ResumeTask) {
60-
ResumeTask resume = (ResumeTask) task;
64+
if (task instanceof ContinuationTask) {
65+
ContinuationTask resume = (ContinuationTask) task;
6166
currentExecutor = resume.executor;
6267
currentThread = resume.thread;
6368
resume.latch.run();
@@ -83,12 +88,18 @@ private void run() {
8388
}
8489

8590
/**
86-
* Return a controller for the current task.
91+
* A task of this queue.
92+
*/
93+
private interface Task {
94+
}
95+
96+
/**
97+
* Return a continuation task for the current task execution.
8798
*
8899
* @return the controller
89100
* @throws IllegalStateException if the current thread is not currently being executed by the queue
90101
*/
91-
public WorkerExecutor.TaskController current() {
102+
private ContinuationTask continuationTask() {
92103
Thread thread;
93104
Executor executor;
94105
synchronized (tasks) {
@@ -98,51 +109,19 @@ public WorkerExecutor.TaskController current() {
98109
thread = currentThread;
99110
executor = currentExecutor;
100111
}
101-
return new WorkerExecutor.TaskController() {
102-
103-
final CountDownLatch latch = new CountDownLatch(1);
104-
105-
@Override
106-
public void resume(Runnable callback) {
107-
Runnable task = () -> {
108-
callback.run();
109-
latch.countDown();
110-
};
111-
synchronized (tasks) {
112-
if (currentExecutor != null) {
113-
tasks.addFirst(new ResumeTask(task, executor, thread));
114-
return;
115-
}
116-
currentExecutor = executor;
117-
currentThread = thread;
118-
}
119-
task.run();
120-
}
121-
122-
@Override
123-
public CountDownLatch suspend() {
124-
if (Thread.currentThread() != thread) {
125-
throw new IllegalStateException();
126-
}
127-
synchronized (tasks) {
128-
if (currentThread == null || currentThread != Thread.currentThread()) {
129-
throw new IllegalStateException();
130-
}
131-
currentThread = null;
132-
}
133-
executor.execute(runner);
134-
return latch;
135-
}
136-
};
112+
return new ContinuationTask(thread, executor);
137113
}
138114

139115
/**
140116
* Run a task.
141117
*
142118
* @param task the task to run.
143119
*/
144-
public void execute(Runnable task, Executor executor) {
120+
public void execute(Runnable task, Executor executor) throws RejectedExecutionException {
145121
synchronized (tasks) {
122+
if (closed) {
123+
throw new RejectedExecutionException("Closed");
124+
}
146125
if (currentExecutor == null) {
147126
currentExecutor = executor;
148127
try {
@@ -168,9 +147,171 @@ public boolean isEmpty() {
168147
}
169148

170149
/**
171-
* A task of this queue.
150+
* Structure holding the queue state at close time.
172151
*/
173-
private interface Task {
152+
public final static class CloseResult {
153+
154+
private final Thread activeThread;
155+
private final List<Runnable> pendingTasks;
156+
private final List<Thread> suspendedThreads;
157+
158+
private CloseResult(Thread activeThread, List<Thread> suspendedThreads, List<Runnable> pendingTasks) {
159+
this.activeThread = activeThread;
160+
this.suspendedThreads = suspendedThreads;
161+
this.pendingTasks = pendingTasks;
162+
}
163+
164+
/**
165+
* @return the thread that was active
166+
*/
167+
public Thread activeThread() {
168+
return activeThread;
169+
}
170+
171+
/**
172+
* @return the list of pending tasks
173+
*/
174+
public List<Runnable> pendingTasks() {
175+
return pendingTasks;
176+
}
177+
178+
/**
179+
* @return the list of suspended threads
180+
*/
181+
public List<Thread> suspendedThreads() {
182+
return suspendedThreads;
183+
}
184+
}
185+
186+
/**
187+
* Close the queue.
188+
*
189+
* @return a structure of suspended threads and pending tasks
190+
*/
191+
public CloseResult close() {
192+
List<Runnable> pendingTasks = Collections.emptyList();
193+
List<Thread> suspendedThreads;
194+
Thread currentThread;
195+
synchronized (tasks) {
196+
if (closed) {
197+
throw new IllegalStateException("Already closed");
198+
}
199+
suspendedThreads = new ArrayList<>(suspendedTasks.size() + 1);
200+
for (Task t : tasks) {
201+
if (t instanceof ExecuteTask) {
202+
if (pendingTasks.isEmpty()) {
203+
pendingTasks = new LinkedList<>();
204+
}
205+
pendingTasks.add(((ExecuteTask)t).runnable);
206+
} else if (t instanceof ContinuationTask) {
207+
ContinuationTask rt = (ContinuationTask) t;
208+
suspendedThreads.add(rt.thread);
209+
}
210+
}
211+
tasks.clear();
212+
for (ContinuationTask task : suspendedTasks) {
213+
suspendedThreads.add(task.thread);
214+
}
215+
suspendedTasks.clear();
216+
currentThread = this.currentThread;
217+
currentExecutor = null;
218+
closed = true;
219+
}
220+
return new CloseResult(currentThread, suspendedThreads, pendingTasks);
221+
}
222+
223+
private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task {
224+
225+
private static final int ST_CREATED = 0, ST_SUSPENDED = 1, ST_RESUMED = 2;
226+
227+
private final Thread thread;
228+
private final Executor executor;
229+
private int status;
230+
private Runnable latch;
231+
232+
public ContinuationTask(Thread thread, Executor executor) {
233+
super(1);
234+
this.thread = thread;
235+
this.executor = executor;
236+
this.status = ST_CREATED;
237+
}
238+
239+
@Override
240+
public void resume(Runnable callback) {
241+
synchronized (tasks) {
242+
if (closed) {
243+
return;
244+
}
245+
switch (status) {
246+
case ST_SUSPENDED:
247+
boolean removed = suspendedTasks.remove(this);
248+
assert removed;
249+
latch = () -> {
250+
callback.run();
251+
countDown();
252+
};
253+
if (currentExecutor != null) {
254+
tasks.addFirst(this);
255+
return;
256+
}
257+
currentExecutor = executor;
258+
currentThread = thread;
259+
break;
260+
case ST_CREATED:
261+
// The current task still owns the queue
262+
assert currentExecutor == executor;
263+
assert currentThread == thread;
264+
latch = callback;
265+
break;
266+
default:
267+
throw new IllegalStateException();
268+
}
269+
status = ST_RESUMED;
270+
}
271+
latch.run();
272+
}
273+
274+
public boolean suspend() {
275+
if (Thread.currentThread() != thread) {
276+
throw new IllegalStateException();
277+
}
278+
synchronized (tasks) {
279+
if (closed) {
280+
return false;
281+
}
282+
if (currentThread == null || currentThread != thread) {
283+
throw new IllegalStateException();
284+
}
285+
switch (status) {
286+
case ST_RESUMED:
287+
countDown();
288+
return false;
289+
case ST_SUSPENDED:
290+
throw new IllegalStateException();
291+
}
292+
status = ST_SUSPENDED;
293+
boolean added = suspendedTasks.add(this);
294+
assert added;
295+
currentThread = null;
296+
}
297+
executor.execute(runner);
298+
return true;
299+
}
300+
}
301+
302+
public CountDownLatch suspend() {
303+
return suspend(cont -> {});
304+
}
305+
306+
public CountDownLatch suspend(Consumer<WorkerExecutor.Continuation> abc) {
307+
ContinuationTask continuationTask = continuationTask();
308+
abc.accept(continuationTask);
309+
if (continuationTask.suspend()) {
310+
return continuationTask;
311+
} else {
312+
// Closed
313+
return null;
314+
}
174315
}
175316

176317
/**
@@ -184,18 +325,4 @@ public ExecuteTask(Runnable runnable, Executor exec) {
184325
this.exec = exec;
185326
}
186327
}
187-
188-
/**
189-
* Resume an existing task blocked on a thread
190-
*/
191-
private static class ResumeTask implements Task {
192-
private final Runnable latch;
193-
private final Executor executor;
194-
private final Thread thread;
195-
ResumeTask(Runnable latch, Executor executor, Thread thread) {
196-
this.latch = latch;
197-
this.executor = executor;
198-
this.thread = thread;
199-
}
200-
}
201328
}

0 commit comments

Comments
 (0)