Skip to content

Commit c1bf322

Browse files
committed
Fix ByteBuf memleak and timeout exceed
1. [FIX] ByteBuf won't release when request timeout 2. [FIX] reconnect time exceed RPC_CONNECT_TIMEOUT when reconnect concurrently 3. [Chore] change the log printing mode from sync to async Link: https://code.alibaba-inc.com/oceanbase/obkv-table-client-java/codereview/12043375 * [FIX] ByteBuf won't release when request timeout * [FIX] reconnect time exceed RPC_CONNECT_TIMEOUT when reconnect concurrently * [FIX] ByteBuf won't release when request timeout * [FIX] reconnect time exceed RPC_CONNECT_TIMEOUT when reconnect concurrently * [chore] change the log printing mode from sync to async * [chore] change the log printing mode from sync to async
1 parent 545948a commit c1bf322

28 files changed

+377
-306
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,9 +1182,11 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
11821182
ObServerRoute route) {
11831183
if (ObGlobal.OB_VERSION >= 4 && tableEntry.isPartitionTable()) {
11841184
long TabletId = tableEntry.getPartitionInfo().getPartTabletIdMap().get(partId);
1185-
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(TabletId).getReplica(route);
1185+
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(TabletId)
1186+
.getReplica(route);
11861187
} else {
1187-
return tableEntry.getPartitionEntry().getPartitionLocationWithPartId(partId).getReplica(route);
1188+
return tableEntry.getPartitionEntry().getPartitionLocationWithPartId(partId)
1189+
.getReplica(route);
11881190

11891191
}
11901192
}
@@ -1199,7 +1201,7 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
11991201
* @throws Exception exception
12001202
*/
12011203
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
1202-
boolean waitForRefresh) throws Exception {
1204+
boolean waitForRefresh) throws Exception {
12031205
return getTable(tableName, rowKey, refresh, waitForRefresh, getRoute(false));
12041206
}
12051207

@@ -1215,7 +1217,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
12151217
*/
12161218
public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, boolean refresh,
12171219
boolean waitForRefresh, ObServerRoute route)
1218-
throws Exception {
1220+
throws Exception {
12191221
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
12201222

12211223
long partId = getPartition(tableEntry, rowKey);
@@ -1286,7 +1288,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, List<ObNewRange> ke
12861288
*/
12871289
public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolean refresh,
12881290
boolean waitForRefresh, ObServerRoute route)
1289-
throws Exception {
1291+
throws Exception {
12901292
return getTable(tableName, getOrRefreshTableEntry(tableName, refresh, waitForRefresh),
12911293
partId, waitForRefresh, route);
12921294
}
@@ -1301,9 +1303,9 @@ public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolea
13011303
* @return ObPair of partId and table
13021304
* @throws Exception exception
13031305
*/
1304-
public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEntry, long partId,
1305-
boolean waitForRefresh, ObServerRoute route)
1306-
throws Exception {
1306+
public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEntry,
1307+
long partId, boolean waitForRefresh,
1308+
ObServerRoute route) throws Exception {
13071309
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
13081310
route);
13091311

@@ -1334,7 +1336,8 @@ public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEnt
13341336

13351337
ObTableParam param = new ObTableParam(obTable);
13361338
if (ObGlobal.OB_VERSION >= 4) {
1337-
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo().getPartTabletIdMap().get(partId) : partId;
1339+
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
1340+
.getPartTabletIdMap().get(partId) : partId;
13381341
}
13391342

13401343
param.setTableId(tableEntry.getTableId());
@@ -1429,7 +1432,7 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] sta
14291432
boolean startInclusive, Object[] end,
14301433
boolean endInclusive, boolean refresh,
14311434
boolean waitForRefresh, ObServerRoute route)
1432-
throws Exception {
1435+
throws Exception {
14331436

14341437
// 1. get TableEntry information
14351438
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);
@@ -1463,7 +1466,8 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, Object[] sta
14631466

14641467
ObTableParam param = new ObTableParam(obTable);
14651468
if (ObGlobal.OB_VERSION >= 4) {
1466-
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo().getPartTabletIdMap().get(partId) : partId;
1469+
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
1470+
.getPartTabletIdMap().get(partId) : partId;
14671471
}
14681472

14691473
param.setTableId(tableEntry.getTableId());
@@ -2012,7 +2016,7 @@ public ObPayload incrementWithResult(final String tableName, final Object[] rowK
20122016
public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
20132017
long TableTime = System.currentTimeMillis();
20142018
ObTableParam tableParam = obPair.getRight();
2015-
ObTable obTable =tableParam.getObTable();
2019+
ObTable obTable = tableParam.getObTable();
20162020
ObTableOperationRequest request = ObTableOperationRequest.getInstance(
20172021
tableName, INCREMENT, rowKey, columns, values,
20182022
obTable.getObTableOperationTimeout());
@@ -2346,9 +2350,9 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23462350
end[i] = endKey.getObj(i).getValue();
23472351
}
23482352
ObBorderFlag borderFlag = rang.getBorderFlag();
2349-
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(), start,
2350-
borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false,
2351-
false);
2353+
List<ObPair<Long, ObTableParam>> pairList = getTables(request.getTableName(),
2354+
start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
2355+
false, false);
23522356
for (ObPair<Long, ObTableParam> pair : pairList) {
23532357
partIdMapObTable.put(pair.getLeft(), pair.getRight());
23542358
}

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ public RemotingCommand waitResponse(long timeoutMillis) throws InterruptedExcept
4949
if (waiter.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
5050
return response;
5151
} else {
52-
return ObTablePacket.createTransportErrorPacket(TransportCodes.BOLT_TIMEOUT,
53-
"wait timeout: " + timeoutMillis, null);
52+
return null;
5453
}
5554
}
5655

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,23 @@
2525
import com.alipay.remoting.Connection;
2626
import org.slf4j.Logger;
2727

28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicLong;
2930

3031
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.MONITOR;
3132
import static com.alipay.oceanbase.rpc.util.TraceUtil.formatTraceMessage;
3233

3334
public class ObTableConnection {
3435

35-
private static final Logger LOGGER = TableClientLoggerFactory
36-
.getLogger(ObTableConnection.class);
36+
private static final Logger LOGGER = TableClientLoggerFactory
37+
.getLogger(ObTableConnection.class);
3738
private ObBytesString credential;
38-
private long tenantId = 1; //默认值切勿不要随意改动
39+
private long tenantId = 1; //默认值切勿不要随意改动
3940
private Connection connection;
4041
private final ObTable obTable;
41-
private long uniqueId; // as trace0 in rpc header
42-
private AtomicLong sequence; // as trace1 in rpc header
42+
private long uniqueId; // as trace0 in rpc header
43+
private AtomicLong sequence; // as trace1 in rpc header
44+
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
4345

4446
public static long ipToLong(String strIp) {
4547
String[] ip = strIp.split("\\.");
@@ -76,7 +78,7 @@ public void init() throws Exception {
7678
uniqueId = ip | port | isUserRequest | reserved;
7779
}
7880

79-
private synchronized boolean connect() throws Exception {
81+
private boolean connect() throws Exception {
8082
if (checkAvailable()) { // double check status available
8183
return false;
8284
}
@@ -116,7 +118,7 @@ private synchronized boolean connect() throws Exception {
116118
return true;
117119
}
118120

119-
private synchronized void login() throws Exception {
121+
private void login() throws Exception {
120122
final long start = System.currentTimeMillis();
121123
ObTableLoginRequest request = new ObTableLoginRequest();
122124
request.setTenantName(obTable.getTenantName());
@@ -197,11 +199,36 @@ public void checkStatus() throws Exception {
197199
}
198200
}
199201

200-
private synchronized void reconnect(String msg) throws Exception {
201-
if (connect()) {
202-
LOGGER.warn("reconnect success. reconnect reason: [{}]", msg);
202+
/**
203+
* Reconnect current connection and login
204+
*
205+
* @param msg the reconnect reason
206+
* @exception Exception if connect successfully or connection already reconnected by others
207+
* throw exception if connect failed
208+
*
209+
*/
210+
private void reconnect(String msg) throws Exception {
211+
if (isReConnecting.compareAndSet(false, true)) {
212+
try {
213+
if (connect()) {
214+
LOGGER.warn("reconnect success. reconnect reason: [{}]", msg);
215+
} else {
216+
LOGGER.info(
217+
"connection maybe reconnect by other thread. reconnect reason: [{}]", msg);
218+
}
219+
} catch (Exception e) {
220+
throw e;
221+
} finally {
222+
if (!isReConnecting.compareAndSet(true, false)) {
223+
LOGGER
224+
.error(
225+
"failed to set connecting to false after connect finished, reconnect reason: [{}]",
226+
msg);
227+
}
228+
}
203229
} else {
204-
LOGGER.warn("connection maybe reconnect by other thread. reconnect reason: [{}]", msg);
230+
LOGGER.warn("There is someone connecting, no need reconnect");
231+
throw new ObTableException("This connection is already Connecting");
205232
}
206233
}
207234

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTablePacketProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public void process(RemotingContext ctx, ObTablePacket msg, ExecutorService defa
6262
.warn("Cannot find InvokeFuture, maybe already timeout, id={}, from={} ",
6363
msg.getId(),
6464
RemotingUtil.parseRemoteAddress(ctx.getChannelContext().channel()));
65+
msg.releaseByteBuf();
6566
}
6667
} finally {
6768
if (null != oldClassLoader) {

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
7878
String errMessage = TraceUtil.formatTraceMessage(conn, request,
7979
"get an error response: " + response.getMessage());
8080
logger.warn(errMessage);
81+
response.releaseByteBuf();
8182
ExceptionUtil.throwObTableTransportException(errMessage, response.getTransportCode());
8283
return null;
8384
}

0 commit comments

Comments
 (0)