Skip to content

Commit e3f447d

Browse files
whyvlvaci
authored andcommitted
use java nio AsynchronousByteChannel instead for Serialize
1 parent ee0d727 commit e3f447d

File tree

7 files changed

+43
-122
lines changed

7 files changed

+43
-122
lines changed

runtime-rpc/src/main/java/org/capnproto/EzRpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package org.capnproto;
22

3-
import java.nio.channels.AsynchronousSocketChannel;
3+
import java.nio.channels.AsynchronousByteChannel;
44
import java.util.concurrent.CompletableFuture;
55

66
public class EzRpcClient {
77

88
private final TwoPartyClient twoPartyRpc;
99
private final Capability.Client client;
1010

11-
public EzRpcClient(AsynchronousSocketChannel socket) {
11+
public EzRpcClient(AsynchronousByteChannel socket) {
1212
this.twoPartyRpc = new TwoPartyClient(socket);
1313
this.client = this.twoPartyRpc.bootstrap();
1414
}

runtime-rpc/src/main/java/org/capnproto/EzRpcServer.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5-
import java.nio.channels.AsynchronousChannelGroup;
6-
import java.nio.channels.AsynchronousServerSocketChannel;
5+
import java.nio.channels.*;
76
import java.util.concurrent.CompletableFuture;
87
import java.util.concurrent.Executors;
98

@@ -32,6 +31,21 @@ public int getPort() {
3231
}
3332

3433
public CompletableFuture<java.lang.Void> start() {
35-
return this.twoPartyRpc.listen(this.serverAcceptSocket);
34+
return this.twoPartyRpc.listen(new AsynchronousByteListenChannel() {
35+
@Override
36+
public <A> void accept(A attachment, CompletionHandler<AsynchronousByteChannel, ? super A> handler) {
37+
serverAcceptSocket.accept(attachment, new CompletionHandler<>() {
38+
@Override
39+
public void completed(AsynchronousSocketChannel result, A attachment) {
40+
handler.completed(result, attachment);
41+
}
42+
43+
@Override
44+
public void failed(Throwable exc, A attachment) {
45+
handler.failed(exc, attachment);
46+
}
47+
});
48+
}
49+
});
3650
}
3751
}

runtime-rpc/src/main/java/org/capnproto/TwoPartyClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package org.capnproto;
22

3-
import java.nio.channels.AsynchronousSocketChannel;
3+
import java.nio.channels.AsynchronousByteChannel;
44
import java.util.concurrent.CompletableFuture;
55

66
public class TwoPartyClient {
77

88
private final TwoPartyVatNetwork network;
99
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
1010

11-
public TwoPartyClient(AsynchronousSocketChannel channel) {
11+
public TwoPartyClient(AsynchronousByteChannel channel) {
1212
this(channel, null);
1313
}
1414

15-
public TwoPartyClient(AsynchronousSocketChannel channel, Capability.Client bootstrapInterface) {
15+
public TwoPartyClient(AsynchronousByteChannel channel, Capability.Client bootstrapInterface) {
1616
this(channel, bootstrapInterface, RpcTwoPartyProtocol.Side.CLIENT);
1717
}
1818

19-
public TwoPartyClient(AsynchronousSocketChannel channel,
19+
public TwoPartyClient(AsynchronousByteChannel channel,
2020
Capability.Client bootstrapInterface,
2121
RpcTwoPartyProtocol.Side side) {
2222
this.network = new TwoPartyVatNetwork(channel, side);

runtime-rpc/src/main/java/org/capnproto/TwoPartyServer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
public class TwoPartyServer {
99

1010
private class AcceptedConnection {
11-
private final AsynchronousSocketChannel connection;
11+
private final AsynchronousByteChannel connection;
1212
private final TwoPartyVatNetwork network;
1313
private final RpcSystem<RpcTwoPartyProtocol.VatId.Reader> rpcSystem;
1414

15-
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousSocketChannel connection) {
15+
AcceptedConnection(Capability.Client bootstrapInterface, AsynchronousByteChannel connection) {
1616
this.connection = connection;
1717
this.network = new TwoPartyVatNetwork(this.connection, RpcTwoPartyProtocol.Side.SERVER);
1818
this.rpcSystem = new RpcSystem<>(network, bootstrapInterface);
@@ -31,19 +31,19 @@ public TwoPartyServer(Capability.Server bootstrapServer) {
3131
this(new Capability.Client(bootstrapServer));
3232
}
3333

34-
public void accept(AsynchronousSocketChannel channel) {
34+
public void accept(AsynchronousByteChannel channel) {
3535
var connection = new AcceptedConnection(this.bootstrapInterface, channel);
3636
this.connections.add(connection);
3737
connection.network.onDisconnect().whenComplete((x, exc) -> {
3838
this.connections.remove(connection);
3939
});
4040
}
4141

42-
public CompletableFuture<java.lang.Void> listen(AsynchronousServerSocketChannel listener) {
43-
var result = new CompletableFuture<AsynchronousSocketChannel>();
42+
public CompletableFuture<java.lang.Void> listen(AsynchronousByteListenChannel listener) {
43+
var result = new CompletableFuture<AsynchronousByteChannel>();
4444
listener.accept(null, new CompletionHandler<>() {
4545
@Override
46-
public void completed(AsynchronousSocketChannel channel, Object attachment) {
46+
public void completed(AsynchronousByteChannel channel, Object attachment) {
4747
accept(channel);
4848
result.complete(null);
4949
}

runtime-rpc/src/main/java/org/capnproto/TwoPartyVatNetwork.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package org.capnproto;
22

33
import java.io.FileDescriptor;
4-
import java.nio.channels.AsynchronousSocketChannel;
4+
import java.nio.channels.AsynchronousByteChannel;
55
import java.util.List;
66
import java.util.concurrent.CompletableFuture;
77

@@ -11,12 +11,12 @@ public class TwoPartyVatNetwork
1111

1212
private CompletableFuture<java.lang.Void> previousWrite = CompletableFuture.completedFuture(null);
1313
private final CompletableFuture<java.lang.Void> disconnectPromise = new CompletableFuture<>();
14-
private final AsynchronousSocketChannel channel;
14+
private final AsynchronousByteChannel channel;
1515
private final RpcTwoPartyProtocol.Side side;
1616
private final MessageBuilder peerVatId = new MessageBuilder(4);
1717
private boolean accepted;
1818

19-
public TwoPartyVatNetwork(AsynchronousSocketChannel channel, RpcTwoPartyProtocol.Side side) {
19+
public TwoPartyVatNetwork(AsynchronousByteChannel channel, RpcTwoPartyProtocol.Side side) {
2020
this.channel = channel;
2121
this.side = side;
2222
this.peerVatId.initRoot(RpcTwoPartyProtocol.VatId.factory).setSide(
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.capnproto;
2+
3+
import java.nio.channels.AsynchronousByteChannel;
4+
import java.nio.channels.CompletionHandler;
5+
6+
public interface AsynchronousByteListenChannel {
7+
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousByteChannel,? super A> handler);
8+
}

runtime/src/main/java/org/capnproto/Serialize.java

Lines changed: 3 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -338,51 +338,10 @@ private void readSegments(int totalWords, int segmentCount, int segment0Size, in
338338
abstract void read(int bytes, Consumer<? super ByteBuffer> consumer);
339339
}
340340

341-
static class AsyncSocketReader extends AsyncMessageReader {
342-
private final AsynchronousSocketChannel channel;
343-
private final long timeout;
344-
private final TimeUnit timeUnit;
345-
346-
AsyncSocketReader(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) {
347-
super(options);
348-
this.channel = channel;
349-
this.timeout = timeout;
350-
this.timeUnit = timeUnit;
351-
}
352-
353-
void read(int bytes, Consumer<? super ByteBuffer> consumer) {
354-
var buffer = Serialize.makeByteBuffer(bytes);
355-
var handler = new CompletionHandler<Integer, Object>() {
356-
@Override
357-
public void completed(Integer result, Object attachment) {
358-
//System.out.println(channel.toString() + ": read " + result + " bytes");
359-
if (result <= 0) {
360-
var text = result == 0
361-
? "Read zero bytes. Is the channel in non-blocking mode?"
362-
: "Premature EOF";
363-
readCompleted.completeExceptionally(new IOException(text));
364-
} else if (buffer.hasRemaining()) {
365-
// partial read
366-
channel.read(buffer, timeout, timeUnit, null, this);
367-
} else {
368-
consumer.accept(buffer);
369-
}
370-
}
371-
372-
@Override
373-
public void failed(Throwable exc, Object attachment) {
374-
readCompleted.completeExceptionally(exc);
375-
}
376-
};
377-
378-
this.channel.read(buffer, this.timeout, this.timeUnit, null, handler);
379-
}
380-
}
381-
382-
static class AsyncByteChannelReader extends AsyncMessageReader {
341+
static class AsynchronousByteChannelReader extends AsyncMessageReader {
383342
private final AsynchronousByteChannel channel;
384343

385-
AsyncByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) {
344+
AsynchronousByteChannelReader(AsynchronousByteChannel channel, ReaderOptions options) {
386345
super(options);
387346
this.channel = channel;
388347
}
@@ -421,23 +380,7 @@ public static CompletableFuture<MessageReader> readAsync(AsynchronousByteChannel
421380
}
422381

423382
public static CompletableFuture<MessageReader> readAsync(AsynchronousByteChannel channel, ReaderOptions options) {
424-
return new AsyncByteChannelReader(channel, options).getMessage();
425-
}
426-
427-
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel) {
428-
return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, Long.MAX_VALUE, TimeUnit.SECONDS);
429-
}
430-
431-
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, ReaderOptions options) {
432-
return readAsync(channel, options, Long.MAX_VALUE, TimeUnit.SECONDS);
433-
}
434-
435-
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, long timeout, TimeUnit timeUnit) {
436-
return readAsync(channel, ReaderOptions.DEFAULT_READER_OPTIONS, timeout, timeUnit);
437-
}
438-
439-
public static CompletableFuture<MessageReader> readAsync(AsynchronousSocketChannel channel, ReaderOptions options, long timeout, TimeUnit timeUnit) {
440-
return new AsyncSocketReader(channel, options, timeout, timeUnit).getMessage();
383+
return new AsynchronousByteChannelReader(channel, options).getMessage();
441384
}
442385

443386
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousByteChannel outputChannel, MessageBuilder message) {
@@ -477,50 +420,6 @@ public void failed(Throwable exc, Integer attachment) {
477420
return writeCompleted;
478421
}
479422

480-
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message) {
481-
return writeAsync(outputChannel, message, Long.MAX_VALUE, TimeUnit.SECONDS);
482-
}
483-
484-
public static CompletableFuture<java.lang.Void> writeAsync(AsynchronousSocketChannel outputChannel, MessageBuilder message, long timeout, TimeUnit timeUnit) {
485-
var writeCompleted = new CompletableFuture<java.lang.Void>();
486-
var segments = message.getSegmentsForOutput();
487-
var header = getHeaderForOutput(segments);
488-
long totalBytes = header.remaining();
489-
490-
// TODO avoid this copy?
491-
var allSegments = new ByteBuffer[segments.length+1];
492-
allSegments[0] = header;
493-
for (int ii = 0; ii < segments.length; ++ii) {
494-
var segment = segments[ii];
495-
allSegments[ii+1] = segment;
496-
totalBytes += segment.remaining();
497-
}
498-
499-
outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes, new CompletionHandler<>() {
500-
@Override
501-
public void completed(Long result, Long totalBytes) {
502-
//System.out.println(outputChannel.toString() + ": Wrote " + result + "/" + totalBytes + " bytes");
503-
if (result < 0) {
504-
writeCompleted.completeExceptionally(new IOException("Write failed"));
505-
}
506-
else if (result < totalBytes) {
507-
// partial write
508-
outputChannel.write(allSegments, 0, allSegments.length, timeout, timeUnit, totalBytes - result, this);
509-
}
510-
else {
511-
writeCompleted.complete(null);
512-
}
513-
}
514-
515-
@Override
516-
public void failed(Throwable exc, Long attachment) {
517-
writeCompleted.completeExceptionally(exc);
518-
}
519-
});
520-
521-
return writeCompleted;
522-
}
523-
524423
private static ByteBuffer getHeaderForOutput(ByteBuffer[] segments) {
525424
assert segments.length > 0: "Empty message";
526425
int tableSize = (segments.length + 2) & (~1);

0 commit comments

Comments
 (0)