Skip to content

Commit aec6053

Browse files
committed
Let the DeliveryContext next implementation work correctly when used by different threads.
Motivation: The current implementation of DeliveryContext#next assumes the same thread makes interceptor delivery progress, this API however assumes any thread can make interceptor progress. Changes: Rewrite the implementation of DeliveryContext#next with an atomic interceptor index that guarantees that progress is visible accross threads until the last interceptor is invoked.
1 parent 48868e1 commit aec6053

File tree

2 files changed

+22
-33
lines changed

2 files changed

+22
-33
lines changed

src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
import java.util.List;
1919

20+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
21+
2022
abstract class DeliveryContextBase<T> implements DeliveryContext<T> {
2123

24+
private static final AtomicIntegerFieldUpdater<DeliveryContextBase> UPDATER = AtomicIntegerFieldUpdater.newUpdater(DeliveryContextBase.class, "interceptorIdx");
25+
2226
public final MessageImpl<?, T> message;
2327
public final ContextInternal context;
2428

2529
private final Handler<DeliveryContext>[] interceptors;
26-
27-
private int interceptorIdx;
28-
private boolean invoking;
29-
private boolean invokeNext;
30+
private volatile int interceptorIdx;
3031

3132
protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext>[] interceptors, ContextInternal context) {
3233
this.message = message;
@@ -36,12 +37,7 @@ protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext
3637
}
3738

3839
void dispatch() {
39-
this.interceptorIdx = 0;
40-
if (invoking) {
41-
this.invokeNext = true;
42-
} else {
43-
next();
44-
}
40+
next();
4541
}
4642

4743
@Override
@@ -51,33 +47,24 @@ public Message<T> message() {
5147

5248
protected abstract void execute();
5349

54-
5550
@Override
5651
public void next() {
57-
if (invoking) {
58-
invokeNext = true;
59-
} else {
60-
while (interceptorIdx < interceptors.length) {
61-
Handler<DeliveryContext> interceptor = interceptors[interceptorIdx];
62-
invoking = true;
63-
interceptorIdx++;
64-
if (context.inThread()) {
65-
context.dispatch(this, interceptor);
66-
} else {
67-
try {
68-
interceptor.handle(this);
69-
} catch (Throwable t) {
70-
context.reportException(t);
71-
}
52+
int idx = UPDATER.getAndIncrement(this);
53+
if (idx < interceptors.length) {
54+
Handler<DeliveryContext> interceptor = interceptors[idx];
55+
if (context.inThread()) {
56+
context.dispatch(this, interceptor);
57+
} else {
58+
try {
59+
interceptor.handle(this);
60+
} catch (Throwable t) {
61+
context.reportException(t);
7262
}
73-
invoking = false;
74-
if (!invokeNext) {
75-
return;
76-
}
77-
invokeNext = false;
7863
}
79-
interceptorIdx = 0;
64+
} else if (idx == interceptors.length) {
8065
execute();
66+
} else {
67+
throw new IllegalStateException();
8168
}
8269
}
8370
}

src/test/java/io/vertx/core/eventbus/EventBusInterceptorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ public void testOutboundInterceptorFromNonVertxThreadFailure() {
384384

385385
@Test
386386
public void testInboundInterceptorFromNonVertxThreadDispatch() {
387+
disableThreadChecks();
387388
AtomicReference<Thread> interceptorThread = new AtomicReference<>();
388389
AtomicReference<Thread> th = new AtomicReference<>();
389390
eb.addInboundInterceptor(sc -> {
@@ -393,12 +394,13 @@ public void testInboundInterceptorFromNonVertxThreadDispatch() {
393394
}).start();
394395
});
395396
eb.addInboundInterceptor(sc -> {
397+
assertTrue(!Context.isOnEventLoopThread());
396398
interceptorThread.set(Thread.currentThread());
397399
});
398400
eb.consumer("some-address", msg -> {
399401
});
400402
eb.send("some-address", "armadillo");
401-
waitUntil(() -> interceptorThread.get() != null);
403+
assertWaitUntil(() -> interceptorThread.get() != null);
402404
assertSame(th.get(), interceptorThread.get());
403405
}
404406

0 commit comments

Comments
 (0)