Skip to content
5 changes: 4 additions & 1 deletion src/main/java/io/vertx/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
Expand Down Expand Up @@ -199,6 +198,10 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
*/
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);

Future<Void> bindStream(String address, Handler<MessageStream> handler);

Future<MessageStream> connectStream(String address);

/**
* Register a message codec.
* <p>
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;

@VertxGen
public interface MessageStream {

void handler(Handler<Message<String>> handler);

void endHandler(Handler<Void> handler);

void write(String msg);

void end();

}
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/ClientStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;

class ClientStream extends StreamBase {

private final Promise<MessageStream> promise2;

public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise<MessageStream> promise2) {
super(sourceAddress, ctx, eventBus, sourceAddress, true);
this.promise2 = promise2;
}

@Override
protected boolean doReceive(Frame frame) {
if (frame instanceof SynFrame) {
SynFrame syn = (SynFrame) frame;
remoteAddress = syn.src;
promise2.complete(this);
return true;
} else {
return super.doReceive(frame);
}
}
}
153 changes: 104 additions & 49 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -116,15 +117,15 @@ public EventBus send(String address, Object message) {
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
sendOrPubInternal(msg, options, null, null);
sendOrPubInternal(msg, options, null);
return this;
}

@Override
public <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options) {
MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
ReplyHandler<T> handler = createReplyHandler(msg, true, options);
sendOrPubInternal(msg, options, handler, null);
sendOrPubInternal(msg, options, handler);
return handler.result();
}

Expand Down Expand Up @@ -154,14 +155,39 @@ public <T> MessageProducer<T> publisher(String address, DeliveryOptions options)
return new MessageProducerImpl<>(vertx, address, false, options);
}

@Override
public Future<Void> bindStream(String address, Handler<MessageStream> handler) {
ContextInternal ctx = vertx.getOrCreateContext();
HandlerRegistration reg = new StreamServer(this, ctx, address, handler);
Promise<Void> promise = ctx.promise();
reg.register(true, false, promise);
return promise.future();
}

@Override
public Future<MessageStream> connectStream(String address) {
ContextInternal ctx = vertx.getOrCreateContext();
String sourceAddress = generateReplyAddress();
Promise<MessageStream> promise2 = ctx.promise();
StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2);
Promise<Void> promise = ctx.promise();
reg.register(false, false, promise);
promise.future().onComplete(ar -> {
if (ar.succeeded()) {
sendLocally(new SynFrame(sourceAddress, address), ctx.promise());
}
});
return promise2.future();
}

@Override
public EventBus publish(String address, Object message) {
return publish(address, message, new DeliveryOptions());
}

@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null);
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
return this;
}

Expand Down Expand Up @@ -257,10 +283,18 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers,
return msg;
}

protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
HandlerHolder<T> holder = addLocalRegistration(address, registration, replyHandler, localOnly);
onLocalRegistration(holder, promise);
return holder;
protected <T> Consumer<Promise<Void>> addRegistration(String address, HandlerRegistration<T> registration, boolean broadcast, boolean localOnly, Promise<Void> promise) {
HandlerHolder<T> holder = addLocalRegistration(address, registration, localOnly);
if (broadcast) {
onLocalRegistration(holder, promise);
} else {
if (promise != null) {
promise.complete();
}
}
return p -> {
removeRegistration(holder, broadcast, p);
};
}

protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
Expand All @@ -270,12 +304,12 @@ protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<V
}

private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
boolean localOnly) {
Objects.requireNonNull(address, "address");

ContextInternal context = registration.context;

HandlerHolder<T> holder = createHandlerHolder(registration, replyHandler, localOnly, context);
HandlerHolder<T> holder = createHandlerHolder(registration, localOnly, context);

ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
Expand All @@ -290,13 +324,17 @@ private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistr
return holder;
}

protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, ContextInternal context) {
return new HandlerHolder<>(registration, replyHandler, localOnly, context);
protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean localOnly, ContextInternal context) {
return new HandlerHolder<>(registration, localOnly, context);
}

protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, boolean broadcast, Promise<Void> promise) {
removeLocalRegistration(handlerHolder);
onLocalUnregistration(handlerHolder, promise);
if (broadcast) {
onLocalUnregistration(handlerHolder, promise);
} else {
promise.complete();
}
}

protected <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
Expand All @@ -321,59 +359,74 @@ protected <T> void sendReply(MessageImpl replyMessage, DeliveryOptions options,
if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler, null));
sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler));
}
}

protected <T> void sendOrPub(ContextInternal ctx, MessageImpl<?, T> message, DeliveryOptions options, Promise<Void> writePromise) {
sendLocally(message, writePromise);
}

protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
sendLocally(sendContext);
sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext);
}

private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
ReplyException failure = deliverMessageLocally(sendContext.message);
protected <T> void sendLocally(Frame message, Promise<Void> writePromise) {
ReplyException failure = deliverMessageLocally(message);
if (failure != null) {
sendContext.written(failure);
writePromise.tryFail(failure);
} else {
sendContext.written(null);
writePromise.tryComplete();
}
}

protected boolean isMessageLocal(MessageImpl msg) {
protected boolean isMessageLocal(Frame msg) {
return true;
}

protected ReplyException deliverMessageLocally(MessageImpl msg) {
ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
boolean messageLocal = isMessageLocal(msg);
if (handlers != null) {
if (msg.isSend()) {
//Choose one
HandlerHolder holder = nextHandler(handlers, messageLocal);
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0);
}
if (holder != null) {
holder.handler.receive(msg.copyBeforeReceive());
protected ReplyException deliverMessageLocally(Frame frame) {
ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(frame.address());
boolean messageLocal = isMessageLocal(frame);
if (frame instanceof MessageImpl) {
MessageImpl msg = (MessageImpl) frame;
if (handlers != null) {
if (msg.isSend()) {
//Choose one
HandlerHolder holder = nextHandler(handlers, messageLocal);
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0);
}
if (holder != null) {
holder.handler.receive(msg.copyBeforeReceive());
} else {
// RACY issue !!!!!
}
} else {
// RACY issue !!!!!
// Publish
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size());
}
for (HandlerHolder holder: handlers) {
if (messageLocal || !holder.isLocalOnly()) {
holder.handler.receive(msg.copyBeforeReceive());
}
}
}
return null;
} else {
// Publish
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size());
}
for (HandlerHolder holder: handlers) {
if (messageLocal || !holder.isLocalOnly()) {
holder.handler.receive(msg.copyBeforeReceive());
}
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0);
}
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
}
return null;
} else {
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0);
if (handlers != null) {
HandlerHolder holder = nextHandler(handlers, messageLocal);
holder.handler.receive(frame);
return null;
} else {
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + frame.address());
}
return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
}
}

Expand Down Expand Up @@ -403,8 +456,8 @@ <T> ReplyHandler<T> createReplyHandler(MessageImpl message,
}

public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler, writePromise);
ReplyHandler<T> handler) {
return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler);
}

public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
Expand All @@ -414,10 +467,12 @@ public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
senderCtx.next();
}

public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler, Promise<Void> writePromise) {
public <T> Future<Void> sendOrPubInternal(MessageImpl message, DeliveryOptions options,
ReplyHandler<T> handler) {
checkStarted();
sendOrPubInternal(newSendContext(message, options, handler, writePromise));
OutboundDeliveryContext<T> ctx = newSendContext(message, options, handler);
sendOrPubInternal(ctx);
return ctx.writePromise.future();
}

private Future<Void> unregisterAll() {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/FinFrame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.buffer.Buffer;

class FinFrame implements Frame {

final String addr;

public FinFrame(String addr) {
this.addr = addr;
}

@Override
public String address() {
return addr;
}

@Override
public Buffer encodeToWire() {
throw new UnsupportedOperationException();
}

@Override
public boolean isFromWire() {
return false;
}
}
23 changes: 23 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/Frame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus.impl;

import io.vertx.core.buffer.Buffer;

public interface Frame {

String address();

Buffer encodeToWire();

boolean isFromWire();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an Enum FrameSource?


}
Loading