Skip to content

Commit 3eeffa3

Browse files
authored
Merge pull request #5344 from eclipse-vertx/interrupt-suspended-virtual-threads-4.x
Closing a context should interrupt pending virtual thread tasks of this context
2 parents 76a9f52 + 21cfc78 commit 3eeffa3

22 files changed

+938
-223
lines changed

src/main/java/io/vertx/core/Future.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.function.Function;
2728
import java.util.function.Supplier;
@@ -668,13 +669,17 @@ static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Con
668669
*/
669670
static <T> T await(Future<T> future) {
670671
io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor();
671-
io.vertx.core.impl.WorkerExecutor.TaskController cont = executor.current();
672-
future.onComplete(ar -> cont.resume());
673-
try {
674-
cont.suspendAndAwaitResume();
675-
} catch (InterruptedException e) {
676-
Utils.throwAsUnchecked(e);
677-
return null;
672+
if (executor == null) {
673+
throw new IllegalStateException();
674+
}
675+
CountDownLatch latch = executor.suspend(cont -> future.onComplete(ar -> cont.resume()));
676+
if (latch != null) {
677+
try {
678+
latch.await();
679+
} catch (InterruptedException e) {
680+
Utils.throwAsUnchecked(e);
681+
return null;
682+
}
678683
}
679684
if (future.succeeded()) {
680685
return future.result();

src/main/java/io/vertx/core/impl/CloseFuture.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public CloseFuture(Logger log) {
5151
*
5252
* @param hook the hook to add
5353
*/
54-
public synchronized void add(Closeable hook) {
54+
public synchronized boolean add(Closeable hook) {
5555
if (closed) {
56-
throw new IllegalStateException();
56+
return false;
5757
}
5858
if (hook instanceof CloseFuture) {
5959
// Close future might be closed independently, so we optimize and remove the hooks when
@@ -72,6 +72,7 @@ public synchronized void add(Closeable hook) {
7272
}
7373
hooks.put(hook, this);
7474
}
75+
return true;
7576
}
7677

7778
/**

src/main/java/io/vertx/core/impl/ContextImpl.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
5656
final TaskQueue internalOrderedTasks;
5757
final WorkerPool internalWorkerPool;
5858
final WorkerPool workerPool;
59-
final TaskQueue orderedTasks;
59+
final WorkerTaskQueue orderedTasks;
6060

6161
public ContextImpl(VertxInternal vertx,
6262
int localsLength,
@@ -65,7 +65,7 @@ public ContextImpl(VertxInternal vertx,
6565
EventExecutor executor,
6666
WorkerPool internalWorkerPool,
6767
WorkerPool workerPool,
68-
TaskQueue orderedTasks,
68+
WorkerTaskQueue orderedTasks,
6969
Deployment deployment,
7070
CloseFuture closeFuture,
7171
ClassLoader tccl) {
@@ -84,6 +84,14 @@ public ContextImpl(VertxInternal vertx,
8484
this.internalOrderedTasks = new TaskQueue();
8585
}
8686

87+
public Future<Void> close() {
88+
if (closeFuture == owner.closeFuture()) {
89+
return Future.future(p -> orderedTasks.shutdown(eventLoop, p));
90+
} else {
91+
return closeFuture.close().eventually(() -> Future.<Void>future(p -> orderedTasks.shutdown(eventLoop, p)));
92+
}
93+
}
94+
8795
public Deployment getDeployment() {
8896
return deployment;
8997
}
@@ -201,29 +209,29 @@ private static <T> Future<T> internalExecuteBlocking(ContextInternal context, Ha
201209
Object queueMetric = metrics != null ? metrics.submitted() : null;
202210
Promise<T> promise = context.promise();
203211
Future<T> fut = promise.future();
204-
try {
205-
Runnable command = () -> {
206-
Object execMetric = null;
207-
if (metrics != null) {
208-
execMetric = metrics.begin(queueMetric);
209-
}
212+
WorkerTask task = new WorkerTask(metrics, queueMetric) {
213+
@Override
214+
protected void execute() {
210215
context.dispatch(promise, blockingCodeHandler);
216+
}
217+
@Override
218+
void reject() {
211219
if (metrics != null) {
212-
metrics.end(execMetric, fut.succeeded());
220+
metrics.rejected(queueMetric);
213221
}
214-
};
222+
promise.fail(new RejectedExecutionException());
223+
}
224+
};
225+
try {
215226
Executor exec = workerPool.executor();
216227
if (queue != null) {
217-
queue.execute(command, exec);
228+
queue.execute(task, exec);
218229
} else {
219-
exec.execute(command);
230+
exec.execute(task);
220231
}
221232
} catch (RejectedExecutionException e) {
222233
// Pool is already shut down
223-
if (metrics != null) {
224-
metrics.rejected(queueMetric);
225-
}
226-
throw e;
234+
task.reject();
227235
}
228236
return fut;
229237
}

src/main/java/io/vertx/core/impl/ContextInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,13 @@ default <E> void dispatch(E event, Handler<E> handler) {
275275
}
276276
}
277277

278+
/**
279+
* Close this context, cleanup close future hooks then dispose pending ordered task queue.
280+
*
281+
* @return a future signalling close completion
282+
*/
283+
Future<Void> close();
284+
278285
/**
279286
* Begin the execution of a task on this context.
280287
* <p>

src/main/java/io/vertx/core/impl/DeploymentManager.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ private Future<Deployment> doDeploy(String identifier,
200200
context = vertx.createVirtualThreadContext(deployment, closeFuture, tccl);
201201
break;
202202
}
203-
VerticleHolder holder = new VerticleHolder(verticle, context, closeFuture);
203+
VerticleHolder holder = new VerticleHolder(verticle, context);
204204
deployment.addVerticle(holder);
205205
context.runOnContext(v -> {
206206
try {
@@ -241,16 +241,14 @@ static class VerticleHolder {
241241

242242
final Verticle verticle;
243243
final ContextImpl context;
244-
final CloseFuture closeFuture;
245244

246-
VerticleHolder(Verticle verticle, ContextImpl context, CloseFuture closeFuture) {
245+
VerticleHolder(Verticle verticle, ContextImpl context) {
247246
this.verticle = verticle;
248247
this.context = context;
249-
this.closeFuture = closeFuture;
250248
}
251249

252250
void close(Handler<AsyncResult<Void>> completionHandler) {
253-
closeFuture.close().onComplete(completionHandler);
251+
context.close().onComplete(completionHandler);
254252
}
255253
}
256254

src/main/java/io/vertx/core/impl/DuplicatedContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,9 @@ public ContextInternal unwrap() {
193193
public boolean isDuplicate() {
194194
return true;
195195
}
196+
197+
@Override
198+
public Future<Void> close() {
199+
return Future.succeededFuture();
200+
}
196201
}

0 commit comments

Comments
 (0)