Skip to content

Commit f917a7d

Browse files
committed
rpc: remove complete timeout task after request is complete
Motivation: ReplyQueue uses ScheduledThreadPoolExecutor to collect expired request. However, non expired requests (those that get reply) not removed from the scheduler's queue. Modification: Configure ScheduledThreadPoolExecutor to remove canceled tasks. Add test to demonstrate this 'leak' pattern as well as improve overall test coverage. Result: Avoid potential memory leak for high number of requests with bit timeout values. Acked-by: Marina Sahakyan Target: master, 3.1 (cherry picked from commit 9328681) Signed-off-by: Tigran Mkrtchyan <[email protected]>
1 parent e533966 commit f917a7d

File tree

2 files changed

+73
-2
lines changed

2 files changed

+73
-2
lines changed

oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/ReplyQueue.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
*/
2020
package org.dcache.oncrpc4j.rpc;
2121

22+
import com.google.common.annotations.VisibleForTesting;
2223
import java.io.EOFException;
2324
import java.net.SocketAddress;
2425
import java.nio.channels.CompletionHandler;
2526
import java.util.Collection;
2627
import java.util.Collections;
28+
import java.util.concurrent.BlockingQueue;
2729
import java.util.concurrent.ConcurrentHashMap;
2830
import java.util.concurrent.ConcurrentMap;
29-
import java.util.concurrent.ScheduledExecutorService;
3031
import java.util.concurrent.ScheduledFuture;
3132
import java.util.concurrent.ScheduledThreadPoolExecutor;
3233
import java.util.concurrent.ThreadFactory;
@@ -36,7 +37,7 @@
3637

3738
public class ReplyQueue {
3839

39-
private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
40+
private final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
4041
private final AtomicInteger counter = new AtomicInteger();
4142

4243
@Override
@@ -48,6 +49,10 @@ public Thread newThread(Runnable r) {
4849
});
4950
private final ConcurrentMap<Integer, PendingRequest> _queue = new ConcurrentHashMap<>();
5051

52+
public ReplyQueue() {
53+
executorService.setRemoveOnCancelPolicy(true);
54+
}
55+
5156
/**
5257
* Register callback handler for a given xid. The Callback is called when
5358
* client receives reply from the server, request failed of expired.
@@ -146,6 +151,11 @@ void failed(Throwable t) {
146151
}
147152
}
148153

154+
@VisibleForTesting
155+
BlockingQueue<Runnable> getTimeoutQueue() {
156+
return executorService.getQueue();
157+
}
158+
149159
/**
150160
* Shutdown all background activity, if any.
151161
*/
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package org.dcache.oncrpc4j.rpc;
2+
3+
import java.io.EOFException;
4+
import java.net.InetSocketAddress;
5+
import java.net.SocketAddress;
6+
import java.nio.channels.CompletionHandler;
7+
import java.util.concurrent.TimeUnit;
8+
import org.junit.Test;
9+
import org.junit.Before;
10+
11+
import static org.junit.Assert.assertFalse;
12+
import static org.junit.Assert.assertTrue;
13+
import static org.mockito.ArgumentMatchers.any;
14+
import static org.mockito.Mockito.mock;
15+
import static org.mockito.Mockito.verify;
16+
17+
public class ReplyQueueTest {
18+
19+
private ReplyQueue replyQueue;
20+
private SocketAddress addr;
21+
private CompletionHandler<RpcReply, RpcTransport> handler;
22+
23+
@Before
24+
public void setUp() {
25+
replyQueue = new ReplyQueue();
26+
addr = mock(InetSocketAddress.class);
27+
handler = mock(CompletionHandler.class);
28+
}
29+
30+
@Test
31+
public void testRemoveCancel() throws EOFException {
32+
33+
replyQueue.registerKey(1, addr, handler, 1, TimeUnit.MINUTES);
34+
35+
assertFalse(replyQueue.getTimeoutQueue().isEmpty());
36+
37+
replyQueue.get(1);
38+
39+
assertTrue(replyQueue.getTimeoutQueue().isEmpty());
40+
}
41+
42+
@Test
43+
public void testInvokeHandlerOnTimeout() throws EOFException, InterruptedException {
44+
45+
replyQueue.registerKey(1, addr, handler, 1, TimeUnit.NANOSECONDS);
46+
47+
TimeUnit.SECONDS.sleep(1);
48+
assertTrue(replyQueue.getPendingRequests().isEmpty());
49+
assertTrue(replyQueue.getTimeoutQueue().isEmpty());
50+
verify(handler).failed(any(), any());
51+
}
52+
53+
@Test
54+
public void testRequestWithoutOnTimeout() throws EOFException, InterruptedException {
55+
56+
replyQueue.registerKey(1, addr, handler);
57+
assertFalse(replyQueue.getPendingRequests().isEmpty());
58+
assertTrue(replyQueue.getTimeoutQueue().isEmpty());
59+
}
60+
61+
}

0 commit comments

Comments
 (0)