Skip to content

Commit 6874323

Browse files
committed
Fix infinite loop caused by removed reference
1 parent a82d5d0 commit 6874323

File tree

4 files changed

+103
-131
lines changed

4 files changed

+103
-131
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 68 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,64 +1330,53 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName)
13301330

13311331
public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String tableName, Long tabletId) throws ObTableAuthException {
13321332
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database, tableName);
1333-
Lock lock = null;
13341333
try {
1335-
// if table entry is null, throw exception
13361334
if (tableEntry == null) {
1337-
throw new ObTableEntryRefreshException("table entry is null, tableName=" + tableName);
1335+
throw new ObTableEntryRefreshException("Table entry is null, tableName=" + tableName);
13381336
}
1339-
1340-
long lastRefreshTime = tableEntry.getTabletLocationLastRefreshTimeMills(tabletId);
1337+
1338+
long lastRefreshTime = tableEntry.getPartitionEntry().getPartitionInfo(tabletId).getLastUpdateTime();
13411339
long currentTime = System.currentTimeMillis();
1342-
// 100ms
1343-
if (currentTime - lastRefreshTime < 100) {
1340+
if (currentTime - lastRefreshTime < 200) {
13441341
return tableEntry;
13451342
}
1346-
Lock tempLock = new ReentrantLock();
1347-
lock = tableEntry.refreshLockMap.putIfAbsent(tabletId, tempLock);
1348-
lock = (lock == null) ? tempLock : lock; // check the first lock
1349-
1350-
boolean acquired = lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS);
1351-
tableEntry.setTableLocationLastRefreshTimeMills(tabletId, currentTime);
13521343

1353-
if (!acquired) {
1354-
String errMsg = "try to lock table-entry refreshing timeout " + "dataSource:"
1355-
+ dataSourceName + " ,tableName:" + tableName + ", refresh:" +
1356-
" , timeout:" + tableEntryRefreshLockTimeout + ".";
1344+
Lock lock = tableEntry.refreshLockMap.computeIfAbsent(tabletId, k -> new ReentrantLock());
1345+
1346+
if (!lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS)) {
1347+
String errMsg = String.format("Try to lock table-entry refreshing timeout. DataSource: %s, TableName: %s, Timeout: %d.",
1348+
dataSourceName, tableName, tableEntryRefreshLockTimeout);
13571349
RUNTIME.error(errMsg);
13581350
throw new ObTableEntryRefreshException(errMsg);
13591351
}
1360-
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
1361-
tableEntryKey,//
1362-
tableEntry,//
1363-
tabletId,
1364-
tableEntryAcquireConnectTimeout,//
1365-
tableEntryAcquireSocketTimeout,//
1366-
serverAddressPriorityTimeout, //
1367-
serverAddressCachingTimeout, sysUA);
1368-
// prepare the table entry for weak read.
1369-
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1352+
1353+
try {
1354+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, tableEntryKey, tableEntry, tabletId,
1355+
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
1356+
serverAddressPriorityTimeout, serverAddressCachingTimeout, sysUA);
1357+
1358+
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1359+
} finally {
1360+
lock.unlock();
1361+
}
1362+
13701363
} catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
1371-
RUNTIME.error("refreshTableEntry meet exception", e);
1364+
RUNTIME.error("RefreshTableEntry encountered an exception", e);
13721365
throw e;
13731366
} catch (Exception e) {
1367+
String errorMsg = String.format("Failed to get table entry. Key=%s, Original TableEntry=%s", tableEntryKey, tableEntry);
13741368
RUNTIME.error(LCD.convert("01-00020"), tableEntryKey, tableEntry, e);
1375-
throw new ObTableEntryRefreshException(String.format(
1376-
"failed to get table entry key=%s original tableEntry=%s ", tableEntryKey,
1377-
tableEntry), e);
1378-
} finally {
1379-
if (lock != null) {
1380-
lock.unlock();
1381-
}
1369+
throw new ObTableEntryRefreshException(errorMsg, e);
13821370
}
1383-
tableLocations.put(tableName, tableEntry);
13841371

1372+
tableLocations.put(tableName, tableEntry);
13851373
tableEntryRefreshContinuousFailureCount.set(0);
1374+
13861375
if (logger.isInfoEnabled()) {
1387-
logger.info(
1388-
"refresh table entry, dataSource: {}, tableName: {}, refresh: {} key:{} entry:{} ",
1389-
dataSourceName, tableName, true, tableEntryKey, JSON.toJSON(tableEntry));
1376+
logger.info("Refreshed table entry. DataSource: {}, TableName: {}, Key: {}, Entry: {}",
1377+
dataSourceName, tableName, tableEntryKey, JSON.toJSON(tableEntry));
13901378
}
1379+
13911380
return tableEntry;
13921381
}
13931382

@@ -1606,13 +1595,6 @@ private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
16061595

16071596
private ReplicaLocation getPartitionLocation(ObPartitionLocationInfo obPartitionLocationInfo,
16081597
ObServerRoute route) {
1609-
while (obPartitionLocationInfo.getPartitionLocation() == null) {
1610-
try {
1611-
Thread.sleep(10);
1612-
} catch (InterruptedException e) {
1613-
e.printStackTrace();
1614-
}
1615-
}
16161598
return obPartitionLocationInfo.getPartitionLocation().getReplica(route);
16171599
}
16181600

@@ -1844,48 +1826,61 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18441826
long partId, boolean waitForRefresh,
18451827
ObServerRoute route) throws Exception {
18461828
long tabletId = getTabletIdByPartId(tableEntry, partId);
1847-
ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1848-
if (!obPartitionLocationInfo.initialized.get()) {
1849-
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1850-
obPartitionLocationInfo.initialized.compareAndSet(false, true);
1851-
}
1852-
1829+
ObPartitionLocationInfo obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1830+
18531831
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
18541832
ObServerAddr addr = replica.getAddr();
18551833
ObTable obTable = tableRoster.get(addr);
1856-
boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
1857-
if (obTable == null) {
1858-
logger.warn("can not get ObTable by addr {}, refresh metadata.", addr);
1859-
syncRefreshMetadata();
1860-
}
1861-
if (addrExpired || obTable == null) {
1862-
if (logger.isInfoEnabled() && addrExpired) {
1863-
logger.info("server addr {} is expired, refresh tableEntry.", addr);
1864-
}
18651834

1866-
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1867-
obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1835+
if (obTable == null || addr.isExpired(serverAddressCachingTimeout)) {
1836+
if (obTable == null) {
1837+
logger.warn("Cannot get ObTable by addr {}, refreshing metadata.", addr);
1838+
syncRefreshMetadata();
1839+
}
1840+
if (addr.isExpired(serverAddressCachingTimeout)) {
1841+
logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
1842+
}
1843+
1844+
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
18681845
replica = getPartitionLocation(obPartitionLocationInfo, route);
18691846
addr = replica.getAddr();
18701847
obTable = tableRoster.get(addr);
1848+
1849+
if (obTable == null) {
1850+
RUNTIME.error("Cannot get table by addr: " + addr);
1851+
throw new ObTableGetException("Cannot get table by addr: " + addr);
1852+
}
18711853
}
18721854

1873-
if (obTable == null) {
1874-
RUNTIME.error("cannot get table by addr: " + addr);
1875-
throw new ObTableGetException("cannot get table by addr: " + addr);
1855+
ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
1856+
addr.recordAccess();
1857+
return new ObPair<>(tabletId, param);
1858+
}
1859+
1860+
private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry,
1861+
String tableName, long tabletId)
1862+
throws Exception {
1863+
ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry()
1864+
.getPartitionInfo(tabletId);
1865+
if (!obPartitionLocationInfo.initialized.get()) {
1866+
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1867+
obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1868+
obPartitionLocationInfo.initializationLatch.await();
18761869
}
1870+
return obPartitionLocationInfo;
1871+
}
18771872

1873+
private ObTableParam createTableParam(ObTable obTable, TableEntry tableEntry,
1874+
ObPartitionLocationInfo obPartitionLocationInfo,
1875+
long partId, long tabletId) {
18781876
ObTableParam param = new ObTableParam(obTable);
1879-
param.setPartId(partId); // used in getTable(), 4.x may change the origin partId
1877+
param.setPartId(partId);
18801878
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
18811879
param.setLsId(obPartitionLocationInfo.getTabletLsId());
18821880
}
1883-
18841881
param.setTableId(tableEntry.getTableId());
18851882
param.setPartitionId(tabletId);
1886-
1887-
addr.recordAccess();
1888-
return new ObPair<>(tabletId, param);
1883+
return param;
18891884
}
18901885

18911886
/**
@@ -1910,11 +1905,7 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
19101905

19111906
if (!tableEntry.isPartitionTable() || tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
19121907
long tabletId = getTabletIdByPartId(tableEntry, 0L);
1913-
ObPartitionLocationInfo locationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1914-
if (!locationInfo.initialized.get()) {
1915-
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1916-
locationInfo.initialized.compareAndSet(false, true);
1917-
}
1908+
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
19181909
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
19191910
return replicas;
19201911
}
@@ -1924,11 +1915,7 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
19241915

19251916
for (Long partId : partIds) {
19261917
long tabletId = getTabletIdByPartId(tableEntry, partId);
1927-
ObPartitionLocationInfo locationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1928-
if (!locationInfo.initialized.get()) {
1929-
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1930-
locationInfo.initialized.compareAndSet(false, true);
1931-
}
1918+
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
19321919
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
19331920
}
19341921

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,53 +1160,54 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE
11601160
long tabletId)
11611161
throws SQLException,
11621162
ObTablePartitionLocationRefreshException {
1163+
11631164
if (partitionEntry == null || tableEntry == null) {
11641165
throw new IllegalArgumentException("partitionEntry: " + partitionEntry
11651166
+ " tableEntry: " + tableEntry);
11661167
}
1168+
11671169
ObPartitionLocationInfo partitionLocationInfo = partitionEntry.getPartitionInfo(tabletId);
1170+
1171+
partitionLocationInfo.rwLock.writeLock().lock();
11681172
try {
1169-
partitionLocationInfo.rwLock.writeLock().lock();
11701173
while (rs.next()) {
11711174
ReplicaLocation replica = buildReplicaLocation(rs);
1172-
long partitionId;
1173-
long lsId;
1174-
if (ObGlobal.obVsnMajor() >= 4) {
1175-
partitionId = rs.getLong("tablet_id");
1176-
lsId = rs.getLong("ls_id");
1177-
if (rs.wasNull()) {
1178-
lsId = INVALID_LS_ID; // non-partitioned table
1179-
}
1180-
partitionLocationInfo.setTabletLsId(lsId);
1181-
} else {
1182-
partitionId = rs.getLong("partition_id");
1183-
if (tableEntry.isPartitionTable()
1184-
&& null != tableEntry.getPartitionInfo().getSubPartDesc()) {
1185-
partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry
1186-
.getPartitionInfo().getSubPartDesc().getPartNum());
1187-
}
1175+
1176+
long partitionId = (ObGlobal.obVsnMajor() >= 4) ? rs.getLong("tablet_id") : rs
1177+
.getLong("partition_id");
1178+
long lsId = ObGlobal.obVsnMajor() >= 4 ? rs.getLong("ls_id") : INVALID_LS_ID;
1179+
if (rs.wasNull() && ObGlobal.obVsnMajor() >= 4) {
1180+
lsId = INVALID_LS_ID; // For non-partitioned table
11881181
}
1182+
partitionLocationInfo.setTabletLsId(lsId);
1183+
1184+
if (ObGlobal.obVsnMajor() < 4 && tableEntry.isPartitionTable()
1185+
&& tableEntry.getPartitionInfo().getSubPartDesc() != null) {
1186+
partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry
1187+
.getPartitionInfo().getSubPartDesc().getPartNum());
1188+
}
1189+
11891190
if (!replica.isValid()) {
11901191
RUNTIME
11911192
.warn(format(
1192-
"replica is invalid, continue, replica=%s, partitionId/tabletId=%d, tableId=%d",
1193+
"Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d",
11931194
replica, partitionId, tableEntry.getTableId()));
11941195
continue;
11951196
}
11961197
ObPartitionLocation location = partitionLocationInfo.getPartitionLocation();
1197-
11981198
if (location == null) {
11991199
location = new ObPartitionLocation();
1200-
partitionLocationInfo.setPartitionLocation(location);
1200+
partitionLocationInfo.updateLocation(location);
12011201
}
12021202
location.addReplicaLocation(replica);
1203+
1204+
if (partitionLocationInfo.initialized.compareAndSet(false, true)) {
1205+
partitionLocationInfo.initializationLatch.countDown();
1206+
}
12031207
}
12041208
} finally {
12051209
partitionLocationInfo.rwLock.writeLock().unlock();
12061210
}
1207-
// TODO: v3
1208-
if (ObGlobal.obVsnMajor() < 4) {
1209-
}
12101211
return partitionEntry;
12111212
}
12121213

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,9 @@ public class ObPartitionEntry {
3636
// 写的场景就是更新,读的场景是正常的请求执行,需要保证读写的安全性,更新的时候一方面是保证线程安全,另一方面还需要保证不能频繁更新
3737
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();
3838

39-
39+
4040
public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
41-
if (!partitionInfos.containsKey(tabletId)) {
42-
ObPartitionLocationInfo partitionInfo = new ObPartitionLocationInfo();
43-
partitionInfos.put(tabletId, partitionInfo);
44-
}
45-
return partitionInfos.get(tabletId);
41+
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
4642
}
4743

4844
public Map<Long, ObPartitionLocation> getPartitionLocation() {

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,19 @@
1717

1818
package com.alipay.oceanbase.rpc.location.model.partition;
1919

20+
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

2324
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
2425

25-
// 这个类不做线程安全之类的处理
2626
public class ObPartitionLocationInfo {
27-
private ObPartitionLocation partitionLocation = null;
28-
private Long tabletLsId = OB_INVALID_ID;
29-
private Long lastUpdateTime; // 最后更新时间
30-
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); // 读写锁
31-
public AtomicBoolean initialized = new AtomicBoolean(false);
32-
33-
public ObPartitionLocationInfo() {
34-
this.lastUpdateTime = System.currentTimeMillis(); // 初始化为当前时间
35-
}
27+
private ObPartitionLocation partitionLocation = null;
28+
private Long tabletLsId = OB_INVALID_ID;
29+
private Long lastUpdateTime = 0L;
30+
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
31+
public AtomicBoolean initialized = new AtomicBoolean(false);
32+
public final CountDownLatch initializationLatch = new CountDownLatch(1);
3633

3734
public ObPartitionLocation getPartitionLocation() {
3835
rwLock.readLock().lock();
@@ -43,18 +40,9 @@ public ObPartitionLocation getPartitionLocation() {
4340
}
4441
}
4542

46-
public void setPartitionLocation(ObPartitionLocation partitionLocation) {
47-
this.partitionLocation = partitionLocation;
48-
}
49-
5043
public void updateLocation(ObPartitionLocation newLocation) {
51-
rwLock.writeLock().lock();
52-
try {
53-
this.partitionLocation = newLocation;
54-
this.lastUpdateTime = System.currentTimeMillis();
55-
} finally {
56-
rwLock.writeLock().unlock();
57-
}
44+
this.partitionLocation = newLocation;
45+
this.lastUpdateTime = System.currentTimeMillis();
5846
}
5947

6048
public Long getTabletLsId() {

0 commit comments

Comments
 (0)