Skip to content

Commit f1a7da1

Browse files
committed
Add shadow context implementation.
A shadow context is a context referring to a context of another vertx instance. Shadow context is created when a context is obtained from a vertx instance and the current context is already associated with a different context. A shadow context appears to be local to each vertx instance.
1 parent 8a53868 commit f1a7da1

22 files changed

+885
-263
lines changed

vertx-core/src/main/java/io/vertx/core/ThreadingModel.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,30 @@
1111
package io.vertx.core;
1212

1313
/**
14-
* The threading model defines how user tasks should be executed.
14+
* The threading model defines the scheduler to execute context tasks.
1515
*
1616
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
1717
*/
1818
public enum ThreadingModel {
1919

2020
/**
21-
* Event-loop threading model.
21+
* Tasks are scheduled on the event-loop thread.
2222
*/
2323
EVENT_LOOP,
2424

2525
/**
26-
* Worker threading model
26+
* Tasks are scheduled on a worker pool.
2727
*/
2828
WORKER,
2929

3030
/**
31-
* Virtual thread threading model
31+
* Tasks are scheduled on a virtual thread.
3232
*/
33-
VIRTUAL_THREAD
33+
VIRTUAL_THREAD,
3434

35+
/**
36+
* Tasks are scheduled on threads not managed by the current vertx instance.
37+
*/
38+
OTHER
3539
}
3640

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ private void beginRequest(Stream stream, HttpRequestHead request, boolean chunke
248248
if (this.metrics != null) {
249249
stream.metric = this.metrics.requestBegin(request.uri, request);
250250
}
251-
VertxTracer tracer = context.tracer();
251+
VertxTracer tracer = stream.context.tracer();
252252
if (tracer != null) {
253253
BiConsumer<String, String> headers = (key, val) -> new HeadersAdaptor(nettyRequest.headers()).add(key, val);
254254
String operation = request.traceOperation;
@@ -901,7 +901,7 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
901901
stream.responseEnded = true;
902902
check = requests.peek() != stream;
903903
}
904-
VertxTracer tracer = context.tracer();
904+
VertxTracer tracer = stream.context.tracer();
905905
if (tracer != null) {
906906
tracer.receiveResponse(stream.context, response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
907907
}
@@ -1164,7 +1164,6 @@ protected void handleClosed() {
11641164
evictionHandler.handle(null);
11651165
}
11661166
}
1167-
VertxTracer tracer = context.tracer();
11681167
List<Stream> allocatedStreams;
11691168
List<Stream> sentStreams;
11701169
synchronized (this) {
@@ -1180,6 +1179,7 @@ protected void handleClosed() {
11801179
metrics.requestReset(stream.metric);
11811180
}
11821181
Object trace = stream.trace;
1182+
VertxTracer tracer = stream.context.tracer();
11831183
if (tracer != null && trace != null) {
11841184
tracer.receiveResponse(stream.context, null, trace, HttpUtils.CONNECTION_CLOSED_EXCEPTION, TagExtractor.empty());
11851185
}

vertx-core/src/main/java/io/vertx/core/impl/ContextBase.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@
1010
*/
1111
package io.vertx.core.impl;
1212

13+
import io.vertx.core.Future;
14+
import io.vertx.core.Handler;
1315
import io.vertx.core.ThreadingModel;
1416
import io.vertx.core.internal.ContextInternal;
1517
import io.vertx.core.spi.context.storage.AccessMode;
1618
import io.vertx.core.spi.context.storage.ContextLocal;
1719

20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.Executor;
1822
import java.util.function.Supplier;
1923

2024
/**
@@ -30,6 +34,9 @@ abstract class ContextBase implements ContextInternal {
3034
this.locals = locals;
3135
}
3236

37+
@Override
38+
public abstract EventExecutor executor();
39+
3340
public ContextInternal beginDispatch() {
3441
VertxImpl vertx = (VertxImpl) owner();
3542
return vertx.beginDispatch(this);
@@ -68,4 +75,50 @@ public final <T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T val
6875
}
6976
accessMode.put(locals, index, value);
7077
}
78+
79+
@Override
80+
public final boolean inThread() {
81+
return executor().inThread();
82+
}
83+
84+
@Override
85+
public final <T> void emit(T argument, Handler<T> task) {
86+
if (executor().inThread()) {
87+
ContextInternal prev = beginDispatch();
88+
try {
89+
task.handle(argument);
90+
} catch (Throwable t) {
91+
reportException(t);
92+
} finally {
93+
endDispatch(prev);
94+
}
95+
} else {
96+
executor().execute(() -> emit(argument, task));
97+
}
98+
}
99+
100+
@Override
101+
public final void execute(Runnable task) {
102+
if (executor().inThread()) {
103+
task.run();
104+
} else {
105+
executor().execute(task);
106+
}
107+
}
108+
109+
/**
110+
* <ul>
111+
* <li>When the current thread is event-loop thread of this context the implementation will execute the {@code task} directly</li>
112+
* <li>When the current thread is a worker thread of this context the implementation will execute the {@code task} directly</li>
113+
* <li>Otherwise the task will be scheduled on the context thread for execution</li>
114+
* </ul>
115+
*/
116+
@Override
117+
public final <T> void execute(T argument, Handler<T> task) {
118+
if (executor().inThread()) {
119+
task.handle(argument);
120+
} else {
121+
executor().execute(() -> task.handle(argument));
122+
}
123+
}
71124
}

vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java

Lines changed: 7 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
4343
private final CloseFuture closeFuture;
4444
private final ClassLoader tccl;
4545
private final EventLoop eventLoop;
46+
private final ThreadingModel threadingModel;
4647
private final EventExecutor executor;
4748
private ConcurrentMap<Object, Object> data;
4849
private volatile Handler<Throwable> exceptionHandler;
@@ -52,6 +53,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
5253
public ContextImpl(VertxInternal vertx,
5354
Object[] locals,
5455
EventLoop eventLoop,
56+
ThreadingModel threadingModel,
5557
EventExecutor executor,
5658
WorkerPool workerPool,
5759
TaskQueue orderedTasks,
@@ -62,6 +64,7 @@ public ContextImpl(VertxInternal vertx,
6264
this.deployment = deployment;
6365
this.config = deployment != null ? deployment.config() : new JsonObject();
6466
this.eventLoop = eventLoop;
67+
this.threadingModel = threadingModel;
6568
this.executor = executor;
6669
this.tccl = tccl;
6770
this.owner = vertx;
@@ -92,14 +95,9 @@ public VertxInternal owner() {
9295
return owner;
9396
}
9497

95-
@Override
96-
public <T> Future<T> executeBlockingInternal(Callable<T> action) {
97-
return executeBlocking(this, action, null, null);
98-
}
99-
10098
@Override
10199
public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) {
102-
return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? orderedTasks : null);
100+
return workerPool.executeBlocking(this, blockingCodeHandler, ordered ? orderedTasks : null);
103101
}
104102

105103
@Override
@@ -109,74 +107,16 @@ public EventExecutor executor() {
109107

110108
@Override
111109
public boolean isEventLoopContext() {
112-
return executor.threadingModel() == ThreadingModel.EVENT_LOOP;
110+
return threadingModel() == ThreadingModel.EVENT_LOOP;
113111
}
114112

115113
@Override
116114
public boolean isWorkerContext() {
117-
return executor.threadingModel() == ThreadingModel.WORKER;
115+
return threadingModel() == ThreadingModel.WORKER;
118116
}
119117

120118
public ThreadingModel threadingModel() {
121-
return executor.threadingModel();
122-
}
123-
124-
@Override
125-
public boolean inThread() {
126-
return executor.inThread();
127-
}
128-
129-
@Override
130-
public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, TaskQueue queue) {
131-
return executeBlocking(this, blockingCodeHandler, workerPool, queue);
132-
}
133-
134-
static <T> Future<T> executeBlocking(ContextInternal context, Callable<T> blockingCodeHandler,
135-
WorkerPool workerPool, TaskQueue queue) {
136-
return internalExecuteBlocking(context, promise -> {
137-
T result;
138-
try {
139-
result = blockingCodeHandler.call();
140-
} catch (Throwable e) {
141-
promise.fail(e);
142-
return;
143-
}
144-
promise.complete(result);
145-
}, workerPool, queue);
146-
}
147-
148-
private static <T> Future<T> internalExecuteBlocking(ContextInternal context, Handler<Promise<T>> blockingCodeHandler,
149-
WorkerPool workerPool, TaskQueue queue) {
150-
PoolMetrics metrics = workerPool.metrics();
151-
Object queueMetric = metrics != null ? metrics.enqueue() : null;
152-
Promise<T> promise = context.promise();
153-
Future<T> fut = promise.future();
154-
try {
155-
Runnable command = () -> {
156-
Object execMetric = null;
157-
if (metrics != null) {
158-
metrics.dequeue(queueMetric);
159-
execMetric = metrics.begin();
160-
}
161-
context.dispatch(promise, blockingCodeHandler);
162-
if (metrics != null) {
163-
metrics.end(execMetric);
164-
}
165-
};
166-
Executor exec = workerPool.executor();
167-
if (queue != null) {
168-
queue.execute(command, exec);
169-
} else {
170-
exec.execute(command);
171-
}
172-
} catch (RejectedExecutionException e) {
173-
// Pool is already shut down
174-
if (metrics != null) {
175-
metrics.dequeue(queueMetric);
176-
}
177-
throw e;
178-
}
179-
return fut;
119+
return threadingModel;
180120
}
181121

182122
@Override
@@ -225,68 +165,6 @@ public Handler<Throwable> exceptionHandler() {
225165
return exceptionHandler;
226166
}
227167

228-
protected void runOnContext(ContextInternal ctx, Handler<Void> action) {
229-
try {
230-
Executor exec = ctx.executor();
231-
exec.execute(() -> ctx.dispatch(action));
232-
} catch (RejectedExecutionException ignore) {
233-
// Pool is already shut down
234-
}
235-
}
236-
237-
@Override
238-
public void execute(Runnable task) {
239-
execute(this, task);
240-
}
241-
242-
@Override
243-
public <T> void execute(T argument, Handler<T> task) {
244-
execute(this, argument, task);
245-
}
246-
247-
protected void execute(ContextInternal ctx, Runnable task) {
248-
if (inThread()) {
249-
task.run();
250-
} else {
251-
executor.execute(task);
252-
}
253-
}
254-
255-
/**
256-
* <ul>
257-
* <li>When the current thread is event-loop thread of this context the implementation will execute the {@code task} directly</li>
258-
* <li>When the current thread is a worker thread of this context the implementation will execute the {@code task} directly</li>
259-
* <li>Otherwise the task will be scheduled on the context thread for execution</li>
260-
* </ul>
261-
*/
262-
protected <T> void execute(ContextInternal ctx, T argument, Handler<T> task) {
263-
if (inThread()) {
264-
task.handle(argument);
265-
} else {
266-
executor.execute(() -> task.handle(argument));
267-
}
268-
}
269-
270-
@Override
271-
public <T> void emit(T argument, Handler<T> task) {
272-
emit(this, argument, task);
273-
}
274-
275-
protected <T> void emit(ContextInternal ctx, T argument, Handler<T> task) {
276-
if (inThread()) {
277-
ContextInternal prev = ctx.beginDispatch();
278-
try {
279-
task.handle(argument);
280-
} catch (Throwable t) {
281-
reportException(t);
282-
} finally {
283-
ctx.endDispatch(prev);
284-
}
285-
} else {
286-
executor.execute(() -> emit(ctx, argument, task));
287-
}
288-
}
289-
290168
@Override
291169
public ContextInternal duplicate() {
292170
return new DuplicatedContext(this, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);

vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.util.concurrent.Callable;
2525
import java.util.concurrent.ConcurrentMap;
26-
import java.util.concurrent.Executor;
2726

2827
/**
2928
* A context that forwards most operations to a delegate. This context
@@ -48,11 +47,6 @@ public ThreadingModel threadingModel() {
4847
return delegate.threadingModel();
4948
}
5049

51-
@Override
52-
public boolean inThread() {
53-
return delegate.inThread();
54-
}
55-
5650
@Override
5751
public CloseFuture closeFuture() {
5852
return delegate.closeFuture();
@@ -75,7 +69,7 @@ public Context exceptionHandler(Handler<Throwable> handler) {
7569
}
7670

7771
@Override
78-
public Executor executor() {
72+
public EventExecutor executor() {
7973
return delegate.executor();
8074
}
8175

@@ -119,34 +113,9 @@ public ConcurrentMap<Object, Object> contextData() {
119113
return delegate.contextData();
120114
}
121115

122-
@Override
123-
public <T> Future<T> executeBlockingInternal(Callable<T> action) {
124-
return ContextImpl.executeBlocking(this, action, delegate.owner().getInternalWorkerPool(), null);
125-
}
126-
127116
@Override
128117
public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) {
129-
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null);
130-
}
131-
132-
@Override
133-
public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, TaskQueue queue) {
134-
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, queue);
135-
}
136-
137-
@Override
138-
public <T> void execute(T argument, Handler<T> task) {
139-
delegate.execute(this, argument, task);
140-
}
141-
142-
@Override
143-
public <T> void emit(T argument, Handler<T> task) {
144-
delegate.emit(this, argument, task);
145-
}
146-
147-
@Override
148-
public void execute(Runnable task) {
149-
delegate.execute(this, task);
118+
return delegate.workerPool.executeBlocking(this, blockingCodeHandler, ordered ? delegate.orderedTasks : null);
150119
}
151120

152121
@Override

0 commit comments

Comments
 (0)