Skip to content

Commit 54ba6db

Browse files
committed
Basic stream impl
1 parent 11e9b67 commit 54ba6db

File tree

13 files changed

+413
-4
lines changed

13 files changed

+413
-4
lines changed

src/main/java/io/vertx/core/eventbus/EventBus.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import io.vertx.codegen.annotations.GenIgnore;
1616
import io.vertx.codegen.annotations.Nullable;
1717
import io.vertx.codegen.annotations.VertxGen;
18-
import io.vertx.core.AsyncResult;
1918
import io.vertx.core.Future;
2019
import io.vertx.core.Handler;
2120
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
@@ -199,6 +198,10 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
199198
*/
200199
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);
201200

201+
Future<Void> bindStream(String address, Handler<MessageStream> handler);
202+
203+
Future<MessageStream> connectStream(String address);
204+
202205
/**
203206
* Register a message codec.
204207
* <p>
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.core.eventbus;
12+
13+
import io.vertx.codegen.annotations.VertxGen;
14+
import io.vertx.core.Handler;
15+
16+
@VertxGen
17+
public interface MessageStream {
18+
19+
void handler(Handler<Message<String>> handler);
20+
21+
void endHandler(Handler<Void> handler);
22+
23+
void write(String msg);
24+
25+
void end();
26+
27+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.vertx.core.eventbus.impl;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.Promise;
5+
import io.vertx.core.eventbus.Message;
6+
import io.vertx.core.eventbus.MessageStream;
7+
import io.vertx.core.impl.ContextInternal;
8+
9+
import java.util.concurrent.TimeoutException;
10+
11+
class ClientStream extends StreamBase implements Handler<Long> {
12+
13+
private final Promise<MessageStream> promise2;
14+
private final long timeoutID;
15+
16+
public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise<MessageStream> promise2) {
17+
super(sourceAddress, ctx, eventBus, sourceAddress, true);
18+
this.promise2 = promise2;
19+
this.timeoutID = ctx.setTimer(3_000, this);
20+
}
21+
22+
@Override
23+
public void handle(Long event) {
24+
unregister();
25+
promise2.fail(new TimeoutException());
26+
}
27+
28+
@Override
29+
protected boolean doReceive(Message msg) {
30+
if (msg.body() instanceof SynFrame) {
31+
if (context.owner().cancelTimer(timeoutID)) {
32+
base = (MessageImpl) msg;
33+
SynFrame syn = (SynFrame) msg.body();
34+
remoteAddress = syn.src;
35+
promise2.complete(this);
36+
}
37+
return true;
38+
} else {
39+
if (base != null) {
40+
return super.doReceive(msg);
41+
} else {
42+
if (context.owner().cancelTimer(timeoutID)) {
43+
unregister();
44+
promise2.fail(new IllegalStateException());
45+
}
46+
return true;
47+
}
48+
}
49+
}
50+
}

src/main/java/io/vertx/core/eventbus/impl/CodecManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public CodecManager() {
6363
this.systemCodecs = codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC,
6464
BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC,
6565
BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC,
66-
clusterSerializableCodec, serializableCodec);
66+
clusterSerializableCodec, serializableCodec, SynFrame.CODEC, FinFrame.CODEC);
6767
}
6868

6969
public MessageCodec lookupCodec(Object body, String codecName, boolean local) {
@@ -98,6 +98,10 @@ public MessageCodec lookupCodec(Object body, String codecName, boolean local) {
9898
codec = CHAR_MESSAGE_CODEC;
9999
} else if (body instanceof Byte) {
100100
codec = BYTE_MESSAGE_CODEC;
101+
} else if (body instanceof SynFrame) {
102+
codec = SynFrame.CODEC;
103+
} else if (body instanceof FinFrame) {
104+
codec = FinFrame.CODEC;
101105
} else if (body instanceof ReplyException) {
102106
codec = defaultCodecMap.get(body.getClass());
103107
if (codec == null) {

src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,33 @@ public <T> MessageProducer<T> publisher(String address, DeliveryOptions options)
155155
return new MessageProducerImpl<>(vertx, address, false, options);
156156
}
157157

158+
@Override
159+
public Future<Void> bindStream(String address, Handler<MessageStream> handler) {
160+
ContextInternal ctx = vertx.getOrCreateContext();
161+
HandlerRegistration reg = new StreamServer(this, ctx, address, handler);
162+
Promise<Void> promise = ctx.promise();
163+
reg.register(true, false, promise);
164+
return promise.future();
165+
}
166+
167+
@Override
168+
public Future<MessageStream> connectStream(String address) {
169+
ContextInternal ctx = vertx.getOrCreateContext();
170+
String sourceAddress = generateReplyAddress();
171+
Promise<MessageStream> promise2 = ctx.promise();
172+
StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2);
173+
Promise<Void> promise = ctx.promise();
174+
reg.register(false, false, promise);
175+
promise.future().onComplete(ar -> {
176+
if (ar.succeeded()) {
177+
MessageImpl msg = createMessage(true, address, MultiMap.caseInsensitiveMultiMap(), new SynFrame(sourceAddress, address), null);
178+
msg.setReplyAddress(sourceAddress);
179+
sendOrPub(ctx, msg, new DeliveryOptions(), ctx.promise());
180+
}
181+
});
182+
return promise2.future();
183+
}
184+
158185
@Override
159186
public EventBus publish(String address, Object message) {
160187
return publish(address, message, new DeliveryOptions());
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.vertx.core.eventbus.impl;
2+
3+
import io.netty.util.CharsetUtil;
4+
import io.vertx.core.buffer.Buffer;
5+
import io.vertx.core.eventbus.MessageCodec;
6+
7+
class FinFrame implements Frame {
8+
9+
public static final MessageCodec<FinFrame, FinFrame> CODEC = new MessageCodec<>() {
10+
@Override
11+
public void encodeToWire(Buffer buffer, FinFrame synFrame) {
12+
byte[] strBytes = synFrame.addr.getBytes(CharsetUtil.UTF_8);
13+
buffer.appendInt(strBytes.length);
14+
buffer.appendBytes(strBytes);
15+
}
16+
17+
@Override
18+
public FinFrame decodeFromWire(int pos, Buffer buffer) {
19+
int length = buffer.getInt(pos);
20+
pos += 4;
21+
byte[] bytes = buffer.getBytes(pos, pos + length);
22+
String src = new String(bytes, CharsetUtil.UTF_8);
23+
return new FinFrame(src);
24+
}
25+
26+
@Override
27+
public FinFrame transform(FinFrame finFrame) {
28+
return finFrame;
29+
}
30+
31+
@Override
32+
public String name() {
33+
return "frame.fin";
34+
}
35+
36+
@Override
37+
public byte systemCodecID() {
38+
return 19;
39+
}
40+
};
41+
42+
final String addr;
43+
44+
public FinFrame(String addr) {
45+
this.addr = addr;
46+
}
47+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.vertx.core.eventbus.impl;
2+
3+
public interface Frame {
4+
}

src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions op
118118
}
119119
}
120120

121-
protected MessageImpl createReply(Object message, DeliveryOptions options) {
121+
public MessageImpl createReply(Object message, DeliveryOptions options) {
122122
MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
123123
reply.trace = trace;
124124
return reply;
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.vertx.core.eventbus.impl;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.MultiMap;
5+
import io.vertx.core.eventbus.DeliveryOptions;
6+
import io.vertx.core.eventbus.Message;
7+
import io.vertx.core.eventbus.MessageStream;
8+
import io.vertx.core.impl.ContextInternal;
9+
10+
class StreamBase extends HandlerRegistration implements MessageStream {
11+
12+
MessageImpl base;
13+
private Handler<Message<String>> handler;
14+
private Handler<Void> endHandler;
15+
final String localAddress;
16+
String remoteAddress;
17+
private boolean halfClosed;
18+
19+
StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) {
20+
super(context, bus, address, src);
21+
this.localAddress = localAddress;
22+
}
23+
24+
@Override
25+
protected boolean doReceive(Message msg) {
26+
if (msg.body() instanceof FinFrame) {
27+
Handler<Void> h = endHandler;
28+
if (h != null) {
29+
h.handle(null);
30+
}
31+
if (halfClosed) {
32+
unregister();
33+
} else {
34+
halfClosed = true;
35+
}
36+
return true;
37+
} else {
38+
Handler<Message<String>> h = handler;
39+
if (h != null) {
40+
h.handle(msg);
41+
}
42+
return true;
43+
}
44+
}
45+
46+
@Override
47+
protected void dispatch(Message msg, ContextInternal context, Handler handler) {
48+
49+
}
50+
51+
@Override
52+
public void handler(Handler<Message<String>> handler) {
53+
this.handler = handler;
54+
}
55+
56+
@Override
57+
public void endHandler(Handler<Void> handler) {
58+
this.endHandler = handler;
59+
}
60+
61+
@Override
62+
public void write(String body) {
63+
MessageImpl msg = base.createReply(body, new DeliveryOptions());
64+
bus.sendOrPub(context, msg, new DeliveryOptions(), context.promise());
65+
}
66+
67+
@Override
68+
public void end() {
69+
MessageImpl msg = base.createReply(new FinFrame(remoteAddress), new DeliveryOptions());
70+
bus.sendOrPub(context, msg, new DeliveryOptions(), context.promise());
71+
if (halfClosed) {
72+
unregister();
73+
} else {
74+
halfClosed = true;
75+
}
76+
}
77+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.vertx.core.eventbus.impl;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.MultiMap;
5+
import io.vertx.core.eventbus.DeliveryOptions;
6+
import io.vertx.core.eventbus.Message;
7+
import io.vertx.core.eventbus.MessageStream;
8+
import io.vertx.core.impl.ContextInternal;
9+
import io.vertx.core.impl.future.PromiseInternal;
10+
11+
class StreamServer extends HandlerRegistration {
12+
private final EventBusImpl eventBus;
13+
private final Handler<MessageStream> handler;
14+
15+
public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler<MessageStream> handler) {
16+
super(ctx, eventBus, address, false);
17+
this.eventBus = eventBus;
18+
this.handler = handler;
19+
}
20+
21+
@Override
22+
protected boolean doReceive(Message msg) {
23+
if (msg.body() instanceof SynFrame) {
24+
SynFrame syn = (SynFrame) msg.body();
25+
String localAddress = eventBus.generateReplyAddress();
26+
StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false);
27+
ss.remoteAddress = syn.src;
28+
ss.base = (MessageImpl) msg;
29+
PromiseInternal<Void> p = context.promise();
30+
ss.register(false, false, p);
31+
p.onComplete(ar -> {
32+
if (ar.succeeded()) {
33+
MessageImpl reply = ((MessageImpl)msg).createReply(new SynFrame(localAddress, syn.src), new DeliveryOptions());
34+
reply.setReplyAddress(localAddress);
35+
eventBus.sendOrPub(context, reply, new DeliveryOptions(), context.promise());
36+
handler.handle(ss);
37+
}
38+
});
39+
}
40+
return true;
41+
}
42+
43+
@Override
44+
protected void dispatch(Message msg, ContextInternal context, Handler handler) {
45+
}
46+
}

0 commit comments

Comments
 (0)