diff --git a/oncrpc4j-benchmark/pom.xml b/oncrpc4j-benchmark/pom.xml index 815edf91..58a09d10 100644 --- a/oncrpc4j-benchmark/pom.xml +++ b/oncrpc4j-benchmark/pom.xml @@ -1,11 +1,11 @@ - + 4.0.0 org.dcache oncrpc4j - 3.1.0-SNAPSHOT + 3.1.1-SNAPSHOT Set of JMH benchmarks for oncrpc4j diff --git a/oncrpc4j-core/pom.xml b/oncrpc4j-core/pom.xml index 263c141f..ab47243b 100755 --- a/oncrpc4j-core/pom.xml +++ b/oncrpc4j-core/pom.xml @@ -5,7 +5,7 @@ org.dcache oncrpc4j - 3.1.0-SNAPSHOT + 3.1.1-SNAPSHOT oncrpc4j-core diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java index f3798a3b..d5cd862f 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java @@ -19,14 +19,15 @@ */ package org.dcache.oncrpc4j.rpc; +import com.google.common.annotations.VisibleForTesting; import java.io.EOFException; import java.net.SocketAddress; import java.nio.channels.CompletionHandler; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -36,7 +37,7 @@ public class ReplyQueue { - private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + private final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override @@ -48,6 +49,10 @@ public Thread newThread(Runnable r) { }); private final ConcurrentMap _queue = new ConcurrentHashMap<>(); + public ReplyQueue() { + executorService.setRemoveOnCancelPolicy(true); + } + /** * Register callback handler for a given xid. The Callback is called when * client receives reply from the server, request failed of expired. @@ -146,6 +151,11 @@ void failed(Throwable t) { } } + @VisibleForTesting + BlockingQueue getTimeoutQueue() { + return executorService.getQueue(); + } + /** * Shutdown all background activity, if any. */ diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/util/Bytes.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/util/Bytes.java index 92237587..94ce3166 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/util/Bytes.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/util/Bytes.java @@ -81,7 +81,11 @@ public static void putInt(byte[] bytes, int offset, int value) * @param offset the offset at which data should be read * @return long value */ - public static long getLong(byte[] bytes, int offset) { + public static long getLong(byte[] bytes, int offset) + throws IllegalArgumentException { + + checkArrayLength(bytes.length, offset, 8); + return (bytes[offset] & 0xFFL) << 56 | (bytes[offset + 1] & 0xFFL) << 48 | (bytes[offset + 2] & 0xFFL) << 40 @@ -99,10 +103,20 @@ public static long getLong(byte[] bytes, int offset) { * @param offset the offset at which data should be read * @return int value */ - public static int getInt(byte[] bytes, int offset) { + public static int getInt(byte[] bytes, int offset) + throws IllegalArgumentException { + + checkArrayLength(bytes.length, offset, 4); + return (bytes[offset] & 0xFF) << 24 | (bytes[offset + 1] & 0xFF) << 16 | (bytes[offset + 2] & 0xFF) << 8 | (bytes[offset + 3] & 0xFF); } + + private static void checkArrayLength(int length, int offset, int nrOfBytes) { + boolean arraySufficientlyLong = length > offset + (nrOfBytes - 1); + + if (!arraySufficientlyLong) throw new IllegalArgumentException("Array not sufficiently long"); + } } diff --git a/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java new file mode 100644 index 00000000..6c63e4d9 --- /dev/null +++ b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/rpc/ReplyQueueTest.java @@ -0,0 +1,61 @@ +package org.dcache.oncrpc4j.rpc; + +import java.io.EOFException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.Before; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class ReplyQueueTest { + + private ReplyQueue replyQueue; + private SocketAddress addr; + private CompletionHandler handler; + + @Before + public void setUp() { + replyQueue = new ReplyQueue(); + addr = mock(InetSocketAddress.class); + handler = mock(CompletionHandler.class); + } + + @Test + public void testRemoveCancel() throws EOFException { + + replyQueue.registerKey(1, addr, handler, 1, TimeUnit.MINUTES); + + assertFalse(replyQueue.getTimeoutQueue().isEmpty()); + + replyQueue.get(1); + + assertTrue(replyQueue.getTimeoutQueue().isEmpty()); + } + + @Test + public void testInvokeHandlerOnTimeout() throws EOFException, InterruptedException { + + replyQueue.registerKey(1, addr, handler, 1, TimeUnit.NANOSECONDS); + + TimeUnit.SECONDS.sleep(1); + assertTrue(replyQueue.getPendingRequests().isEmpty()); + assertTrue(replyQueue.getTimeoutQueue().isEmpty()); + verify(handler).failed(any(), any()); + } + + @Test + public void testRequestWithoutOnTimeout() throws EOFException, InterruptedException { + + replyQueue.registerKey(1, addr, handler); + assertFalse(replyQueue.getPendingRequests().isEmpty()); + assertTrue(replyQueue.getTimeoutQueue().isEmpty()); + } + +} diff --git a/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/util/BytesTest.java b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/util/BytesTest.java index 43007c8c..ced7a3d0 100644 --- a/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/util/BytesTest.java +++ b/oncrpc4j-core/src/test/java/org/dcache/oncrpc4j/util/BytesTest.java @@ -69,4 +69,20 @@ public void testPutGetInt() { Bytes.putInt(_b, 0, value); assertEquals("put/get mismatch", value, Bytes.getInt(_b, 0)); } + + @Test(expected = IllegalArgumentException.class) + public void testGetLongTooSmallArray() { + int value = 1717; + Bytes.putLong(_b, 0, value); + + Bytes.getLong(_b, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetIntTooSmallArray() { + int value = 1717; + Bytes.putInt(_b, 0, value); + + Bytes.getInt(_b, 5); + } } \ No newline at end of file diff --git a/oncrpc4j-portmapdaemon/pom.xml b/oncrpc4j-portmapdaemon/pom.xml index 08d94141..cceb6f5a 100644 --- a/oncrpc4j-portmapdaemon/pom.xml +++ b/oncrpc4j-portmapdaemon/pom.xml @@ -5,7 +5,7 @@ org.dcache oncrpc4j - 3.1.0-SNAPSHOT + 3.1.1-SNAPSHOT oncrpc4j-portmapdaemon diff --git a/oncrpc4j-rpcgen/pom.xml b/oncrpc4j-rpcgen/pom.xml index ea23577c..b9fa2983 100644 --- a/oncrpc4j-rpcgen/pom.xml +++ b/oncrpc4j-rpcgen/pom.xml @@ -4,7 +4,7 @@ org.dcache oncrpc4j - 3.1.0-SNAPSHOT + 3.1.1-SNAPSHOT oncrpc4j-rpcgen diff --git a/oncrpc4j-spring/pom.xml b/oncrpc4j-spring/pom.xml index 2e5e2a17..151910c5 100644 --- a/oncrpc4j-spring/pom.xml +++ b/oncrpc4j-spring/pom.xml @@ -4,7 +4,7 @@ org.dcache oncrpc4j - 3.1.0-SNAPSHOT + 3.1.1-SNAPSHOT oncrpc4j-spring diff --git a/pom.xml b/pom.xml index 5e89b004..09a03b37 100755 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.dcache oncrpc4j - 3.1.0-SNAPSHOT + 3.1.1-SNAPSHOT ONCRPC4J parent project pom @@ -322,4 +322,37 @@ + + + sign-artifacts + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + ${gpg.passphrase} + + + + sign-artifacts + verify + + sign + + + + + + + + +