diff --git a/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java b/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java index 9d4338b8d8a..e40a2f6003f 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java +++ b/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java @@ -17,16 +17,17 @@ import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + abstract class DeliveryContextBase implements DeliveryContext { + private static final AtomicIntegerFieldUpdater UPDATER = AtomicIntegerFieldUpdater.newUpdater(DeliveryContextBase.class, "interceptorIdx"); + public final MessageImpl message; public final ContextInternal context; private final Handler[] interceptors; - - private int interceptorIdx; - private boolean invoking; - private boolean invokeNext; + private volatile int interceptorIdx; protected DeliveryContextBase(MessageImpl message, Handler[] interceptors, ContextInternal context) { this.message = message; @@ -36,12 +37,7 @@ protected DeliveryContextBase(MessageImpl message, Handler message() { protected abstract void execute(); - @Override public void next() { - if (invoking) { - invokeNext = true; - } else { - while (interceptorIdx < interceptors.length) { - Handler interceptor = interceptors[interceptorIdx]; - invoking = true; - interceptorIdx++; - if (context.inThread()) { - context.dispatch(this, interceptor); - } else { - try { - interceptor.handle(this); - } catch (Throwable t) { - context.reportException(t); - } + int idx = UPDATER.getAndIncrement(this); + if (idx < interceptors.length) { + Handler interceptor = interceptors[idx]; + if (context.inThread()) { + context.dispatch(this, interceptor); + } else { + try { + interceptor.handle(this); + } catch (Throwable t) { + context.reportException(t); } - invoking = false; - if (!invokeNext) { - return; - } - invokeNext = false; } - interceptorIdx = 0; + } else if (idx == interceptors.length) { execute(); + } else { + throw new IllegalStateException(); } } } diff --git a/src/test/java/io/vertx/core/eventbus/EventBusInterceptorTest.java b/src/test/java/io/vertx/core/eventbus/EventBusInterceptorTest.java index e79e0748d3e..f7fa4b078f4 100644 --- a/src/test/java/io/vertx/core/eventbus/EventBusInterceptorTest.java +++ b/src/test/java/io/vertx/core/eventbus/EventBusInterceptorTest.java @@ -384,6 +384,7 @@ public void testOutboundInterceptorFromNonVertxThreadFailure() { @Test public void testInboundInterceptorFromNonVertxThreadDispatch() { + disableThreadChecks(); AtomicReference interceptorThread = new AtomicReference<>(); AtomicReference th = new AtomicReference<>(); eb.addInboundInterceptor(sc -> { @@ -393,12 +394,13 @@ public void testInboundInterceptorFromNonVertxThreadDispatch() { }).start(); }); eb.addInboundInterceptor(sc -> { + assertTrue(!Context.isOnEventLoopThread()); interceptorThread.set(Thread.currentThread()); }); eb.consumer("some-address", msg -> { }); eb.send("some-address", "armadillo"); - waitUntil(() -> interceptorThread.get() != null); + assertWaitUntil(() -> interceptorThread.get() != null); assertSame(th.get(), interceptorThread.get()); }