Skip to content

Commit 506c949

Browse files
committed
rpc: move RPC record marking into GrizzlyRpcTransport
Motivation: The grizzly pipeline built as: TransportFilter <-> RpcMessageParserTCP <-> RpcProtocolFilter <-> RpcDispatcher which means, that every `Connection#write` called in RpcProtocolFilter and RpcDispatcher calls RpcMessageParserTCP#handleWrite. The later one always adds RPC frame marker, thus doesn't allow to use multiple writes within singe frame. Modification: Move RPC message separation marker appending from RpcMessageParserTCP into `GrizzlyRpcTransport#send` Result: The `GrizzlyRpcTransport#send` can use multiple `Connection#write` to send a rpc message. Acked-by: Albert Rossi Target: master
1 parent d2f26e2 commit 506c949

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyRpcTransport.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2009 - 2019 Deutsches Elektronen-Synchroton,
2+
* Copyright (c) 2009 - 2022 Deutsches Elektronen-Synchroton,
33
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
44
*
55
* This library is free software; you can redistribute it and/or modify
@@ -20,10 +20,14 @@
2020
package org.dcache.oncrpc4j.grizzly;
2121

2222

23+
import java.nio.ByteOrder;
2324
import org.dcache.oncrpc4j.rpc.ReplyQueue;
25+
import org.dcache.oncrpc4j.rpc.RpcMessageParserTCP;
2426
import org.dcache.oncrpc4j.xdr.Xdr;
2527
import java.net.InetSocketAddress;
2628
import java.nio.channels.CompletionHandler;
29+
import org.glassfish.grizzly.memory.BuffersBuffer;
30+
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
2933
import org.glassfish.grizzly.Buffer;
@@ -48,6 +52,12 @@ public class GrizzlyRpcTransport implements RpcTransport {
4852
private final InetSocketAddress _localAddress;
4953
private final InetSocketAddress _remoteAddress;
5054

55+
/**
56+
* If true, then underlying transport is stream-oriented (like TCP) and messages must be separated
57+
* by record marking.
58+
*/
59+
private final boolean _isStreaming;
60+
5161
private final static Logger _log = LoggerFactory.getLogger(GrizzlyRpcTransport.class);
5262

5363
public GrizzlyRpcTransport(Connection<InetSocketAddress> connection, ReplyQueue replyQueue) {
@@ -59,6 +69,7 @@ public GrizzlyRpcTransport(Connection<InetSocketAddress> connection, InetSocketA
5969
_replyQueue = replyQueue;
6070
_localAddress = _connection.getLocalAddress();
6171
_remoteAddress = remoteAddress;
72+
_isStreaming = connection.getTransport() instanceof TCPNIOTransport;
6273
}
6374

6475
@Override
@@ -68,10 +79,19 @@ public boolean isOpen() {
6879

6980
@Override
7081
public <A> void send(final Xdr xdr, A attachment, CompletionHandler<Integer, ? super A> handler) {
71-
final Buffer buffer = xdr.asBuffer();
72-
buffer.allowBufferDispose(true);
7382

7483
requireNonNull(handler, "CompletionHandler can't be null");
84+
Buffer buffer = xdr.asBuffer();
85+
86+
// add record marker, if needed
87+
if (_isStreaming) {
88+
int len = buffer.remaining() | RpcMessageParserTCP.RPC_LAST_FRAG;
89+
Buffer marker = _connection.getMemoryManager().allocate(Integer.BYTES);
90+
marker.order(ByteOrder.BIG_ENDIAN);
91+
marker.putInt(len);
92+
marker.flip();
93+
buffer = BuffersBuffer.create(_connection.getMemoryManager(), marker, buffer);
94+
}
7595

7696
// pass destination address to handle UDP connections as well
7797
_connection.write(_remoteAddress, buffer,

oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcMessageParserTCP.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ public class RpcMessageParserTCP extends BaseFilter {
3535
/**
3636
* RPC fragment record marker mask
3737
*/
38-
private final static int RPC_LAST_FRAG = 0x80000000;
38+
public final static int RPC_LAST_FRAG = 0x80000000;
3939
/**
4040
* RPC fragment size mask
4141
*/
42-
private final static int RPC_SIZE_MASK = 0x7fffffff;
42+
public final static int RPC_SIZE_MASK = 0x7fffffff;
4343

4444
@Override
4545
public NextAction handleRead(FilterChainContext ctx) throws IOException {
@@ -61,24 +61,6 @@ public NextAction handleRead(FilterChainContext ctx) throws IOException {
6161
return ctx.getInvokeAction(reminder);
6262
}
6363

64-
@Override
65-
public NextAction handleWrite(FilterChainContext ctx) throws IOException {
66-
67-
Buffer b = ctx.getMessage();
68-
int len = b.remaining() | RPC_LAST_FRAG;
69-
70-
Buffer marker = ctx.getMemoryManager().allocate(4);
71-
marker.order(ByteOrder.BIG_ENDIAN);
72-
marker.putInt(len);
73-
marker.flip();
74-
marker.allowBufferDispose(true);
75-
b.allowBufferDispose(true);
76-
Buffer composite = BuffersBuffer.create(ctx.getMemoryManager(), marker, b);
77-
composite.allowBufferDispose(true);
78-
ctx.setMessage(composite);
79-
return ctx.getInvokeAction();
80-
}
81-
8264
private boolean isAllFragmentsArrived(Buffer messageBuffer) {
8365
final Buffer buffer = messageBuffer.duplicate();
8466
buffer.order(ByteOrder.BIG_ENDIAN);

0 commit comments

Comments
 (0)