Skip to content

Commit 11e9b67

Browse files
committed
Use a broadcast flag when registering a handler registration
1 parent 6799425 commit 11e9b67

File tree

5 files changed

+13
-13
lines changed

5 files changed

+13
-13
lines changed

src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,17 +258,17 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers,
258258
return msg;
259259
}
260260

261-
protected <T> Consumer<Promise<Void>> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
261+
protected <T> Consumer<Promise<Void>> addRegistration(String address, HandlerRegistration<T> registration, boolean broadcast, boolean localOnly, Promise<Void> promise) {
262262
HandlerHolder<T> holder = addLocalRegistration(address, registration, localOnly);
263-
if (!replyHandler) {
263+
if (broadcast) {
264264
onLocalRegistration(holder, promise);
265265
} else {
266266
if (promise != null) {
267267
promise.complete();
268268
}
269269
}
270270
return p -> {
271-
removeRegistration(holder, replyHandler, p);
271+
removeRegistration(holder, broadcast, p);
272272
};
273273
}
274274

@@ -303,9 +303,9 @@ protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> regist
303303
return new HandlerHolder<>(registration, localOnly, context);
304304
}
305305

306-
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, boolean replyHandler, Promise<Void> promise) {
306+
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, boolean broadcast, Promise<Void> promise) {
307307
removeLocalRegistration(handlerHolder);
308-
if (!replyHandler) {
308+
if (broadcast) {
309309
onLocalUnregistration(handlerHolder, promise);
310310
} else {
311311
promise.complete();

src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ void receive(MessageImpl msg) {
5858

5959
protected abstract void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);
6060

61-
synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
61+
synchronized void register(boolean broadcast, boolean localOnly, Promise<Void> promise) {
6262
if (registered != null) {
6363
throw new IllegalStateException();
6464
}
65-
registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
65+
registered = bus.addRegistration(address, this, broadcast, localOnly, promise);
6666
if (bus.metrics != null) {
67-
metric = bus.metrics.handlerRegistered(address, repliedAddress);
67+
metric = bus.metrics.handlerRegistered(address, null /* regression */);
6868
}
6969
}
7070

src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
216216
registered = true;
217217
Promise<Void> p = result;
218218
Promise<Void> registration = context.promise();
219-
register(null, localOnly, registration);
219+
register(true, localOnly, registration);
220220
registration.future().onComplete(ar -> {
221221
if (ar.succeeded()) {
222222
p.tryComplete();

src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected boolean doReceive(Message<T> reply) {
8383
}
8484

8585
void register() {
86-
register(repliedAddress, false, null);
86+
register(false, false, null);
8787
}
8888

8989
@Override

src/test/java/io/vertx/core/spi/metrics/MetricsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ public void testHandlerMetricReply() throws Exception {
429429
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
430430
assertWaitUntil(() -> metrics.getRegistrations().size() == 2);
431431
HandlerMetric registration = metrics.getRegistrations().get(1);
432-
assertEquals(ADDRESS1, registration.repliedAddress);
432+
assertEquals(null, registration.repliedAddress); // new behavior
433433
assertEquals(0, registration.scheduleCount.get());
434434
assertEquals(0, registration.deliveredCount.get());
435435
assertEquals(0, registration.localDeliveredCount.get());
@@ -443,13 +443,13 @@ public void testHandlerMetricReply() throws Exception {
443443
vertx.eventBus().request(ADDRESS1, "ping").onComplete(onSuccess(reply -> {
444444
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
445445
HandlerMetric registration = replyRegistration.get();
446-
assertEquals(ADDRESS1, registration.repliedAddress);
446+
assertEquals(null, registration.repliedAddress);
447447
assertEquals(1, registration.scheduleCount.get());
448448
assertEquals(1, registration.deliveredCount.get());
449449
assertEquals(1, registration.localDeliveredCount.get());
450450
vertx.runOnContext(v -> {
451451
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
452-
assertEquals(ADDRESS1, registration.repliedAddress);
452+
assertEquals(null, registration.repliedAddress);
453453
assertEquals(1, registration.scheduleCount.get());
454454
assertEquals(1, registration.deliveredCount.get());
455455
assertEquals(1, registration.localDeliveredCount.get());

0 commit comments

Comments
 (0)