diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 9b95ab0b4c2..d9194e559b2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -116,7 +117,7 @@ public EventBus send(String address, Object message) { @Override public EventBus send(String address, Object message, DeliveryOptions options) { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); - sendOrPubInternal(msg, options, null, null); + sendOrPubInternal(msg, options, null); return this; } @@ -124,7 +125,7 @@ public EventBus send(String address, Object message, DeliveryOptions options) { public Future> request(String address, Object message, DeliveryOptions options) { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); ReplyHandler handler = createReplyHandler(msg, true, options); - sendOrPubInternal(msg, options, handler, null); + sendOrPubInternal(msg, options, handler); return handler.result(); } @@ -161,7 +162,7 @@ public EventBus publish(String address, Object message) { @Override public EventBus publish(String address, Object message, DeliveryOptions options) { - sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null); + sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null); return this; } @@ -257,10 +258,18 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, return msg; } - protected HandlerHolder addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { - HandlerHolder holder = addLocalRegistration(address, registration, replyHandler, localOnly); - onLocalRegistration(holder, promise); - return holder; + protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean broadcast, boolean localOnly, Promise promise) { + HandlerHolder holder = addLocalRegistration(address, registration, localOnly); + if (broadcast) { + onLocalRegistration(holder, promise); + } else { + if (promise != null) { + promise.complete(); + } + } + return p -> { + removeRegistration(holder, broadcast, p); + }; } protected void onLocalRegistration(HandlerHolder handlerHolder, Promise promise) { @@ -270,12 +279,12 @@ protected void onLocalRegistration(HandlerHolder handlerHolder, Promise HandlerHolder addLocalRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { + boolean localOnly) { Objects.requireNonNull(address, "address"); ContextInternal context = registration.context; - HandlerHolder holder = createHandlerHolder(registration, replyHandler, localOnly, context); + HandlerHolder holder = createHandlerHolder(registration, localOnly, context); ConcurrentCyclicSequence handlers = new ConcurrentCyclicSequence().add(holder); ConcurrentCyclicSequence actualHandlers = handlerMap.merge( @@ -290,13 +299,17 @@ private HandlerHolder addLocalRegistration(String address, HandlerRegistr return holder; } - protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean replyHandler, boolean localOnly, ContextInternal context) { - return new HandlerHolder<>(registration, replyHandler, localOnly, context); + protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean localOnly, ContextInternal context) { + return new HandlerHolder<>(registration, localOnly, context); } - protected void removeRegistration(HandlerHolder handlerHolder, Promise promise) { + protected void removeRegistration(HandlerHolder handlerHolder, boolean broadcast, Promise promise) { removeLocalRegistration(handlerHolder); - onLocalUnregistration(handlerHolder, promise); + if (broadcast) { + onLocalUnregistration(handlerHolder, promise); + } else { + promise.complete(); + } } protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise promise) { @@ -321,20 +334,24 @@ protected void sendReply(MessageImpl replyMessage, DeliveryOptions options, if (replyMessage.address() == null) { throw new IllegalStateException("address not specified"); } else { - sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler, null)); + sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler)); } } + protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { + sendLocally(message, writePromise); + } + protected void sendOrPub(OutboundDeliveryContext sendContext) { - sendLocally(sendContext); + sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext); } - private void sendLocally(OutboundDeliveryContext sendContext) { - ReplyException failure = deliverMessageLocally(sendContext.message); + protected void sendLocally(MessageImpl message, Promise writePromise) { + ReplyException failure = deliverMessageLocally(message); if (failure != null) { - sendContext.written(failure); + writePromise.tryFail(failure); } else { - sendContext.written(null); + writePromise.tryComplete(); } } @@ -403,8 +420,8 @@ ReplyHandler createReplyHandler(MessageImpl message, } public OutboundDeliveryContext newSendContext(MessageImpl message, DeliveryOptions options, - ReplyHandler handler, Promise writePromise) { - return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler, writePromise); + ReplyHandler handler) { + return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler); } public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { @@ -414,10 +431,12 @@ public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { senderCtx.next(); } - public void sendOrPubInternal(MessageImpl message, DeliveryOptions options, - ReplyHandler handler, Promise writePromise) { + public Future sendOrPubInternal(MessageImpl message, DeliveryOptions options, + ReplyHandler handler) { checkStarted(); - sendOrPubInternal(newSendContext(message, options, handler, writePromise)); + OutboundDeliveryContext ctx = newSendContext(message, options, handler); + sendOrPubInternal(ctx); + return ctx.writePromise.future(); } private Future unregisterAll() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java index d1135b2b2f0..5a0e32c1497 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java @@ -22,14 +22,12 @@ public class HandlerHolder { public final ContextInternal context; public final HandlerRegistration handler; - public final boolean replyHandler; public final boolean localOnly; private boolean removed; - public HandlerHolder(HandlerRegistration handler, boolean replyHandler, boolean localOnly, ContextInternal context) { + public HandlerHolder(HandlerRegistration handler, boolean localOnly, ContextInternal context) { this.context = context; this.handler = handler; - this.replyHandler = replyHandler; this.localOnly = localOnly; } @@ -76,10 +74,6 @@ public HandlerRegistration getHandler() { return handler; } - public boolean isReplyHandler() { - return replyHandler; - } - public boolean isLocalOnly() { return localOnly; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index a1c15ea6c6d..11b1e30ff7d 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -20,13 +20,15 @@ import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingPolicy; +import java.util.function.Consumer; + public abstract class HandlerRegistration implements Closeable { public final ContextInternal context; public final EventBusImpl bus; public final String address; public final boolean src; - private HandlerHolder registered; + private Consumer> registered; private Object metric; HandlerRegistration(ContextInternal context, @@ -56,13 +58,13 @@ void receive(MessageImpl msg) { protected abstract void dispatch(Message msg, ContextInternal context, Handler> handler); - synchronized void register(String repliedAddress, boolean localOnly, Promise promise) { + synchronized void register(boolean broadcast, boolean localOnly, Promise promise) { if (registered != null) { throw new IllegalStateException(); } - registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise); + registered = bus.addRegistration(address, this, broadcast, localOnly, promise); if (bus.metrics != null) { - metric = bus.metrics.handlerRegistered(address, repliedAddress); + metric = bus.metrics.handlerRegistered(address); } } @@ -74,7 +76,7 @@ public Future unregister() { Promise promise = context.promise(); synchronized (this) { if (registered != null) { - bus.removeRegistration(registered, promise); + registered.accept(promise); registered = null; if (bus.metrics != null) { bus.metrics.handlerUnregistered(metric); diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 776cbe0800d..0c9d0429ca5 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -216,7 +216,7 @@ public synchronized MessageConsumer handler(Handler> h) { registered = true; Promise p = result; Promise registration = context.promise(); - register(null, localOnly, registration); + register(true, localOnly, registration); registration.future().onComplete(ar -> { if (ar.succeeded()) { p.tryComplete(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java index 41d4bb2be81..18f02b646f6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java @@ -45,14 +45,8 @@ public synchronized MessageProducer deliveryOptions(DeliveryOptions options) @Override public Future write(T body) { - Promise promise = ((VertxInternal)vertx).getOrCreateContext().promise(); - write(body, promise); - return promise.future(); - } - - private void write(T data, Promise handler) { - MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName()); - bus.sendOrPubInternal(msg, options, null, handler); + MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), body, options.getCodecName()); + return bus.sendOrPubInternal(msg, options, null); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index 657ae791543..29c26f40803 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -11,6 +11,7 @@ package io.vertx.core.eventbus.impl; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; @@ -25,31 +26,43 @@ import java.util.function.BiConsumer; -public class OutboundDeliveryContext extends DeliveryContextBase implements Handler> { +public class OutboundDeliveryContext extends DeliveryContextBase implements Promise { public final ContextInternal ctx; public final DeliveryOptions options; public final ReplyHandler replyHandler; - private final Promise writePromise; + public final Promise writePromise; private boolean src; EventBusImpl bus; EventBusMetrics metrics; - OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler, Promise writePromise) { + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler) { super(message, message.bus.outboundInterceptors(), ctx); this.ctx = ctx; this.options = options; this.replyHandler = replyHandler; - this.writePromise = writePromise; + this.writePromise = ctx.promise(); } @Override - public void handle(AsyncResult event) { - written(event.cause()); + public boolean tryComplete(Void result) { + written(null); + return true; } - public void written(Throwable failure) { + @Override + public boolean tryFail(Throwable cause) { + written(cause); + return false; + } + + @Override + public Future future() { + throw new UnsupportedOperationException(); + } + + private void written(Throwable failure) { // Metrics if (metrics != null) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index 649668090e1..2ce310eedde 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -83,7 +83,7 @@ protected boolean doReceive(Message reply) { } void register() { - register(repliedAddress, true, null); + register(false, false, null); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index c8acb3855a4..03b3284c005 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -11,12 +11,10 @@ package io.vertx.core.eventbus.impl.clustered; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.Promise; -import io.vertx.core.VertxOptions; +import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.AddressHelper; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBusOptions; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.impl.CodecManager; @@ -24,7 +22,6 @@ import io.vertx.core.eventbus.impl.HandlerHolder; import io.vertx.core.eventbus.impl.HandlerRegistration; import io.vertx.core.eventbus.impl.MessageImpl; -import io.vertx.core.eventbus.impl.OutboundDeliveryContext; import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; @@ -158,76 +155,68 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, @Override protected void onLocalRegistration(HandlerHolder handlerHolder, Promise promise) { - if (!handlerHolder.isReplyHandler()) { - RegistrationInfo registrationInfo = new RegistrationInfo( - nodeId, - handlerHolder.getSeq(), - handlerHolder.isLocalOnly() - ); - clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise)); - } else if (promise != null) { - promise.complete(); - } + RegistrationInfo registrationInfo = new RegistrationInfo( + nodeId, + handlerHolder.getSeq(), + handlerHolder.isLocalOnly() + ); + clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise)); } @Override - protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean replyHandler, boolean localOnly, ContextInternal context) { - return new ClusteredHandlerHolder<>(registration, replyHandler, localOnly, context, handlerSequence.getAndIncrement()); + protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean localOnly, ContextInternal context) { + return new ClusteredHandlerHolder<>(registration, localOnly, context, handlerSequence.getAndIncrement()); } @Override protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise completionHandler) { - if (!handlerHolder.isReplyHandler()) { - RegistrationInfo registrationInfo = new RegistrationInfo( - nodeId, - handlerHolder.getSeq(), - handlerHolder.isLocalOnly() - ); - Promise promise = Promise.promise(); - clusterManager.removeRegistration(handlerHolder.getHandler().address, registrationInfo, promise); - promise.future().onComplete(completionHandler); - } else { - completionHandler.complete(); - } + RegistrationInfo registrationInfo = new RegistrationInfo( + nodeId, + handlerHolder.getSeq(), + handlerHolder.isLocalOnly() + ); + Promise promise = Promise.promise(); + clusterManager.removeRegistration(handlerHolder.getHandler().address, registrationInfo, promise); + promise.future().onComplete(completionHandler); } @Override - protected void sendOrPub(OutboundDeliveryContext sendContext) { - if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) { - clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext); - } else if (sendContext.options.isLocalOnly()) { - super.sendOrPub(sendContext); + protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { + if (((ClusteredMessage) message).getRepliedTo() != null) { + clusteredSendReply(message, writePromise, ((ClusteredMessage) message).getRepliedTo()); + } else if (options.isLocalOnly()) { + sendLocally(message, writePromise); } else { - Serializer serializer = Serializer.get(sendContext.ctx); - if (sendContext.message.isSend()) { - Promise promise = sendContext.ctx.promise(); - serializer.queue(sendContext.message, nodeSelector::selectForSend, promise); + Serializer serializer = Serializer.get(ctx); + if (message.isSend()) { + Promise promise = ctx.promise(); + serializer.queue(message, nodeSelector::selectForSend, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNode(sendContext, ar.result()); + sendToNode(ar.result(), message, writePromise); } else { - sendOrPublishFailed(sendContext, ar.cause()); + sendOrPublishFailed(writePromise, ar.cause()); } }); } else { - Promise> promise = sendContext.ctx.promise(); - serializer.queue(sendContext.message, nodeSelector::selectForPublish, promise); + Promise> promise = ctx.promise(); + serializer.queue(message, nodeSelector::selectForPublish, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNodes(sendContext, ar.result()); + sendToNodes(ar.result(), message, writePromise); } else { - sendOrPublishFailed(sendContext, ar.cause()); + sendOrPublishFailed(writePromise, ar.cause()); } }); } } } - private void sendOrPublishFailed(OutboundDeliveryContext sendContext, Throwable cause) { + private void sendOrPublishFailed(Promise promise, Throwable cause) { if (log.isDebugEnabled()) { log.error("Failed to send message", cause); } - sendContext.written(cause); + promise.tryFail(cause); } @Override @@ -251,7 +240,7 @@ protected HandlerHolder nextHandler(ConcurrentCyclicSequence hand Iterator iterator = handlers.iterator(false); while (iterator.hasNext()) { HandlerHolder next = iterator.next(); - if (next.isReplyHandler() || !next.isLocalOnly()) { + if (!next.isLocalOnly()) { handlerHolder = next; break; } @@ -328,39 +317,39 @@ public void handle(Buffer buff) { }; } - private void sendToNode(OutboundDeliveryContext sendContext, String nodeId) { + private void sendToNode(String nodeId, MessageImpl message, Promise writePromise) { if (nodeId != null && !nodeId.equals(this.nodeId)) { - sendRemote(sendContext, nodeId, sendContext.message); + sendRemote(nodeId, message, writePromise); } else { - super.sendOrPub(sendContext); + sendLocally(message, writePromise); } } - private void sendToNodes(OutboundDeliveryContext sendContext, Iterable nodeIds) { + private void sendToNodes(Iterable nodeIds, MessageImpl message, Promise writePromise) { boolean sentRemote = false; if (nodeIds != null) { for (String nid : nodeIds) { if (!sentRemote) { sentRemote = true; } - sendToNode(sendContext, nid); + // Write promise might be completed several times!!!! + sendToNode(nid, message, writePromise); } } if (!sentRemote) { - super.sendOrPub(sendContext); + sendLocally(message, writePromise); } } - private void clusteredSendReply(String replyDest, OutboundDeliveryContext sendContext) { - MessageImpl message = sendContext.message; + private void clusteredSendReply(MessageImpl message, Promise writePromise, String replyDest) { if (!replyDest.equals(nodeId)) { - sendRemote(sendContext, replyDest, message); + sendRemote(replyDest, message, writePromise); } else { - super.sendOrPub(sendContext); + sendLocally(message, writePromise); } } - private void sendRemote(OutboundDeliveryContext sendContext, String remoteNodeId, MessageImpl message) { + private void sendRemote(String remoteNodeId, MessageImpl message, Promise writePromise) { // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. @@ -379,7 +368,7 @@ private void sendRemote(OutboundDeliveryContext sendContext, String remoteNod holder.connect(); } } - holder.writeMessage(sendContext); + holder.writeMessage(message, writePromise); } ConcurrentMap connections() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java index 8940eff5184..c5703d86117 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java @@ -19,8 +19,8 @@ public class ClusteredHandlerHolder extends HandlerHolder { private final long seq; - public ClusteredHandlerHolder(HandlerRegistration handler, boolean replyHandler, boolean localOnly, ContextInternal context, long seq) { - super(handler, replyHandler, localOnly, context); + public ClusteredHandlerHolder(HandlerRegistration handler, boolean localOnly, ContextInternal context, long seq) { + super(handler, localOnly, context); this.seq = seq; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 07348e82440..342b0c2e0a3 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -14,7 +14,7 @@ import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBusOptions; -import io.vertx.core.eventbus.impl.OutboundDeliveryContext; +import io.vertx.core.eventbus.impl.MessageImpl; import io.vertx.core.eventbus.impl.codecs.PingMessageCodec; import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.logging.Logger; @@ -41,7 +41,7 @@ class ConnectionHolder { private final VertxInternal vertx; private final EventBusMetrics metrics; - private Queue> pending; + private Queue pendingWrites; private NetSocket socket; private boolean connected; private long timeoutID = -1; @@ -70,21 +70,21 @@ void connect() { } // TODO optimise this (contention on monitor) - synchronized void writeMessage(OutboundDeliveryContext ctx) { + synchronized void writeMessage(MessageImpl message, Promise writePromise) { if (connected) { - Buffer data = ((ClusteredMessage) ctx.message).encodeToWire(); + Buffer data = ((ClusteredMessage) message).encodeToWire(); if (metrics != null) { - metrics.messageWritten(ctx.message.address(), data.length()); + metrics.messageWritten(message.address(), data.length()); } - socket.write(data).onComplete(ctx); + socket.write(data).onComplete(writePromise); } else { - if (pending == null) { + if (pendingWrites == null) { if (log.isDebugEnabled()) { log.debug("Not connected to server " + remoteNodeId + " - starting queuing"); } - pending = new ArrayDeque<>(); + pendingWrites = new ArrayDeque<>(); } - pending.add(ctx); + pendingWrites.add(new MessageWrite(message, writePromise)); } } @@ -100,10 +100,10 @@ private void close(Throwable cause) { vertx.cancelTimer(pingTimeoutID); } synchronized (this) { - OutboundDeliveryContext msg; - if (pending != null) { - while ((msg = pending.poll()) != null) { - msg.written(cause); + MessageWrite msg; + if (pendingWrites != null) { + while ((msg = pendingWrites.poll()) != null) { + msg.writePromise.tryFail(cause); } } } @@ -146,18 +146,27 @@ private synchronized void connected(NetSocket socket) { }); // Start a pinger schedulePing(); - if (pending != null) { + if (pendingWrites != null) { if (log.isDebugEnabled()) { log.debug("Draining the queue for server " + remoteNodeId); } - for (OutboundDeliveryContext ctx : pending) { + for (MessageWrite ctx : pendingWrites) { Buffer data = ((ClusteredMessage)ctx.message).encodeToWire(); if (metrics != null) { metrics.messageWritten(ctx.message.address(), data.length()); } - socket.write(data).onComplete(ctx); + socket.write(data).onComplete(ctx.writePromise); } } - pending = null; + pendingWrites = null; + } + + private static class MessageWrite { + final MessageImpl message; + final Promise writePromise; + MessageWrite(MessageImpl message, Promise writePromise) { + this.message = message; + this.writePromise = writePromise; + } } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java index 145011725b9..08d0e403ae8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java @@ -60,7 +60,7 @@ public static Serializer get(ContextInternal context) { return serializer; } - public void queue(Message message, BiConsumer, Promise> selectHandler, Promise promise) { + public void queue(Message message, BiConsumer> selectHandler, Promise promise) { ctx.emit(v -> { String address = message.address(); SerializerQueue queue = queues.computeIfAbsent(address, SerializerQueue::new); @@ -110,7 +110,7 @@ void checkPending() { } } - void add(Message msg, BiConsumer, Promise> selectHandler, Promise promise) { + void add(Message msg, BiConsumer> selectHandler, Promise promise) { SerializedTask serializedTask = new SerializedTask<>(ctx, msg, selectHandler); Future fut = serializedTask.internalPromise.future(); fut.onComplete(promise); @@ -136,20 +136,20 @@ void close() { private class SerializedTask implements Handler> { final Message msg; - final BiConsumer, Promise> selectHandler; + final BiConsumer> selectHandler; final Promise internalPromise; SerializedTask( ContextInternal context, Message msg, - BiConsumer, Promise> selectHandler) { + BiConsumer> selectHandler) { this.msg = msg; this.selectHandler = selectHandler; this.internalPromise = context.promise(); } void process() { - selectHandler.accept(msg, internalPromise); + selectHandler.accept(msg.address(), internalPromise); } @Override diff --git a/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java b/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java index 062322c24cf..f767a029a6e 100644 --- a/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java +++ b/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java @@ -13,7 +13,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.Message; import io.vertx.core.impl.VertxBuilder; import io.vertx.core.spi.VertxServiceProvider; @@ -47,20 +46,16 @@ default void init(VertxBuilder builder) { * *

The provided {@code promise} needs to be completed with {@link Promise#tryComplete} and {@link Promise#tryFail} * as it might completed outside the selector. - * - * @throws IllegalArgumentException if {@link Message#isSend()} returns {@code false} */ - void selectForSend(Message message, Promise promise); + void selectForSend(String address, Promise promise); /** * Select a node for publishing the given {@code message}. * *

The provided {@code promise} needs to be completed with {@link Promise#tryComplete} and {@link Promise#tryFail} * as it might completed outside the selector. - * - * @throws IllegalArgumentException if {@link Message#isSend()} returns {@code true} */ - void selectForPublish(Message message, Promise> promise); + void selectForPublish(String address, Promise> promise); /** * Invoked by the {@link ClusterManager} when messaging handler registrations are added or removed. diff --git a/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java b/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java index 6198477d340..72007135316 100644 --- a/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java +++ b/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java @@ -13,8 +13,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.Message; -import io.vertx.core.impl.Arguments; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeSelector; import io.vertx.core.spi.cluster.RegistrationUpdateEvent; @@ -37,17 +35,15 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { - Arguments.require(message.isSend(), "selectForSend used for publishing"); - selectors.withSelector(message, promise, (prom, selector) -> { + public void selectForSend(String address, Promise promise) { + selectors.withSelector(address, promise, (prom, selector) -> { prom.tryComplete(selector.selectForSend()); }); } @Override - public void selectForPublish(Message message, Promise> promise) { - Arguments.require(!message.isSend(), "selectForPublish used for sending"); - selectors.withSelector(message, promise, (prom, selector) -> { + public void selectForPublish(String address, Promise> promise) { + selectors.withSelector(address, promise, (prom, selector) -> { prom.tryComplete(selector.selectForPublish()); }); } diff --git a/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java b/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java index 2244e85cc6f..0e5423e0153 100644 --- a/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java +++ b/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java @@ -35,8 +35,7 @@ public Selectors(ClusterManager clusterManager) { this.clusterManager = clusterManager; } - public void withSelector(Message message, Promise promise, BiConsumer, RoundRobinSelector> task) { - String address = message.address(); + public void withSelector(String address, Promise promise, BiConsumer, RoundRobinSelector> task) { SelectorEntry entry = map.compute(address, (addr, curr) -> { return curr == null ? new SelectorEntry() : (curr.isNotReady() ? curr.increment() : curr); }); diff --git a/src/main/java/io/vertx/core/spi/metrics/EventBusMetrics.java b/src/main/java/io/vertx/core/spi/metrics/EventBusMetrics.java index 8f92ddaf042..bd34e0186b7 100644 --- a/src/main/java/io/vertx/core/spi/metrics/EventBusMetrics.java +++ b/src/main/java/io/vertx/core/spi/metrics/EventBusMetrics.java @@ -23,13 +23,12 @@ public interface EventBusMetrics extends Metrics { /** * Called when a handler is registered on the event bus.

- * + *

* No specific thread and context can be expected when this method is called. * * @param address the address used to register the handler - * @param repliedAddress null when the handler is not a reply handler, otherwise the address this handler is replying to */ - default H handlerRegistered(String address, String repliedAddress) { + default H handlerRegistered(String address) { return null; } diff --git a/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java b/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java index 9b1645d09d2..f2762d0db70 100644 --- a/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java +++ b/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java @@ -105,12 +105,12 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { promise.fail("Not implemented"); } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { List nodes = clusterManager.getNodes(); CompositeFuture future = nodes.stream() .map(nodeId -> { diff --git a/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java b/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java index 2cc6bb01db2..990c8264c28 100644 --- a/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java +++ b/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java @@ -89,7 +89,7 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { try { NANOSECONDS.sleep(150); } catch (InterruptedException e) { @@ -99,7 +99,7 @@ public void selectForSend(Message message, Promise promise) { } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { throw new UnsupportedOperationException(); } diff --git a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java index be2b2b02835..92dd56431d3 100644 --- a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java +++ b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java @@ -38,12 +38,12 @@ public void test() { .setPort(0); NodeSelector nodeSelector = new DefaultNodeSelector() { @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { promise.fail(cause); } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { promise.fail("Not implemented"); } }; diff --git a/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java b/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java index 76e2597863f..479d6a98a8e 100644 --- a/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java +++ b/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java @@ -34,13 +34,13 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { - delegate.selectForSend(message, promise); + public void selectForSend(String address, Promise promise) { + delegate.selectForSend(address, promise); } @Override - public void selectForPublish(Message message, Promise> promise) { - delegate.selectForPublish(message, promise); + public void selectForPublish(String address, Promise> promise) { + delegate.selectForPublish(address, promise); } @Override diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java index 22709d5acf3..83f67e728c7 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsContextTest.java @@ -827,13 +827,12 @@ private void testMessageHandler(BiConsumer> runOnContext) { AtomicReference deliveredThread = new AtomicReference<>(); AtomicBoolean registeredCalled = new AtomicBoolean(); AtomicBoolean unregisteredCalled = new AtomicBoolean(); - AtomicBoolean messageDelivered = new AtomicBoolean(); VertxMetricsFactory factory = (options) -> new DummyVertxMetrics() { @Override public EventBusMetrics createEventBusMetrics() { return new DummyEventBusMetrics() { @Override - public Void handlerRegistered(String address, String repliedAddress) { + public Void handlerRegistered(String address) { registeredCalled.set(true); return null; } diff --git a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java index bdfbe0cebec..9670089adbf 100644 --- a/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java +++ b/src/test/java/io/vertx/core/spi/metrics/MetricsTest.java @@ -342,7 +342,6 @@ public void testHandlerRegistration() throws Exception { assertEquals(1, metrics.getRegistrations().size()); HandlerMetric registration = metrics.getRegistrations().get(0); assertEquals(ADDRESS1, registration.address); - assertEquals(null, registration.repliedAddress); consumer.unregister().onComplete(onSuccess(v1 -> { assertEquals(0, metrics.getRegistrations().size()); consumer.unregister().onComplete(onSuccess(v2 -> { @@ -397,7 +396,6 @@ private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCo to.eventBus().consumer(ADDRESS1, msg -> { HandlerMetric registration = assertRegistration(metrics); assertEquals(ADDRESS1, registration.address); - assertEquals(null, registration.repliedAddress); assertEquals(1, registration.scheduleCount.get()); assertEquals(expectedLocalCount, registration.localScheduleCount.get()); assertEquals(1, registration.deliveredCount.get()); @@ -416,7 +414,6 @@ private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCo } HandlerMetric registration = assertRegistration(metrics); assertEquals(ADDRESS1, registration.address); - assertEquals(null, registration.repliedAddress); from.eventBus().request(ADDRESS1, "ping").onComplete(onSuccess(reply -> { assertEquals(1, registration.scheduleCount.get()); // This might take a little time @@ -438,7 +435,6 @@ public void testHandlerMetricReply() throws Exception { assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); assertWaitUntil(() -> metrics.getRegistrations().size() == 2); HandlerMetric registration = metrics.getRegistrations().get(1); - assertEquals(ADDRESS1, registration.repliedAddress); assertEquals(0, registration.scheduleCount.get()); assertEquals(0, registration.deliveredCount.get()); assertEquals(0, registration.localDeliveredCount.get()); @@ -452,13 +448,11 @@ public void testHandlerMetricReply() throws Exception { vertx.eventBus().request(ADDRESS1, "ping").onComplete(onSuccess(reply -> { assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); HandlerMetric registration = replyRegistration.get(); - assertEquals(ADDRESS1, registration.repliedAddress); assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.deliveredCount.get()); assertEquals(1, registration.localDeliveredCount.get()); vertx.runOnContext(v -> { assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); - assertEquals(ADDRESS1, registration.repliedAddress); assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.deliveredCount.get()); assertEquals(1, registration.localDeliveredCount.get()); diff --git a/src/test/java/io/vertx/test/fakemetrics/FakeEventBusMetrics.java b/src/test/java/io/vertx/test/fakemetrics/FakeEventBusMetrics.java index 72285390fc9..9151abe72a1 100644 --- a/src/test/java/io/vertx/test/fakemetrics/FakeEventBusMetrics.java +++ b/src/test/java/io/vertx/test/fakemetrics/FakeEventBusMetrics.java @@ -76,8 +76,8 @@ public int getDecodedBytes(String address) { } @Override - public HandlerMetric handlerRegistered(String address, String repliedAddress) { - HandlerMetric registration = new HandlerMetric(address, repliedAddress); + public HandlerMetric handlerRegistered(String address) { + HandlerMetric registration = new HandlerMetric(address); registrations.add(registration); return registration; } diff --git a/src/test/java/io/vertx/test/fakemetrics/HandlerMetric.java b/src/test/java/io/vertx/test/fakemetrics/HandlerMetric.java index 293b36a69ce..c93f15c3c79 100644 --- a/src/test/java/io/vertx/test/fakemetrics/HandlerMetric.java +++ b/src/test/java/io/vertx/test/fakemetrics/HandlerMetric.java @@ -19,21 +19,19 @@ public class HandlerMetric { public final String address; - public final String repliedAddress; public final AtomicInteger scheduleCount = new AtomicInteger(); public final AtomicInteger localScheduleCount = new AtomicInteger(); public final AtomicInteger discardCount = new AtomicInteger(); public final AtomicInteger deliveredCount = new AtomicInteger(); public final AtomicInteger localDeliveredCount = new AtomicInteger(); - public HandlerMetric(String address, String repliedAddress) { + public HandlerMetric(String address) { this.address = address; - this.repliedAddress = repliedAddress; } @Override public String toString() { - return "HandlerRegistration[address=" + address + ",repliedAddress=" + repliedAddress + + return "HandlerRegistration[address=" + address + ",deliveredCount=" + deliveredCount.get() + ",discardCount=" + discardCount + ",localCount=" + localDeliveredCount.get() + "]"; } }