Skip to content

Commit fbe4c82

Browse files
authored
Merge pull request #5699 from eclipse-vertx/improve-event-bus-interceptor
Avoid the event-bus delivery context machinery when there are not interceptors.
2 parents 848a9a4 + f0f57f6 commit fbe4c82

File tree

6 files changed

+87
-92
lines changed

6 files changed

+87
-92
lines changed

vertx-core/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextBase.java renamed to vertx-core/src/main/java/io/vertx/core/eventbus/impl/DeliveryContextImpl.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,47 @@
1717

1818
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
1919

20-
abstract class DeliveryContextBase<T> implements DeliveryContext<T> {
20+
class DeliveryContextImpl<T> implements DeliveryContext<T> {
2121

22-
private static final AtomicIntegerFieldUpdater<DeliveryContextBase> UPDATER = AtomicIntegerFieldUpdater.newUpdater(DeliveryContextBase.class, "interceptorIdx");
22+
private static final AtomicIntegerFieldUpdater<DeliveryContextImpl> UPDATER = AtomicIntegerFieldUpdater.newUpdater(DeliveryContextImpl.class, "interceptorIdx");
2323

24-
public final MessageImpl<?, T> message;
25-
public final ContextInternal context;
26-
27-
private final Handler<DeliveryContext>[] interceptors;
24+
private final MessageImpl<?, T> message;
25+
private final ContextInternal context;
26+
private final Object body;
27+
private final Runnable dispatch;
28+
private final Handler<DeliveryContext<?>>[] interceptors;
2829
private volatile int interceptorIdx;
2930

30-
protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext>[] interceptors, ContextInternal context) {
31+
protected DeliveryContextImpl(MessageImpl<?, T> message, Handler<DeliveryContext<?>>[] interceptors,
32+
ContextInternal context, Object body, Runnable dispatch) {
3133
this.message = message;
3234
this.interceptors = interceptors;
3335
this.context = context;
3436
this.interceptorIdx = 0;
35-
}
36-
37-
void dispatch() {
38-
next();
37+
this.body = body;
38+
this.dispatch = dispatch;
3939
}
4040

4141
@Override
4242
public Message<T> message() {
4343
return message;
4444
}
4545

46-
protected abstract void execute();
46+
@Override
47+
public boolean send() {
48+
return message.isSend();
49+
}
50+
51+
@Override
52+
public Object body() {
53+
return body;
54+
}
4755

4856
@Override
4957
public void next() {
5058
int idx = UPDATER.getAndIncrement(this);
5159
if (idx < interceptors.length) {
52-
Handler<DeliveryContext> interceptor = interceptors[idx];
60+
Handler<DeliveryContext<?>> interceptor = interceptors[idx];
5361
if (context.inThread()) {
5462
context.dispatch(this, interceptor);
5563
} else {
@@ -60,7 +68,7 @@ public void next() {
6068
}
6169
}
6270
} else if (idx == interceptors.length) {
63-
execute();
71+
dispatch.run();
6472
} else {
6573
throw new IllegalStateException();
6674
}

vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class EventBusImpl implements EventBusInternal, MetricsProvider {
4242
private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "outboundInterceptors");
4343
private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "inboundInterceptors");
4444

45-
private volatile Handler<DeliveryContext>[] outboundInterceptors = new Handler[0];
46-
private volatile Handler<DeliveryContext>[] inboundInterceptors = new Handler[0];
45+
private volatile Handler<DeliveryContext<?>>[] outboundInterceptors = new Handler[0];
46+
private volatile Handler<DeliveryContext<?>>[] inboundInterceptors = new Handler[0];
4747
private final AtomicLong replySequence = new AtomicLong(0);
4848
protected final VertxInternal vertx;
4949
protected final EventBusMetrics metrics;
@@ -81,11 +81,11 @@ public <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> interce
8181
return this;
8282
}
8383

84-
Handler<DeliveryContext>[] inboundInterceptors() {
84+
Handler<DeliveryContext<?>>[] inboundInterceptors() {
8585
return inboundInterceptors;
8686
}
8787

88-
Handler<DeliveryContext>[] outboundInterceptors() {
88+
Handler<DeliveryContext<?>>[] outboundInterceptors() {
8989
return outboundInterceptors;
9090
}
9191

@@ -343,19 +343,19 @@ private <T> void removeLocalRegistration(HandlerHolder<T> holder) {
343343
}
344344
}
345345

346-
protected <T> void sendReply(MessageImpl replyMessage, DeliveryOptions options, ReplyHandler<T> replyHandler) {
346+
protected <T> void sendReply(MessageImpl<?, T> replyMessage, DeliveryOptions options, ReplyHandler<T> replyHandler) {
347347
if (replyMessage.address() == null) {
348348
throw new IllegalStateException("address not specified");
349349
} else {
350-
sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler));
350+
sendOrPubInternal(new SendContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler));
351351
}
352352
}
353353

354354
protected <T> void sendOrPub(ContextInternal ctx, MessageImpl<?, T> message, DeliveryOptions options, Promise<Void> writePromise) {
355355
sendLocally(message, writePromise);
356356
}
357357

358-
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
358+
protected <T> void sendOrPub(SendContext<T> sendContext) {
359359
sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext);
360360
}
361361

@@ -432,22 +432,21 @@ <T> ReplyHandler<T> createReplyHandler(MessageImpl message,
432432
return handler;
433433
}
434434

435-
public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
436-
ReplyHandler<T> handler) {
437-
return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler);
435+
<T> SendContext<T> newSendContext(MessageImpl<?, T> message, DeliveryOptions options, ReplyHandler<T> handler) {
436+
return new SendContext<>(vertx.getOrCreateContext(), message, options, handler);
438437
}
439438

440-
public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
439+
public <T> void sendOrPubInternal(SendContext<T> senderCtx) {
441440
checkStarted();
442441
senderCtx.bus = this;
443442
senderCtx.metrics = metrics;
444-
senderCtx.next();
443+
senderCtx.send();
445444
}
446445

447-
public <T> Future<Void> sendOrPubInternal(MessageImpl message, DeliveryOptions options,
446+
<T> Future<Void> sendOrPubInternal(MessageImpl<?, T> message, DeliveryOptions options,
448447
ReplyHandler<T> handler) {
449448
checkStarted();
450-
OutboundDeliveryContext<T> ctx = newSendContext(message, options, handler);
449+
SendContext<T> ctx = newSendContext(message, options, handler);
451450
sendOrPubInternal(ctx);
452451
Future<Void> future = ctx.writePromise.future();
453452
if (message.send) {

vertx-core/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java

Lines changed: 29 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package io.vertx.core.eventbus.impl;
1212

1313
import io.vertx.core.*;
14+
import io.vertx.core.eventbus.DeliveryContext;
1415
import io.vertx.core.eventbus.Message;
1516
import io.vertx.core.eventbus.ReplyException;
1617
import io.vertx.core.eventbus.ReplyFailure;
@@ -58,7 +59,7 @@ public String address() {
5859

5960
protected abstract void doReceive(Message<T> msg);
6061

61-
protected abstract void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);
62+
protected abstract void dispatchMessage(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);
6263

6364
synchronized void register(boolean broadcast, boolean localOnly, Completable<Void> promise) {
6465
if (registered != null) {
@@ -90,9 +91,33 @@ public Future<Void> unregister() {
9091
return promise.future();
9192
}
9293

93-
void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
94-
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
95-
deliveryCtx.dispatch();
94+
void dispatchMessage(Handler<Message<T>> handler, MessageImpl<?, T> message, ContextInternal context) {
95+
Handler<DeliveryContext<?>>[] interceptors = message.bus.inboundInterceptors();
96+
if (interceptors.length > 0) {
97+
Runnable dispatch = () -> dispatch(context, message, handler);
98+
DeliveryContextImpl<T> deliveryCtx = new DeliveryContextImpl<>(message, interceptors, context, message.receivedBody, dispatch);
99+
deliveryCtx.next();
100+
} else {
101+
dispatch(context, message, handler);
102+
}
103+
}
104+
105+
private void dispatch(ContextInternal ctx, MessageImpl<?, T> message, Handler<Message<T>> handler) {
106+
Object m = metric;
107+
VertxTracer tracer = ctx.tracer();
108+
if (bus.metrics != null) {
109+
bus.metrics.messageDelivered(m, message.isLocal());
110+
}
111+
if (tracer != null && !src) {
112+
message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE);
113+
dispatchMessage(message, ctx, handler);
114+
Object trace = message.trace;
115+
if (message.replyAddress == null && trace != null) {
116+
tracer.sendResponse(ctx, null, trace, null, TagExtractor.empty());
117+
}
118+
} else {
119+
dispatchMessage(message, ctx, handler);
120+
}
96121
}
97122

98123
void discardMessage(Message<T> msg) {
@@ -106,46 +131,6 @@ void discardMessage(Message<T> msg) {
106131
}
107132
}
108133

109-
private class InboundDeliveryContext extends DeliveryContextBase<T> {
110-
111-
private final Handler<Message<T>> handler;
112-
113-
private InboundDeliveryContext(MessageImpl<?, T> message, Handler<Message<T>> handler, ContextInternal context) {
114-
super(message, message.bus.inboundInterceptors(), context);
115-
116-
this.handler = handler;
117-
}
118-
119-
protected void execute() {
120-
ContextInternal ctx = InboundDeliveryContext.super.context;
121-
Object m = metric;
122-
VertxTracer tracer = ctx.tracer();
123-
if (bus.metrics != null) {
124-
bus.metrics.messageDelivered(m, message.isLocal());
125-
}
126-
if (tracer != null && !src) {
127-
message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE);
128-
HandlerRegistration.this.dispatch(message, ctx, handler);
129-
Object trace = message.trace;
130-
if (message.replyAddress == null && trace != null) {
131-
tracer.sendResponse(this.context, null, trace, null, TagExtractor.empty());
132-
}
133-
} else {
134-
HandlerRegistration.this.dispatch(message, ctx, handler);
135-
}
136-
}
137-
138-
@Override
139-
public boolean send() {
140-
return message.isSend();
141-
}
142-
143-
@Override
144-
public Object body() {
145-
return message.receivedBody;
146-
}
147-
}
148-
149134
@Override
150135
public void close(Completable<Void> completion) {
151136
unregister().onComplete(completion);

vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected void handleMessage(Message<T> msg) {
5757
handler = MessageConsumerImpl.this.handler;
5858
}
5959
if (handler != null) {
60-
dispatch(handler, msg, context.duplicate());
60+
dispatchMessage(handler, (MessageImpl<?, T>) msg, context.duplicate());
6161
} else {
6262
handleDiscard(msg, false);
6363
}
@@ -118,7 +118,7 @@ protected void doReceive(Message<T> message) {
118118
}
119119

120120
@Override
121-
protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
121+
protected void dispatchMessage(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
122122
if (handler == null) {
123123
throw new NullPointerException();
124124
}

vertx-core/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ public void handle(Long id) {
7575

7676
@Override
7777
protected void doReceive(Message<T> reply) {
78-
dispatch(null, reply, context);
78+
dispatchMessage(null, (MessageImpl<?, T>) reply, context);
7979
}
8080

8181
void register() {
8282
register(false, false, NULL_COMPLETABLE);
8383
}
8484

8585
@Override
86-
protected void dispatch(Message<T> reply, ContextInternal context, Handler<Message<T>> handler /* null */) {
86+
protected void dispatchMessage(Message<T> reply, ContextInternal context, Handler<Message<T>> handler /* null */) {
8787
if (context.owner().cancelTimer(timeoutID)) {
8888
unregister();
8989
if (reply.body() instanceof ReplyException) {
Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
package io.vertx.core.eventbus.impl;
1212

1313
import io.vertx.core.Future;
14+
import io.vertx.core.Handler;
1415
import io.vertx.core.Promise;
16+
import io.vertx.core.eventbus.DeliveryContext;
1517
import io.vertx.core.eventbus.DeliveryOptions;
1618
import io.vertx.core.eventbus.ReplyException;
1719
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
@@ -24,25 +26,36 @@
2426

2527
import java.util.function.BiConsumer;
2628

27-
public class OutboundDeliveryContext<T> extends DeliveryContextBase<T> implements Promise<Void> {
29+
public class SendContext<T> implements Promise<Void> {
2830

31+
private final ReplyHandler<T> replyHandler;
32+
private boolean src;
33+
public final MessageImpl<?, T> message;
2934
public final ContextInternal ctx;
3035
public final DeliveryOptions options;
31-
public final ReplyHandler<T> replyHandler;
3236
public final Promise<Void> writePromise;
33-
private boolean src;
3437

3538
EventBusImpl bus;
3639
EventBusMetrics metrics;
3740

38-
OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler) {
39-
super(message, message.bus.outboundInterceptors(), ctx);
41+
SendContext(ContextInternal ctx, MessageImpl<?, T> message, DeliveryOptions options, ReplyHandler<T> replyHandler) {
42+
this.message = message;
4043
this.ctx = ctx;
4144
this.options = options;
4245
this.replyHandler = replyHandler;
4346
this.writePromise = ctx.promise();
4447
}
4548

49+
void send() {
50+
Handler<DeliveryContext<?>>[] interceptors = message.bus.outboundInterceptors();
51+
if (interceptors.length > 0) {
52+
DeliveryContextImpl<T> deliveryContext = new DeliveryContextImpl<>(message, interceptors, ctx, message.sentBody, this::sendOrPub);
53+
deliveryContext.next();
54+
} else {
55+
sendOrPub();
56+
}
57+
}
58+
4659
@Override
4760
public boolean tryComplete(Void result) {
4861
written(null);
@@ -100,8 +113,7 @@ private void written(Throwable failure) {
100113
}
101114
}
102115

103-
@Override
104-
protected void execute() {
116+
private void sendOrPub() {
105117
VertxTracer tracer = ctx.tracer();
106118
if (tracer != null) {
107119
if (message.trace == null) {
@@ -120,13 +132,4 @@ protected void execute() {
120132
bus.sendOrPub(this);
121133
}
122134

123-
@Override
124-
public boolean send() {
125-
return message.isSend();
126-
}
127-
128-
@Override
129-
public Object body() {
130-
return message.sentBody;
131-
}
132135
}

0 commit comments

Comments
 (0)