Skip to content

Commit ec0e079

Browse files
committed
Fix infinite loop caused by removed reference
1 parent 15ee532 commit ec0e079

File tree

4 files changed

+103
-131
lines changed

4 files changed

+103
-131
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 68 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,64 +1331,53 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName)
13311331

13321332
public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String tableName, Long tabletId) throws ObTableAuthException {
13331333
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database, tableName);
1334-
Lock lock = null;
13351334
try {
1336-
// if table entry is null, throw exception
13371335
if (tableEntry == null) {
1338-
throw new ObTableEntryRefreshException("table entry is null, tableName=" + tableName);
1336+
throw new ObTableEntryRefreshException("Table entry is null, tableName=" + tableName);
13391337
}
1340-
1341-
long lastRefreshTime = tableEntry.getTabletLocationLastRefreshTimeMills(tabletId);
1338+
1339+
long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
13421340
long currentTime = System.currentTimeMillis();
1343-
// 100ms
1344-
if (currentTime - lastRefreshTime < 100) {
1341+
if (currentTime - lastRefreshTime < 200) {
13451342
return tableEntry;
13461343
}
1347-
Lock tempLock = new ReentrantLock();
1348-
lock = tableEntry.refreshLockMap.putIfAbsent(tabletId, tempLock);
1349-
lock = (lock == null) ? tempLock : lock; // check the first lock
1350-
1351-
boolean acquired = lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS);
1352-
tableEntry.setTableLocationLastRefreshTimeMills(tabletId, currentTime);
13531344

1354-
if (!acquired) {
1355-
String errMsg = "try to lock table-entry refreshing timeout " + "dataSource:"
1356-
+ dataSourceName + " ,tableName:" + tableName + ", refresh:" +
1357-
" , timeout:" + tableEntryRefreshLockTimeout + ".";
1345+
Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock());
1346+
1347+
if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) {
1348+
String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.",
1349+
dataSourceName, tableName, tableEntryRefreshLockTimeout);
13581350
RUNTIME.error(errMsg);
13591351
throw new ObTableEntryRefreshException(errMsg);
13601352
}
1361-
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
1362-
tableEntryKey,//
1363-
tableEntry,//
1364-
tabletId,
1365-
tableEntryAcquireConnectTimeout,//
1366-
tableEntryAcquireSocketTimeout,//
1367-
serverAddressPriorityTimeout, //
1368-
serverAddressCachingTimeout, sysUA);
1369-
// prepare the table entry for weak read.
1370-
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1353+
1354+
try {
1355+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
1356+
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1357+
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
1358+
1359+
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1360+
} finally {
1361+
lock.unlock();
1362+
}
1363+
13711364
} catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
1372-
RUNTIME.error("refreshTableEntry meet exception", e);
1365+
RUNTIME.error("RefreshTableEntry encountered an exception", e);
13731366
throw e;
13741367
} catch (Exception e) {
1368+
String errorMsg = String.format("Failed to get table entry. Key=%s, Original TableEntry=%s", tableEntryKey, tableEntry);
13751369
RUNTIME.error(LCD.convert("01-00020"), tableEntryKey, tableEntry, e);
1376-
throw new ObTableEntryRefreshException(String.format(
1377-
"failed to get table entry key=%s original tableEntry=%s ", tableEntryKey,
1378-
tableEntry), e);
1379-
} finally {
1380-
if (lock != null) {
1381-
lock.unlock();
1382-
}
1370+
throw new ObTableEntryRefreshException(errorMsg, e);
13831371
}
1384-
tableLocations.put(tableName, tableEntry);
13851372

1373+
tableLocations.put(tableName, tableEntry);
13861374
tableEntryRefreshContinuousFailureCount.set(0);
1375+
13871376
if (logger.isInfoEnabled()) {
1388-
logger.info(
1389-
"refresh table entry, dataSource: {}, tableName: {}, refresh: {} key:{} entry:{} ",
1390-
dataSourceName, tableName, true, tableEntryKey, JSON.toJSON(tableEntry));
1377+
logger.info("Refreshed table entry. DataSource: {}, TableName: {}, Key: {}, Entry: {}",
1378+
dataSourceName, tableName, tableEntryKey, JSON.toJSON(tableEntry));
13911379
}
1380+
13921381
return tableEntry;
13931382
}
13941383

@@ -1607,13 +1596,6 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
16071596

16081597
private ReplicaLocation getPartitionLocation(ObPartitionLocationInfo obPartitionLocationInfo,
16091598
ObServerRoute route) {
1610-
while (obPartitionLocationInfo.getPartitionLocation() == null) {
1611-
try {
1612-
Thread.sleep(10);
1613-
} catch (InterruptedException e) {
1614-
e.printStackTrace();
1615-
}
1616-
}
16171599
return obPartitionLocationInfo.getPartitionLocation().getReplica(route);
16181600
}
16191601

@@ -1854,48 +1836,61 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18541836
long partId, boolean waitForRefresh,
18551837
ObServerRoute route) throws Exception {
18561838
long tabletId = getTabletIdByPartId(tableEntry, partId);
1857-
ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1858-
if (!obPartitionLocationInfo.initialized.get()) {
1859-
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1860-
obPartitionLocationInfo.initialized.compareAndSet(false, true);
1861-
}
1862-
1839+
ObPartitionLocationInfo obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1840+
18631841
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
18641842
ObServerAddr addr = replica.getAddr();
18651843
ObTable obTable = tableRoster.get(addr);
1866-
boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
1867-
if (obTable == null) {
1868-
logger.warn("can not get ObTable by addr {}, refresh metadata.", addr);
1869-
syncRefreshMetadata();
1870-
}
1871-
if (addrExpired || obTable == null) {
1872-
if (logger.isInfoEnabled() && addrExpired) {
1873-
logger.info("server addr {} is expired, refresh tableEntry.", addr);
1874-
}
18751844

1876-
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1877-
obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1845+
if (obTable == null || addr.isExpired(serverAddressCachingTimeout)) {
1846+
if (obTable == null) {
1847+
logger.warn("Cannot get ObTable by addr {}, refreshing metadata.", addr);
1848+
syncRefreshMetadata();
1849+
}
1850+
if (addr.isExpired(serverAddressCachingTimeout)) {
1851+
logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
1852+
}
1853+
1854+
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
18781855
replica = getPartitionLocation(obPartitionLocationInfo, route);
18791856
addr = replica.getAddr();
18801857
obTable = tableRoster.get(addr);
1858+
1859+
if (obTable == null) {
1860+
RUNTIME.error("Cannot get table by addr: " + addr);
1861+
throw new ObTableGetException("Cannot get table by addr: " + addr);
1862+
}
18811863
}
18821864

1883-
if (obTable == null) {
1884-
RUNTIME.error("cannot get table by addr: " + addr);
1885-
throw new ObTableGetException("cannot get table by addr: " + addr);
1865+
ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
1866+
addr.recordAccess();
1867+
return new ObPair<>(tabletId, param);
1868+
}
1869+
1870+
private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry,
1871+
String tableName, long tabletId)
1872+
throws Exception {
1873+
ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry()
1874+
.getPartitionInfo(tabletId);
1875+
if (!obPartitionLocationInfo.initialized.get()) {
1876+
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1877+
obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1878+
obPartitionLocationInfo.initializationLatch.await();
18861879
}
1880+
return obPartitionLocationInfo;
1881+
}
18871882

1883+
private ObTableParam createTableParam(ObTable obTable, TableEntry tableEntry,
1884+
ObPartitionLocationInfo obPartitionLocationInfo,
1885+
long partId, long tabletId) {
18881886
ObTableParam param = new ObTableParam(obTable);
1889-
param.setPartId(partId); // used in getTable(), 4.x may change the origin partId
1887+
param.setPartId(partId);
18901888
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
18911889
param.setLsId(obPartitionLocationInfo.getTabletLsId());
18921890
}
1893-
18941891
param.setTableId(tableEntry.getTableId());
18951892
param.setPartitionId(tabletId);
1896-
1897-
addr.recordAccess();
1898-
return new ObPair<>(tabletId, param);
1893+
return param;
18991894
}
19001895

19011896
/**
@@ -1920,11 +1915,7 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
19201915

19211916
if (!tableEntry.isPartitionTable() || tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
19221917
long tabletId = getTabletIdByPartId(tableEntry, 0L);
1923-
ObPartitionLocationInfo locationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1924-
if (!locationInfo.initialized.get()) {
1925-
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1926-
locationInfo.initialized.compareAndSet(false, true);
1927-
}
1918+
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
19281919
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
19291920
return replicas;
19301921
}
@@ -1934,11 +1925,7 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
19341925

19351926
for (Long partId : partIds) {
19361927
long tabletId = getTabletIdByPartId(tableEntry, partId);
1937-
ObPartitionLocationInfo locationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1938-
if (!locationInfo.initialized.get()) {
1939-
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1940-
locationInfo.initialized.compareAndSet(false, true);
1941-
}
1928+
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
19421929
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
19431930
}
19441931

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,53 +1160,54 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE
11601160
long tabletId)
11611161
throws SQLException,
11621162
ObTablePartitionLocationRefreshException {
1163+
11631164
if (partitionEntry == null || tableEntry == null) {
11641165
throw new IllegalArgumentException("partitionEntry: " + partitionEntry
11651166
+ " tableEntry: " + tableEntry);
11661167
}
1168+
11671169
ObPartitionLocationInfo partitionLocationInfo = partitionEntry.getPartitionInfo(tabletId);
1170+
1171+
partitionLocationInfo.rwLock.writeLock().lock();
11681172
try {
1169-
partitionLocationInfo.rwLock.writeLock().lock();
11701173
while (rs.next()) {
11711174
ReplicaLocation replica = buildReplicaLocation(rs);
1172-
long partitionId;
1173-
long lsId;
1174-
if (ObGlobal.obVsnMajor() >= 4) {
1175-
partitionId = rs.getLong("tablet_id");
1176-
lsId = rs.getLong("ls_id");
1177-
if (rs.wasNull()) {
1178-
lsId = INVALID_LS_ID; // non-partitioned table
1179-
}
1180-
partitionLocationInfo.setTabletLsId(lsId);
1181-
} else {
1182-
partitionId = rs.getLong("partition_id");
1183-
if (tableEntry.isPartitionTable()
1184-
&& null != tableEntry.getPartitionInfo().getSubPartDesc()) {
1185-
partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry
1186-
.getPartitionInfo().getSubPartDesc().getPartNum());
1187-
}
1175+
1176+
long partitionId = (ObGlobal.obVsnMajor() >= 4) ? rs.getLong("tablet_id") : rs
1177+
.getLong("partition_id");
1178+
long lsId = ObGlobal.obVsnMajor() >= 4 ? rs.getLong("ls_id") : INVALID_LS_ID;
1179+
if (rs.wasNull() && ObGlobal.obVsnMajor() >= 4) {
1180+
lsId = INVALID_LS_ID; // For non-partitioned table
11881181
}
1182+
partitionLocationInfo.setTabletLsId(lsId);
1183+
1184+
if (ObGlobal.obVsnMajor() < 4 && tableEntry.isPartitionTable()
1185+
&& tableEntry.getPartitionInfo().getSubPartDesc() != null) {
1186+
partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry
1187+
.getPartitionInfo().getSubPartDesc().getPartNum());
1188+
}
1189+
11891190
if (!replica.isValid()) {
11901191
RUNTIME
11911192
.warn(format(
1192-
"replica is invalid, continue, replica=%s, partitionId/tabletId=%d, tableId=%d",
1193+
"Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d",
11931194
replica, partitionId, tableEntry.getTableId()));
11941195
continue;
11951196
}
11961197
ObPartitionLocation location = partitionLocationInfo.getPartitionLocation();
1197-
11981198
if (location == null) {
11991199
location = new ObPartitionLocation();
1200-
partitionLocationInfo.setPartitionLocation(location);
1200+
partitionLocationInfo.updateLocation(location);
12011201
}
12021202
location.addReplicaLocation(replica);
1203+
1204+
if (partitionLocationInfo.initialized.compareAndSet(false, true)) {
1205+
partitionLocationInfo.initializationLatch.countDown();
1206+
}
12031207
}
12041208
} finally {
12051209
partitionLocationInfo.rwLock.writeLock().unlock();
12061210
}
1207-
// TODO: v3
1208-
if (ObGlobal.obVsnMajor() < 4) {
1209-
}
12101211
return partitionEntry;
12111212
}
12121213

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,9 @@ public class ObPartitionEntry {
3636
// 写的场景就是更新,读的场景是正常的请求执行,需要保证读写的安全性,更新的时候一方面是保证线程安全,另一方面还需要保证不能频繁更新
3737
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();
3838

39-
39+
4040
public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
41-
if (!partitionInfos.containsKey(tabletId)) {
42-
ObPartitionLocationInfo partitionInfo = new ObPartitionLocationInfo();
43-
partitionInfos.put(tabletId, partitionInfo);
44-
}
45-
return partitionInfos.get(tabletId);
41+
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
4642
}
4743

4844
public Map<Long, ObPartitionLocation> getPartitionLocation() {

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,19 @@
1717

1818
package com.alipay.oceanbase.rpc.location.model.partition;
1919

20+
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

2324
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
2425

25-
// 这个类不做线程安全之类的处理
2626
public class ObPartitionLocationInfo {
27-
private ObPartitionLocation partitionLocation = null;
28-
private Long tabletLsId = OB_INVALID_ID;
29-
private Long lastUpdateTime; // 最后更新时间
30-
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); // 读写锁
31-
public AtomicBoolean initialized = new AtomicBoolean(false);
32-
33-
public ObPartitionLocationInfo() {
34-
this.lastUpdateTime = System.currentTimeMillis(); // 初始化为当前时间
35-
}
27+
private ObPartitionLocation partitionLocation = null;
28+
private Long tabletLsId = OB_INVALID_ID;
29+
private Long lastUpdateTime = 0L;
30+
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
31+
public AtomicBoolean initialized = new AtomicBoolean(false);
32+
public final CountDownLatch initializationLatch = new CountDownLatch(1);
3633

3734
public ObPartitionLocation getPartitionLocation() {
3835
rwLock.readLock().lock();
@@ -43,18 +40,9 @@ public ObPartitionLocation getPartitionLocation() {
4340
}
4441
}
4542

46-
public void setPartitionLocation(ObPartitionLocation partitionLocation) {
47-
this.partitionLocation = partitionLocation;
48-
}
49-
5043
public void updateLocation(ObPartitionLocation newLocation) {
51-
rwLock.writeLock().lock();
52-
try {
53-
this.partitionLocation = newLocation;
54-
this.lastUpdateTime = System.currentTimeMillis();
55-
} finally {
56-
rwLock.writeLock().unlock();
57-
}
44+
this.partitionLocation = newLocation;
45+
this.lastUpdateTime = System.currentTimeMillis();
5846
}
5947

6048
public Long getTabletLsId() {

0 commit comments

Comments
 (0)