Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,25 @@
*/
package org.dcache.oncrpc4j.rpc;

import com.google.common.base.Throwables;
import org.dcache.oncrpc4j.grizzly.GrizzlyMemoryManager;
import org.dcache.oncrpc4j.xdr.Xdr;
import org.dcache.oncrpc4j.xdr.XdrVoid;
import org.dcache.oncrpc4j.xdr.XdrEncodingStream;
import org.dcache.oncrpc4j.xdr.XdrAble;
import com.google.common.base.Throwables;
import java.io.EOFException;
import org.dcache.oncrpc4j.xdr.XdrEncodingStream;
import org.dcache.oncrpc4j.xdr.XdrVoid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.nio.channels.CompletionHandler;
import java.util.Random;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RpcCall {
Expand Down Expand Up @@ -440,21 +437,29 @@ private int callInternal(int procedure, XdrAble args, CompletionHandler<RpcReply

int xid = nextXid();

Xdr xdr = new Xdr(Xdr.INITIAL_XDR_SIZE);
xdr.beginEncoding();
Xdr header = new Xdr(Xdr.INITIAL_XDR_SIZE);
header.beginEncoding();
RpcMessage rpcMessage = new RpcMessage(xid, RpcMessageType.CALL);
rpcMessage.xdrEncode(xdr);
xdr.xdrEncodeInt(RPCVERS);
xdr.xdrEncodeInt(_prog);
xdr.xdrEncodeInt(_version);
xdr.xdrEncodeInt(procedure);
rpcMessage.xdrEncode(header);
header.xdrEncodeInt(RPCVERS);
header.xdrEncodeInt(_prog);
header.xdrEncodeInt(_version);
header.xdrEncodeInt(procedure);
if (auth != null) {
auth.xdrEncode(xdr);
auth.xdrEncode(header);
} else {
_cred.xdrEncode(xdr);
_cred.xdrEncode(header);
}
args.xdrEncode(xdr);
xdr.endEncoding();
header.endEncoding();

Xdr body = args.xdrEncode();

Xdr request = new Xdr(
GrizzlyMemoryManager.createComposite(
header.asBuffer(),
body.asBuffer()
)
);

ReplyQueue replyQueue = _transport.getReplyQueue();
if (callback != null) {
Expand All @@ -466,7 +471,7 @@ private int callInternal(int procedure, XdrAble args, CompletionHandler<RpcReply
}
}

_transport.send(xdr, _transport.getRemoteSocketAddress(), new NotifyListenersCompletionHandler() {
_transport.send(header, _transport.getRemoteSocketAddress(), new NotifyListenersCompletionHandler() {

@Override
public void failed(Throwable t, InetSocketAddress attachment) {
Expand Down
15 changes: 15 additions & 0 deletions oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/xdr/XdrAble.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ public interface XdrAble {
*/
public void xdrDecode(XdrDecodingStream xdr) throws OncRpcException, IOException;

/**
* Encodes -- that is: serializes -- an object into a XDR object in
* compliance to RFC 1832.
*
* @return the XDR representation of this object.
* @throws OncRpcException if an ONC/RPC error occurs.
*/
public default Xdr xdrEncode() throws OncRpcException, IOException {
Xdr xdr = new Xdr(Xdr.INITIAL_XDR_SIZE);
xdr.beginEncoding();
xdrEncode(xdr);
xdr.endEncoding();
return xdr;
}

/**
* Encodes -- that is: serializes -- an object into a XDR stream in
* compliance to RFC 1832.
Expand Down