Skip to content

Commit 15ee532

Browse files
committed
partial refresh
1 parent eaabd65 commit 15ee532

File tree

6 files changed

+462
-148
lines changed

6 files changed

+462
-148
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 158 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,8 +1242,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
12421242
// the server roster is ordered by priority
12431243
long punishInterval = (long) (tableEntryRefreshIntervalBase * Math.pow(2,
12441244
-serverRoster.getMaxPriority()));
1245-
punishInterval = punishInterval <= tableEntryRefreshIntervalCeiling ? punishInterval
1246-
: tableEntryRefreshIntervalCeiling;
1245+
punishInterval = Math.min(punishInterval, tableEntryRefreshIntervalCeiling);
12471246
// control refresh frequency less than 100 milli second
12481247
// just in case of connecting to OB Server failed or change master
12491248
long interval = System.currentTimeMillis() - tableEntry.getRefreshTimeMills();
@@ -1268,8 +1267,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
12681267
}
12691268

12701269
int serverSize = serverRoster.getMembers().size();
1271-
int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize
1272-
: tableEntryRefreshTryTimes;
1270+
int refreshTryTimes = Math.min(tableEntryRefreshTryTimes, serverSize);
12731271

12741272
for (int i = 0; i < refreshTryTimes; i++) {
12751273
try {
@@ -1331,6 +1329,69 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName)
13311329
return refreshTableEntry(tableEntry, tableName, false);
13321330
}
13331331

1332+
public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String tableName, Long tabletId) throws ObTableAuthException {
1333+
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database, tableName);
1334+
Lock lock = null;
1335+
try {
1336+
// if table entry is null, throw exception
1337+
if (tableEntry == null) {
1338+
throw new ObTableEntryRefreshException("table entry is null, tableName=" + tableName);
1339+
}
1340+
1341+
long lastRefreshTime = tableEntry.getTabletLocationLastRefreshTimeMills(tabletId);
1342+
long currentTime = System.currentTimeMillis();
1343+
// 100ms
1344+
if (currentTime - lastRefreshTime < 100) {
1345+
return tableEntry;
1346+
}
1347+
Lock tempLock = new ReentrantLock();
1348+
lock = tableEntry.refreshLockMap.putIfAbsent(tabletId, tempLock);
1349+
lock = (lock == null) ? tempLock : lock; // check the first lock
1350+
1351+
boolean acquired = lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS);
1352+
tableEntry.setTableLocationLastRefreshTimeMills(tabletId, currentTime);
1353+
1354+
if (!acquired) {
1355+
String errMsg = "try to lock table-entry refreshing timeout " + "dataSource:"
1356+
+ dataSourceName + " ,tableName:" + tableName + ", refresh:" +
1357+
" , timeout:" + tableEntryRefreshLockTimeout + ".";
1358+
RUNTIME.error(errMsg);
1359+
throw new ObTableEntryRefreshException(errMsg);
1360+
}
1361+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
1362+
tableEntryKey,//
1363+
tableEntry,//
1364+
tabletId,
1365+
tableEntryAcquireConnectTimeout,//
1366+
tableEntryAcquireSocketTimeout,//
1367+
serverAddressPriorityTimeout, //
1368+
serverAddressCachingTimeout, sysUA);
1369+
// prepare the table entry for weak read.
1370+
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
1371+
} catch (ObTableNotExistException | ObTableServerCacheExpiredException e) {
1372+
RUNTIME.error("refreshTableEntry meet exception", e);
1373+
throw e;
1374+
} catch (Exception e) {
1375+
RUNTIME.error(LCD.convert("01-00020"), tableEntryKey, tableEntry, e);
1376+
throw new ObTableEntryRefreshException(String.format(
1377+
"failed to get table entry key=%s original tableEntry=%s ", tableEntryKey,
1378+
tableEntry), e);
1379+
} finally {
1380+
if (lock != null) {
1381+
lock.unlock();
1382+
}
1383+
}
1384+
tableLocations.put(tableName, tableEntry);
1385+
1386+
tableEntryRefreshContinuousFailureCount.set(0);
1387+
if (logger.isInfoEnabled()) {
1388+
logger.info(
1389+
"refresh table entry, dataSource: {}, tableName: {}, refresh: {} key:{} entry:{} ",
1390+
dataSourceName, tableName, true, tableEntryKey, JSON.toJSON(tableEntry));
1391+
}
1392+
return tableEntry;
1393+
}
1394+
13341395
/**
13351396
* 刷新 table entry 元数据
13361397
* @param tableEntry
@@ -1346,13 +1407,7 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
13461407
try {
13471408
// if table entry is exist we just need to refresh table locations
13481409
if (tableEntry != null && !fetchAll) {
1349-
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
1350-
tableEntryKey,//
1351-
tableEntry,//
1352-
tableEntryAcquireConnectTimeout,//
1353-
tableEntryAcquireSocketTimeout,//
1354-
serverAddressPriorityTimeout, //
1355-
serverAddressCachingTimeout, sysUA);
1410+
// do nothing
13561411
} else {
13571412
// if table entry is not exist we should fetch partition info and table locations
13581413
tableEntry = loadTableEntryWithPriority(serverRoster, //
@@ -1361,7 +1416,6 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
13611416
tableEntryAcquireSocketTimeout,//
13621417
serverAddressPriorityTimeout,//
13631418
serverAddressCachingTimeout, sysUA);
1364-
13651419
if (tableEntry.isPartitionTable()) {
13661420
switch (runningMode) {
13671421
case HBASE:
@@ -1545,17 +1599,22 @@ private ObPair<Long, ReplicaLocation> getPartitionReplica(TableEntry tableEntry,
15451599
*/
15461600
private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
15471601
ObServerRoute route) {
1548-
if (ObGlobal.obVsnMajor() >= 4 && tableEntry.isPartitionTable()) {
1549-
ObPartitionInfo partInfo = tableEntry.getPartitionInfo();
1550-
Map<Long, Long> tabletIdMap = partInfo.getPartTabletIdMap();
1551-
long partIdx = tableEntry.getPartIdx(partId);
1552-
long TabletId = tabletIdMap.get(partIdx);
1553-
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(TabletId)
1554-
.getReplica(route);
1555-
} else {
1556-
return tableEntry.getPartitionEntry().getPartitionLocationWithPartId(partId)
1557-
.getReplica(route);
1602+
long tabletId = getTabletIdByPartId(tableEntry, partId);
1603+
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(tabletId)
1604+
.getReplica(route);
1605+
1606+
}
1607+
1608+
private ReplicaLocation getPartitionLocation(ObPartitionLocationInfo obPartitionLocationInfo,
1609+
ObServerRoute route) {
1610+
while (obPartitionLocationInfo.getPartitionLocation() == null) {
1611+
try {
1612+
Thread.sleep(10);
1613+
} catch (InterruptedException e) {
1614+
e.printStackTrace();
1615+
}
15581616
}
1617+
return obPartitionLocationInfo.getPartitionLocation().getReplica(route);
15591618
}
15601619

15611620
/**
@@ -1794,11 +1853,14 @@ public ObTable addTable(ObServerAddr addr){
17941853
public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry tableEntry,
17951854
long partId, boolean waitForRefresh,
17961855
ObServerRoute route) throws Exception {
1797-
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
1798-
route);
1799-
1800-
ReplicaLocation replica = partitionReplica.getRight();
1801-
1856+
long tabletId = getTabletIdByPartId(tableEntry, partId);
1857+
ObPartitionLocationInfo obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1858+
if (!obPartitionLocationInfo.initialized.get()) {
1859+
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1860+
obPartitionLocationInfo.initialized.compareAndSet(false, true);
1861+
}
1862+
1863+
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
18021864
ObServerAddr addr = replica.getAddr();
18031865
ObTable obTable = tableRoster.get(addr);
18041866
boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
@@ -1811,8 +1873,9 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18111873
logger.info("server addr {} is expired, refresh tableEntry.", addr);
18121874
}
18131875

1814-
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
1815-
replica = getPartitionReplica(tableEntry, partId, route).getRight();
1876+
tableEntry = refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1877+
obPartitionLocationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1878+
replica = getPartitionLocation(obPartitionLocationInfo, route);
18161879
addr = replica.getAddr();
18171880
obTable = tableRoster.get(addr);
18181881
}
@@ -1825,17 +1888,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18251888
ObTableParam param = new ObTableParam(obTable);
18261889
param.setPartId(partId); // used in getTable(), 4.x may change the origin partId
18271890
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
1828-
long partIdx = tableEntry.getPartIdx(partId);
1829-
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
1830-
.getPartTabletIdMap().get(partIdx) : partId;
1831-
param.setLsId(tableEntry.getPartitionEntry().getLsId(partId));
1891+
param.setLsId(obPartitionLocationInfo.getTabletLsId());
18321892
}
18331893

18341894
param.setTableId(tableEntry.getTableId());
1835-
param.setPartitionId(partId);
1895+
param.setPartitionId(tabletId);
18361896

18371897
addr.recordAccess();
1838-
return new ObPair<Long, ObTableParam>(partitionReplica.getLeft(), param);
1898+
return new ObPair<>(tabletId, param);
18391899
}
18401900

18411901
/**
@@ -1850,39 +1910,66 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18501910
* @throws Exception
18511911
*/
18521912
private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry tableEntry,
1913+
String tableName,
18531914
Row startRow,
18541915
boolean startIncluded,
18551916
Row endRow,
18561917
boolean endIncluded,
1857-
ObServerRoute route)
1858-
throws Exception {
1859-
// non partition
1860-
List<ObPair<Long, ReplicaLocation>> replicas = new ArrayList<ObPair<Long, ReplicaLocation>>();
1861-
if (!tableEntry.isPartitionTable()
1862-
|| tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
1863-
replicas.add(new ObPair<Long, ReplicaLocation>(0L, getPartitionLocation(tableEntry, 0L,
1864-
route)));
1865-
return replicas;
1866-
} else if (tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE) {
1867-
List<Long> partIds = tableEntry.getPartitionInfo().getFirstPartDesc()
1868-
.getPartIds(startRow, startIncluded, endRow, endIncluded);
1869-
for (Long partId : partIds) {
1870-
replicas.add(new ObPair<Long, ReplicaLocation>(partId, getPartitionLocation(
1871-
tableEntry, partId, route)));
1918+
ObServerRoute route) throws Exception {
1919+
List<ObPair<Long, ReplicaLocation>> replicas = new ArrayList<>();
1920+
1921+
if (!tableEntry.isPartitionTable() || tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
1922+
long tabletId = getTabletIdByPartId(tableEntry, 0L);
1923+
ObPartitionLocationInfo locationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1924+
if (!locationInfo.initialized.get()) {
1925+
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1926+
locationInfo.initialized.compareAndSet(false, true);
18721927
}
1873-
} else if (tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_TWO) {
1874-
List<Long> partIds = getPartitionsForLevelTwo(tableEntry, startRow, startIncluded,
1875-
endRow, endIncluded);
1876-
for (Long partId : partIds) {
1877-
replicas.add(new ObPair<Long, ReplicaLocation>(partId, getPartitionLocation(
1878-
tableEntry, partId, route)));
1928+
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
1929+
return replicas;
1930+
}
1931+
1932+
ObPartitionLevel partitionLevel = tableEntry.getPartitionInfo().getLevel();
1933+
List<Long> partIds = getPartitionTablePartitionIds(tableEntry, startRow, startIncluded, endRow, endIncluded, partitionLevel);
1934+
1935+
for (Long partId : partIds) {
1936+
long tabletId = getTabletIdByPartId(tableEntry, partId);
1937+
ObPartitionLocationInfo locationInfo = tableEntry.getPartitionEntry().getPartitionInfo(tabletId);
1938+
if (!locationInfo.initialized.get()) {
1939+
refreshTableLocationByTabletId(tableEntry, tableName, tabletId);
1940+
locationInfo.initialized.compareAndSet(false, true);
18791941
}
1942+
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
1943+
}
1944+
1945+
return replicas;
1946+
}
1947+
1948+
private List<Long> getPartitionTablePartitionIds(TableEntry tableEntry,
1949+
Row startRow, boolean startIncluded,
1950+
Row endRow, boolean endIncluded,
1951+
ObPartitionLevel level)
1952+
throws Exception {
1953+
if (level == ObPartitionLevel.LEVEL_ONE) {
1954+
return tableEntry.getPartitionInfo().getFirstPartDesc()
1955+
.getPartIds(startRow, startIncluded, endRow, endIncluded);
1956+
} else if (level == ObPartitionLevel.LEVEL_TWO) {
1957+
return getPartitionsForLevelTwo(tableEntry, startRow, startIncluded,
1958+
endRow, endIncluded);
18801959
} else {
18811960
RUNTIME.error("not allowed bigger than level two");
18821961
throw new ObTableGetException("not allowed bigger than level two");
18831962
}
1963+
}
18841964

1885-
return replicas;
1965+
private long getTabletIdByPartId(TableEntry tableEntry, Long partId) {
1966+
if (ObGlobal.obVsnMajor() >= 4 && tableEntry.isPartitionTable()) {
1967+
ObPartitionInfo partInfo = tableEntry.getPartitionInfo();
1968+
Map<Long, Long> tabletIdMap = partInfo.getPartTabletIdMap();
1969+
long partIdx = tableEntry.getPartIdx(partId);
1970+
return tabletIdMap.getOrDefault(partIdx, partId);
1971+
}
1972+
return partId;
18861973
}
18871974

18881975
/**
@@ -1956,44 +2043,38 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
19562043
}
19572044
}
19582045

1959-
List<ObPair<Long, ReplicaLocation>> partIdWithReplicaList = getPartitionReplica(tableEntry,
2046+
List<ObPair<Long, ReplicaLocation>> partIdWithReplicaList = getPartitionReplica(tableEntry, tableName,
19602047
startRow, startInclusive, endRow, endInclusive, route);
19612048

19622049
// obTableParams -> List<Pair<logicId, obTableParams>>
19632050
List<ObPair<Long, ObTableParam>> obTableParams = new ArrayList<ObPair<Long, ObTableParam>>();
19642051
for (ObPair<Long, ReplicaLocation> partIdWithReplica : partIdWithReplicaList) {
1965-
long partId = partIdWithReplica.getLeft();
2052+
long tabletId = partIdWithReplica.getLeft();
19662053
ReplicaLocation replica = partIdWithReplica.getRight();
19672054
ObServerAddr addr = replica.getAddr();
19682055
ObTable obTable = tableRoster.get(addr);
19692056
boolean addrExpired = addr.isExpired(serverAddressCachingTimeout);
1970-
if (addrExpired || obTable == null) {
1971-
logger
1972-
.warn(
1973-
"server address {} is expired={} or can not get ob table. So that will sync refresh metadata",
1974-
addr, addrExpired);
1975-
syncRefreshMetadata();
1976-
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
1977-
replica = getPartitionLocation(tableEntry, partId, route);
1978-
addr = replica.getAddr();
1979-
obTable = tableRoster.get(addr);
1980-
}
2057+
// if (addrExpired || obTable == null) {
2058+
// logger
2059+
// .warn(
2060+
// "server address {} is expired={} or can not get ob table. So that will sync refresh metadata",
2061+
// addr, addrExpired);
2062+
// syncRefreshMetadata();
2063+
// tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
2064+
// replica = getPartitionLocation(tableEntry, partId, route);
2065+
// addr = replica.getAddr();
2066+
// obTable = tableRoster.get(addr);
2067+
// }
19812068

19822069
if (obTable == null) {
19832070
RUNTIME.error("cannot get table by addr: " + addr);
19842071
throw new ObTableGetException("cannot get table by addr: " + addr);
19852072
}
19862073

19872074
ObTableParam param = new ObTableParam(obTable);
1988-
if (ObGlobal.obVsnMajor() >= 4) {
1989-
long partIdx = tableEntry.getPartIdx(partId);
1990-
partId = tableEntry.isPartitionTable() ? tableEntry.getPartitionInfo()
1991-
.getPartTabletIdMap().get(partIdx) : partId;
1992-
}
1993-
19942075
param.setTableId(tableEntry.getTableId());
19952076
// real partition(tablet) id
1996-
param.setPartitionId(partId);
2077+
param.setPartitionId(tabletId);
19972078

19982079
addr.recordAccess();
19992080
obTableParams.add(new ObPair<Long, ObTableParam>(partIdWithReplica.getLeft(), param));

0 commit comments

Comments
 (0)