Skip to content

Commit fcfc661

Browse files
committed
Redefine what a context closed means.
Motivation: Context#close implementation is currently too drastic and will refuse to execute any task in addition of interrupting threads. After a context is closed there is often the need to let task execution happen in order to cleanup state, e.g. pending HTTP tasks due to back-pressure catch up or timers. Changes: Context#close now allows to execute tasks.
1 parent a87afd2 commit fcfc661

File tree

5 files changed

+25
-63
lines changed

5 files changed

+25
-63
lines changed

src/main/java/io/vertx/core/impl/TaskQueue.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ private void run() {
5454
for (; ; ) {
5555
final ExecuteTask execute;
5656
synchronized (tasks) {
57-
if (closed) {
58-
return;
59-
}
6057
Task task = tasks.poll();
6158
if (task == null) {
6259
currentExecutor = null;
@@ -125,9 +122,6 @@ private ContinuationTask continuationTask() {
125122
*/
126123
public void execute(Runnable task, Executor executor) throws RejectedExecutionException {
127124
synchronized (tasks) {
128-
if (closed) {
129-
throw new RejectedExecutionException("Closed");
130-
}
131125
if (currentExecutor == null) {
132126
currentExecutor = executor;
133127
try {
@@ -161,18 +155,15 @@ public final static class CloseResult {
161155
private final Runnable activeTask;
162156
private final List<Runnable> suspendedTasks;
163157
private final List<Thread> suspendedThreads;
164-
private final List<Runnable> pendingTasks;
165158

166159
private CloseResult(Thread activeThread,
167160
Runnable activeTask,
168161
List<Thread> suspendedThreads,
169-
List<Runnable> suspendedTasks,
170-
List<Runnable> pendingTasks) {
162+
List<Runnable> suspendedTasks) {
171163
this.activeThread = activeThread;
172164
this.activeTask = activeTask;
173165
this.suspendedThreads = suspendedThreads;
174166
this.suspendedTasks = suspendedTasks;
175-
this.pendingTasks = pendingTasks;
176167
}
177168

178169
/**
@@ -186,13 +177,6 @@ public Runnable activeTask() {
186177
return activeTask;
187178
}
188179

189-
/**
190-
* @return the list of pending tasks
191-
*/
192-
public List<Runnable> pendingTasks() {
193-
return pendingTasks;
194-
}
195-
196180
/**
197181
* @return the list of suspended threads
198182
*/
@@ -214,7 +198,6 @@ public List<Runnable> suspendedTasks() {
214198
* @return a structure of suspended threads and pending tasks
215199
*/
216200
public CloseResult close() {
217-
List<Runnable> pendingTasks = Collections.emptyList();
218201
List<Thread> suspendedThreads;
219202
List<Runnable> suspendedTasks;
220203
Thread activeThread;
@@ -225,19 +208,16 @@ public CloseResult close() {
225208
}
226209
suspendedThreads = new ArrayList<>(continuations.size());
227210
suspendedTasks = new ArrayList<>(continuations.size());
228-
for (Task t : tasks) {
229-
if (t instanceof ExecuteTask) {
230-
if (pendingTasks.isEmpty()) {
231-
pendingTasks = new LinkedList<>();
232-
}
233-
pendingTasks.add(((ExecuteTask)t).runnable);
234-
} else if (t instanceof ContinuationTask) {
235-
ContinuationTask rt = (ContinuationTask) t;
236-
suspendedThreads.add(rt.thread);
237-
suspendedTasks.add(rt.task.runnable);
211+
Iterator<Task> it = tasks.iterator();
212+
while (it.hasNext()) {
213+
Task task = it.next();
214+
if (task instanceof ContinuationTask) {
215+
ContinuationTask continuationTask = (ContinuationTask) task;
216+
suspendedThreads.add(continuationTask.thread);
217+
suspendedTasks.add(continuationTask.task.runnable);
218+
it.remove();
238219
}
239220
}
240-
tasks.clear();
241221
for (ContinuationTask cont : continuations) {
242222
suspendedThreads.add(cont.thread);
243223
suspendedTasks.add(cont.task.runnable);
@@ -248,7 +228,7 @@ public CloseResult close() {
248228
currentExecutor = null;
249229
closed = true;
250230
}
251-
return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks, pendingTasks);
231+
return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks);
252232
}
253233

254234
private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task {

src/main/java/io/vertx/core/impl/WorkerTaskQueue.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,6 @@ class WorkerTaskQueue extends TaskQueue {
3131
void shutdown(EventLoop executor, Promise<Void> completion) {
3232
TaskQueue.CloseResult closeResult = close();
3333

34-
// Reject all pending tasks
35-
List<Runnable> pendingTasks = closeResult.pendingTasks();
36-
for (Runnable pendingTask : pendingTasks) {
37-
WorkerTask pendingWorkerTask = (WorkerTask) pendingTask;
38-
pendingWorkerTask.reject();
39-
}
40-
4134
// Maintain context invariant: serialize task execution while interrupting tasks
4235
class InterruptSequence {
4336

src/test/java/io/vertx/core/ContextTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,15 +240,11 @@ public void testExecuteBlockingClose() {
240240
latch.await();
241241
return "";
242242
});
243-
Future<String> fut2 = ctx.executeBlocking(() -> "");
244243
assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.WAITING);
245244
ctx.close();
246245
assertWaitUntil(fut1::isComplete);
247246
assertTrue(fut1.failed());
248247
assertTrue(fut1.cause() instanceof InterruptedException);
249-
assertWaitUntil(fut2::isComplete);
250-
assertTrue(fut2.failed());
251-
assertTrue(fut2.cause() instanceof RejectedExecutionException);
252248
}
253249

254250
@Test

src/test/java/io/vertx/core/TaskQueueTest.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -202,20 +202,6 @@ public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() {
202202
Assertions.assertThat(taskQueue.isEmpty()).isTrue();
203203
}
204204

205-
@Test
206-
public void testClosePendingTasks() {
207-
TaskQueue taskQueue = new TaskQueue();
208-
Deque<Runnable> pending = new ConcurrentLinkedDeque<>();
209-
Executor executor = pending::add;
210-
Runnable task = () -> {
211-
};
212-
taskQueue.execute(task, executor);
213-
assertEquals(1, pending.size());
214-
TaskQueue.CloseResult result = taskQueue.close();
215-
assertEquals(1, result.pendingTasks().size());
216-
assertSame(task, result.pendingTasks().get(0));
217-
}
218-
219205
@Test
220206
public void testCloseSuspendedTasks() {
221207
TaskQueue taskQueue = new TaskQueue();
@@ -325,14 +311,10 @@ public void testSubmitAfterClose() {
325311
taskQueue.close();
326312
Deque<Runnable> pending = new ConcurrentLinkedDeque<>();
327313
Executor exec = pending::add;
328-
try {
329-
taskQueue.execute(() -> {
314+
taskQueue.execute(() -> {
330315

331-
}, exec);
332-
fail();
333-
} catch (RejectedExecutionException expected) {
334-
}
335-
assertEquals(0, pending.size());
316+
}, exec);
317+
assertEquals(1, pending.size());
336318
}
337319

338320
@Test

src/test/java/io/vertx/core/VirtualThreadContextTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ public void testAwaitWhenClosed() throws Exception {
364364
assertFalse(Thread.currentThread().isInterrupted());
365365
}
366366
try {
367-
Promise.promise().future().await();
367+
Future.await(Promise.promise().future());
368368
fail();
369369
} catch (Exception e) {
370370
assertEquals(InterruptedException.class, e.getClass());
@@ -376,4 +376,15 @@ public void testAwaitWhenClosed() throws Exception {
376376
ctx.close();
377377
await();
378378
}
379+
380+
@Test
381+
public void testSubmitAfterClose() {
382+
Assume.assumeTrue(isVirtualThreadAvailable());
383+
ContextInternal ctx = vertx.createVirtualThreadContext();
384+
ctx.close();
385+
ctx.runOnContext(v -> {
386+
testComplete();
387+
});
388+
await();
389+
}
379390
}

0 commit comments

Comments
 (0)