Skip to content

Commit c442d4c

Browse files
committed
fix potential ByteBuffer leak problem
When the worker waits for the response to time out and the netty RPC thread deletes the future from the future map before the worker thread, the response's bytebuf is not released Link: https://code.alibaba-inc.com/oceanbase/obkv-table-client-java/codereview/12756098 * [Doc] update readme 1. use partition table in example 2. add link for deploying config server 3. add Chinese doc link * fix potential ByteBuffer leak problem - When the worker waits for the response to time out and the netty RPC thread deletes the future from the future map before the worker thread, the response's bytebuf is not released * fix potential ByteBuffer leak problem - When the worker waits for the response to time out and the netty RPC thread deletes the future from the future map before the worker thread, the response's bytebuf is not released * fix potential ByteBuffer leak problem - When the worker waits for the response to time out and the netty RPC thread deletes the future from the future map before the worker thread, the response's bytebuf is not released * fix potential ByteBuffer leak problem - When the worker waits for the response to time out and the netty RPC thread deletes the future from the future map before the worker thread, the response's bytebuf is not released * fix potential ByteBuffer leak problem - When the worker waits for the response to time out and the netty RPC thread deletes the future from the future map before the worker thread, the response's bytebuf is not released
1 parent 7f4ad14 commit c442d4c

File tree

1 file changed

+28
-5
lines changed

1 file changed

+28
-5
lines changed

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,28 @@
2222
import com.alipay.remoting.InvokeContext;
2323
import com.alipay.remoting.InvokeFuture;
2424
import com.alipay.remoting.RemotingCommand;
25+
import com.alipay.oceanbase.rpc.exception.ObTableTimeoutExcetion;
2526
import io.netty.util.Timeout;
2627

2728
import java.net.InetSocketAddress;
2829
import java.util.concurrent.CountDownLatch;
2930
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
3032

3133
public class ObClientFuture implements InvokeFuture {
3234

3335
private CountDownLatch waiter = new CountDownLatch(1);
3436
private RemotingCommand response;
3537
private int channelId;
3638

39+
// BY_WORKER indicate response must be release by worker itself.
40+
// BY_BACKGROUND indicate response must be release by background decoder thread
41+
private static int INIT = 0;
42+
private static int BY_WORKER = 1;
43+
private static int BY_BACKGROUND = 2;
44+
45+
private AtomicInteger releaseFlag = new AtomicInteger(INIT);
46+
3747
/*
3848
* Ob client future.
3949
*/
@@ -46,11 +56,19 @@ public ObClientFuture(int channelId) {
4656
*/
4757
@Override
4858
public RemotingCommand waitResponse(long timeoutMillis) throws InterruptedException {
49-
if (waiter.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
50-
return response;
51-
} else {
52-
return null;
53-
}
59+
try {
60+
if (waiter.await(timeoutMillis, TimeUnit.MILLISECONDS) || !releaseFlag.compareAndSet(INIT, BY_BACKGROUND)) {
61+
return response;
62+
} else {
63+
return null;
64+
}
65+
} catch (InterruptedException e) {
66+
releaseFlag.set(BY_BACKGROUND);
67+
if (response instanceof ObTablePacket) {
68+
((ObTablePacket) response).releaseByteBuf();
69+
}
70+
throw e;
71+
} finally {}
5472
}
5573

5674
/*
@@ -69,6 +87,11 @@ public RemotingCommand waitResponse() throws InterruptedException {
6987
public void putResponse(RemotingCommand response) {
7088
this.response = response;
7189
waiter.countDown();
90+
if (!releaseFlag.compareAndSet(INIT, BY_WORKER)) {
91+
if (response instanceof ObTablePacket) {
92+
((ObTablePacket) response).releaseByteBuf();
93+
}
94+
}
7295
}
7396

7497
/*

0 commit comments

Comments
 (0)