Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 19 additions & 32 deletions src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

import java.util.List;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

abstract class DeliveryContextBase<T> implements DeliveryContext<T> {

private static final AtomicIntegerFieldUpdater<DeliveryContextBase> UPDATER = AtomicIntegerFieldUpdater.newUpdater(DeliveryContextBase.class, "interceptorIdx");

public final MessageImpl<?, T> message;
public final ContextInternal context;

private final Handler<DeliveryContext>[] interceptors;

private int interceptorIdx;
private boolean invoking;
private boolean invokeNext;
private volatile int interceptorIdx;

protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext>[] interceptors, ContextInternal context) {
this.message = message;
Expand All @@ -36,12 +37,7 @@ protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext
}

void dispatch() {
this.interceptorIdx = 0;
if (invoking) {
this.invokeNext = true;
} else {
next();
}
next();
}

@Override
Expand All @@ -51,33 +47,24 @@ public Message<T> message() {

protected abstract void execute();


@Override
public void next() {
if (invoking) {
invokeNext = true;
} else {
while (interceptorIdx < interceptors.length) {
Handler<DeliveryContext> interceptor = interceptors[interceptorIdx];
invoking = true;
interceptorIdx++;
if (context.inThread()) {
context.dispatch(this, interceptor);
} else {
try {
interceptor.handle(this);
} catch (Throwable t) {
context.reportException(t);
}
int idx = UPDATER.getAndIncrement(this);
if (idx < interceptors.length) {
Handler<DeliveryContext> interceptor = interceptors[idx];
if (context.inThread()) {
context.dispatch(this, interceptor);
} else {
try {
interceptor.handle(this);
} catch (Throwable t) {
context.reportException(t);
}
invoking = false;
if (!invokeNext) {
return;
}
invokeNext = false;
}
interceptorIdx = 0;
} else if (idx == interceptors.length) {
execute();
} else {
throw new IllegalStateException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ public void testOutboundInterceptorFromNonVertxThreadFailure() {

@Test
public void testInboundInterceptorFromNonVertxThreadDispatch() {
disableThreadChecks();
AtomicReference<Thread> interceptorThread = new AtomicReference<>();
AtomicReference<Thread> th = new AtomicReference<>();
eb.addInboundInterceptor(sc -> {
Expand All @@ -393,12 +394,13 @@ public void testInboundInterceptorFromNonVertxThreadDispatch() {
}).start();
});
eb.addInboundInterceptor(sc -> {
assertTrue(!Context.isOnEventLoopThread());
interceptorThread.set(Thread.currentThread());
});
eb.consumer("some-address", msg -> {
});
eb.send("some-address", "armadillo");
waitUntil(() -> interceptorThread.get() != null);
assertWaitUntil(() -> interceptorThread.get() != null);
assertSame(th.get(), interceptorThread.get());
}

Expand Down
Loading