Skip to content

Commit 848a9a4

Browse files
committed
Let the DeliveryContext next implementation work correctly when used by different threads.
Motivation: The current implementation of DeliveryContext#next assumes the same thread makes interceptor delivery progress, this API however assumes any thread can make interceptor progress. Changes: Rewrite the implementation of DeliveryContext#next with an atomic interceptor index that guarantees that progress is visible accross threads until the last interceptor is invoked.
1 parent 532e428 commit 848a9a4

File tree

2 files changed

+22
-33
lines changed

2 files changed

+22
-33
lines changed

vertx-core/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
import io.vertx.core.eventbus.Message;
1616
import io.vertx.core.internal.ContextInternal;
1717

18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
1820
abstract class DeliveryContextBase<T> implements DeliveryContext<T> {
1921

22+
private static final AtomicIntegerFieldUpdater<DeliveryContextBase> UPDATER = AtomicIntegerFieldUpdater.newUpdater(DeliveryContextBase.class, "interceptorIdx");
23+
2024
public final MessageImpl<?, T> message;
2125
public final ContextInternal context;
2226

2327
private final Handler<DeliveryContext>[] interceptors;
24-
25-
private int interceptorIdx;
26-
private boolean invoking;
27-
private boolean invokeNext;
28+
private volatile int interceptorIdx;
2829

2930
protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext>[] interceptors, ContextInternal context) {
3031
this.message = message;
@@ -34,12 +35,7 @@ protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext
3435
}
3536

3637
void dispatch() {
37-
this.interceptorIdx = 0;
38-
if (invoking) {
39-
this.invokeNext = true;
40-
} else {
41-
next();
42-
}
38+
next();
4339
}
4440

4541
@Override
@@ -49,33 +45,24 @@ public Message<T> message() {
4945

5046
protected abstract void execute();
5147

52-
5348
@Override
5449
public void next() {
55-
if (invoking) {
56-
invokeNext = true;
57-
} else {
58-
while (interceptorIdx < interceptors.length) {
59-
Handler<DeliveryContext> interceptor = interceptors[interceptorIdx];
60-
invoking = true;
61-
interceptorIdx++;
62-
if (context.inThread()) {
63-
context.dispatch(this, interceptor);
64-
} else {
65-
try {
66-
interceptor.handle(this);
67-
} catch (Throwable t) {
68-
context.reportException(t);
69-
}
50+
int idx = UPDATER.getAndIncrement(this);
51+
if (idx < interceptors.length) {
52+
Handler<DeliveryContext> interceptor = interceptors[idx];
53+
if (context.inThread()) {
54+
context.dispatch(this, interceptor);
55+
} else {
56+
try {
57+
interceptor.handle(this);
58+
} catch (Throwable t) {
59+
context.reportException(t);
7060
}
71-
invoking = false;
72-
if (!invokeNext) {
73-
return;
74-
}
75-
invokeNext = false;
7661
}
77-
interceptorIdx = 0;
62+
} else if (idx == interceptors.length) {
7863
execute();
64+
} else {
65+
throw new IllegalStateException();
7966
}
8067
}
8168
}

vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusInterceptorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ public void testOutboundInterceptorFromNonVertxThreadFailure() {
385385

386386
@Test
387387
public void testInboundInterceptorFromNonVertxThreadDispatch() {
388+
disableThreadChecks();
388389
AtomicReference<Thread> interceptorThread = new AtomicReference<>();
389390
AtomicReference<Thread> th = new AtomicReference<>();
390391
eb.addInboundInterceptor(sc -> {
@@ -394,12 +395,13 @@ public void testInboundInterceptorFromNonVertxThreadDispatch() {
394395
}).start();
395396
});
396397
eb.addInboundInterceptor(sc -> {
398+
assertTrue(!Context.isOnEventLoopThread());
397399
interceptorThread.set(Thread.currentThread());
398400
});
399401
eb.consumer("some-address", msg -> {
400402
});
401403
eb.send("some-address", "armadillo");
402-
waitUntil(() -> interceptorThread.get() != null);
404+
assertWaitUntil(() -> interceptorThread.get() != null);
403405
assertSame(th.get(), interceptorThread.get());
404406
}
405407

0 commit comments

Comments
 (0)