Skip to content

Commit 21cfc78

Browse files
committed
Adapt to vertx 4 and improve the implementation.
1 parent ae0d7bd commit 21cfc78

20 files changed

+384
-77
lines changed

src/main/java/io/vertx/core/Future.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,9 @@ static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Con
669669
*/
670670
static <T> T await(Future<T> future) {
671671
io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor();
672+
if (executor == null) {
673+
throw new IllegalStateException();
674+
}
672675
CountDownLatch latch = executor.suspend(cont -> future.onComplete(ar -> cont.resume()));
673676
if (latch != null) {
674677
try {

src/main/java/io/vertx/core/impl/CloseFuture.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public CloseFuture(Logger log) {
5151
*
5252
* @param hook the hook to add
5353
*/
54-
public synchronized void add(Closeable hook) {
54+
public synchronized boolean add(Closeable hook) {
5555
if (closed) {
56-
throw new IllegalStateException();
56+
return false;
5757
}
5858
if (hook instanceof CloseFuture) {
5959
// Close future might be closed independently, so we optimize and remove the hooks when
@@ -72,6 +72,7 @@ public synchronized void add(Closeable hook) {
7272
}
7373
hooks.put(hook, this);
7474
}
75+
return true;
7576
}
7677

7778
/**

src/main/java/io/vertx/core/impl/ContextImpl.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
5656
final TaskQueue internalOrderedTasks;
5757
final WorkerPool internalWorkerPool;
5858
final WorkerPool workerPool;
59-
final TaskQueue orderedTasks;
59+
final WorkerTaskQueue orderedTasks;
6060

6161
public ContextImpl(VertxInternal vertx,
6262
int localsLength,
@@ -65,7 +65,7 @@ public ContextImpl(VertxInternal vertx,
6565
EventExecutor executor,
6666
WorkerPool internalWorkerPool,
6767
WorkerPool workerPool,
68-
TaskQueue orderedTasks,
68+
WorkerTaskQueue orderedTasks,
6969
Deployment deployment,
7070
CloseFuture closeFuture,
7171
ClassLoader tccl) {
@@ -84,6 +84,14 @@ public ContextImpl(VertxInternal vertx,
8484
this.internalOrderedTasks = new TaskQueue();
8585
}
8686

87+
public Future<Void> close() {
88+
if (closeFuture == owner.closeFuture()) {
89+
return Future.future(p -> orderedTasks.shutdown(eventLoop, p));
90+
} else {
91+
return closeFuture.close().eventually(() -> Future.<Void>future(p -> orderedTasks.shutdown(eventLoop, p)));
92+
}
93+
}
94+
8795
public Deployment getDeployment() {
8896
return deployment;
8997
}
@@ -201,29 +209,29 @@ private static <T> Future<T> internalExecuteBlocking(ContextInternal context, Ha
201209
Object queueMetric = metrics != null ? metrics.submitted() : null;
202210
Promise<T> promise = context.promise();
203211
Future<T> fut = promise.future();
204-
try {
205-
Runnable command = () -> {
206-
Object execMetric = null;
207-
if (metrics != null) {
208-
execMetric = metrics.begin(queueMetric);
209-
}
212+
WorkerTask task = new WorkerTask(metrics, queueMetric) {
213+
@Override
214+
protected void execute() {
210215
context.dispatch(promise, blockingCodeHandler);
216+
}
217+
@Override
218+
void reject() {
211219
if (metrics != null) {
212-
metrics.end(execMetric, fut.succeeded());
220+
metrics.rejected(queueMetric);
213221
}
214-
};
222+
promise.fail(new RejectedExecutionException());
223+
}
224+
};
225+
try {
215226
Executor exec = workerPool.executor();
216227
if (queue != null) {
217-
queue.execute(command, exec);
228+
queue.execute(task, exec);
218229
} else {
219-
exec.execute(command);
230+
exec.execute(task);
220231
}
221232
} catch (RejectedExecutionException e) {
222233
// Pool is already shut down
223-
if (metrics != null) {
224-
metrics.rejected(queueMetric);
225-
}
226-
throw e;
234+
task.reject();
227235
}
228236
return fut;
229237
}

src/main/java/io/vertx/core/impl/ContextInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,13 @@ default <E> void dispatch(E event, Handler<E> handler) {
275275
}
276276
}
277277

278+
/**
279+
* Close this context, cleanup close future hooks then dispose pending ordered task queue.
280+
*
281+
* @return a future signalling close completion
282+
*/
283+
Future<Void> close();
284+
278285
/**
279286
* Begin the execution of a task on this context.
280287
* <p>

src/main/java/io/vertx/core/impl/DeploymentManager.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ private Future<Deployment> doDeploy(String identifier,
200200
context = vertx.createVirtualThreadContext(deployment, closeFuture, tccl);
201201
break;
202202
}
203-
VerticleHolder holder = new VerticleHolder(verticle, context, closeFuture);
203+
VerticleHolder holder = new VerticleHolder(verticle, context);
204204
deployment.addVerticle(holder);
205205
context.runOnContext(v -> {
206206
try {
@@ -241,16 +241,14 @@ static class VerticleHolder {
241241

242242
final Verticle verticle;
243243
final ContextImpl context;
244-
final CloseFuture closeFuture;
245244

246-
VerticleHolder(Verticle verticle, ContextImpl context, CloseFuture closeFuture) {
245+
VerticleHolder(Verticle verticle, ContextImpl context) {
247246
this.verticle = verticle;
248247
this.context = context;
249-
this.closeFuture = closeFuture;
250248
}
251249

252250
void close(Handler<AsyncResult<Void>> completionHandler) {
253-
closeFuture.close().onComplete(completionHandler);
251+
context.close().onComplete(completionHandler);
254252
}
255253
}
256254

src/main/java/io/vertx/core/impl/DuplicatedContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,9 @@ public ContextInternal unwrap() {
193193
public boolean isDuplicate() {
194194
return true;
195195
}
196+
197+
@Override
198+
public Future<Void> close() {
199+
return Future.succeededFuture();
200+
}
196201
}

src/main/java/io/vertx/core/impl/TaskQueue.java

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ public class TaskQueue {
3838

3939
// @protectedby tasks
4040
private final LinkedList<Task> tasks = new LinkedList<>();
41-
private final Set<ContinuationTask> suspendedTasks = new HashSet<>();
41+
private final Set<ContinuationTask> continuations = new HashSet<>();
4242
private boolean closed;
4343
private Executor currentExecutor;
4444
private Thread currentThread;
45+
private ExecuteTask currentTask;
4546

4647
private final Runnable runner;
4748

@@ -65,6 +66,7 @@ private void run() {
6566
ContinuationTask resume = (ContinuationTask) task;
6667
currentExecutor = resume.executor;
6768
currentThread = resume.thread;
69+
currentTask = resume.task;
6870
resume.latch.run();
6971
return;
7072
}
@@ -78,11 +80,13 @@ private void run() {
7880
}
7981
try {
8082
currentThread = Thread.currentThread();
83+
currentTask = execute;
8184
execute.runnable.run();
8285
} catch (Throwable t) {
8386
log.error("Caught unexpected Throwable", t);
8487
} finally {
8588
currentThread = null;
89+
currentTask = null;
8690
}
8791
}
8892
}
@@ -100,6 +104,7 @@ private interface Task {
100104
* @throws IllegalStateException if the current thread is not currently being executed by the queue
101105
*/
102106
private ContinuationTask continuationTask() {
107+
ExecuteTask task;
103108
Thread thread;
104109
Executor executor;
105110
synchronized (tasks) {
@@ -108,8 +113,9 @@ private ContinuationTask continuationTask() {
108113
}
109114
thread = currentThread;
110115
executor = currentExecutor;
116+
task = currentTask;
111117
}
112-
return new ContinuationTask(thread, executor);
118+
return new ContinuationTask(task, thread, executor);
113119
}
114120

115121
/**
@@ -152,12 +158,20 @@ public boolean isEmpty() {
152158
public final static class CloseResult {
153159

154160
private final Thread activeThread;
155-
private final List<Runnable> pendingTasks;
161+
private final Runnable activeTask;
162+
private final List<Runnable> suspendedTasks;
156163
private final List<Thread> suspendedThreads;
164+
private final List<Runnable> pendingTasks;
157165

158-
private CloseResult(Thread activeThread, List<Thread> suspendedThreads, List<Runnable> pendingTasks) {
166+
private CloseResult(Thread activeThread,
167+
Runnable activeTask,
168+
List<Thread> suspendedThreads,
169+
List<Runnable> suspendedTasks,
170+
List<Runnable> pendingTasks) {
159171
this.activeThread = activeThread;
172+
this.activeTask = activeTask;
160173
this.suspendedThreads = suspendedThreads;
174+
this.suspendedTasks = suspendedTasks;
161175
this.pendingTasks = pendingTasks;
162176
}
163177

@@ -168,6 +182,10 @@ public Thread activeThread() {
168182
return activeThread;
169183
}
170184

185+
public Runnable activeTask() {
186+
return activeTask;
187+
}
188+
171189
/**
172190
* @return the list of pending tasks
173191
*/
@@ -181,6 +199,13 @@ public List<Runnable> pendingTasks() {
181199
public List<Thread> suspendedThreads() {
182200
return suspendedThreads;
183201
}
202+
203+
/**
204+
* @return the list of suspended tasks
205+
*/
206+
public List<Runnable> suspendedTasks() {
207+
return suspendedTasks;
208+
}
184209
}
185210

186211
/**
@@ -191,12 +216,15 @@ public List<Thread> suspendedThreads() {
191216
public CloseResult close() {
192217
List<Runnable> pendingTasks = Collections.emptyList();
193218
List<Thread> suspendedThreads;
194-
Thread currentThread;
219+
List<Runnable> suspendedTasks;
220+
Thread activeThread;
221+
Runnable activeTask;
195222
synchronized (tasks) {
196223
if (closed) {
197224
throw new IllegalStateException("Already closed");
198225
}
199-
suspendedThreads = new ArrayList<>(suspendedTasks.size() + 1);
226+
suspendedThreads = new ArrayList<>(continuations.size());
227+
suspendedTasks = new ArrayList<>(continuations.size());
200228
for (Task t : tasks) {
201229
if (t instanceof ExecuteTask) {
202230
if (pendingTasks.isEmpty()) {
@@ -206,31 +234,36 @@ public CloseResult close() {
206234
} else if (t instanceof ContinuationTask) {
207235
ContinuationTask rt = (ContinuationTask) t;
208236
suspendedThreads.add(rt.thread);
237+
suspendedTasks.add(rt.task.runnable);
209238
}
210239
}
211240
tasks.clear();
212-
for (ContinuationTask task : suspendedTasks) {
213-
suspendedThreads.add(task.thread);
241+
for (ContinuationTask cont : continuations) {
242+
suspendedThreads.add(cont.thread);
243+
suspendedTasks.add(cont.task.runnable);
214244
}
215-
suspendedTasks.clear();
216-
currentThread = this.currentThread;
245+
continuations.clear();
246+
activeThread = currentThread;
247+
activeTask = currentTask != null ? currentTask.runnable : null;
217248
currentExecutor = null;
218249
closed = true;
219250
}
220-
return new CloseResult(currentThread, suspendedThreads, pendingTasks);
251+
return new CloseResult(activeThread, activeTask, suspendedThreads, suspendedTasks, pendingTasks);
221252
}
222253

223254
private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task {
224255

225256
private static final int ST_CREATED = 0, ST_SUSPENDED = 1, ST_RESUMED = 2;
226257

258+
private final ExecuteTask task;
227259
private final Thread thread;
228260
private final Executor executor;
229261
private int status;
230262
private Runnable latch;
231263

232-
public ContinuationTask(Thread thread, Executor executor) {
264+
public ContinuationTask(ExecuteTask task, Thread thread, Executor executor) {
233265
super(1);
266+
this.task = task;
234267
this.thread = thread;
235268
this.executor = executor;
236269
this.status = ST_CREATED;
@@ -244,7 +277,7 @@ public void resume(Runnable callback) {
244277
}
245278
switch (status) {
246279
case ST_SUSPENDED:
247-
boolean removed = suspendedTasks.remove(this);
280+
boolean removed = continuations.remove(this);
248281
assert removed;
249282
latch = () -> {
250283
callback.run();
@@ -256,11 +289,13 @@ public void resume(Runnable callback) {
256289
}
257290
currentExecutor = executor;
258291
currentThread = thread;
292+
currentTask = task;
259293
break;
260294
case ST_CREATED:
261295
// The current task still owns the queue
262296
assert currentExecutor == executor;
263297
assert currentThread == thread;
298+
assert currentTask == task;
264299
latch = callback;
265300
break;
266301
default:
@@ -290,9 +325,10 @@ public boolean suspend() {
290325
throw new IllegalStateException();
291326
}
292327
status = ST_SUSPENDED;
293-
boolean added = suspendedTasks.add(this);
328+
boolean added = continuations.add(this);
294329
assert added;
295330
currentThread = null;
331+
currentTask = null;
296332
}
297333
executor.execute(runner);
298334
return true;

0 commit comments

Comments
 (0)