Skip to content

Commit f005f4b

Browse files
committed
add threshold for refresh table entry with location
1 parent e053056 commit f005f4b

File tree

4 files changed

+57
-19
lines changed

4 files changed

+57
-19
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ private void initProperties() {
401401
routeMap.put(TABLE_ENTRY_ACQUIRE_CONNECT_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireConnectTimeout));
402402
routeMap.put(TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT.getKey(), String.valueOf(tableEntryAcquireSocketTimeout));
403403
routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_BASE.getKey(), String.valueOf(tableEntryRefreshIntervalBase));
404+
routeMap.put(TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getKey(), String.valueOf(tableEntryLocationRefreshThreshold));
404405
routeMap.put(TABLE_ENTRY_REFRESH_INTERVAL_CEILING.getKey(), String.valueOf(tableEntryRefreshIntervalCeiling));
405406
routeMap.put(TABLE_ENTRY_REFRESH_TRY_TIMES.getKey(), String.valueOf(tableEntryRefreshTryTimes));
406407
}

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MAX_PARTITION_ELEMENT;
4949
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MIN_PARTITION_ELEMENT;
50+
import static com.alipay.oceanbase.rpc.property.Property.TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD;
5051
import static com.alipay.oceanbase.rpc.util.RandomUtil.getRandomNum;
5152
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
5253
import static java.lang.String.format;
@@ -745,10 +746,14 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
745746
}
746747

747748
if (ObGlobal.obVsnMajor() >= 4) {
748-
// only set empty partitionEntry
749-
ObPartitionEntry partitionEntry = new ObPartitionEntry();
750-
tableEntry.setPartitionEntry(partitionEntry);
751-
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
749+
// only set empty partitionEntry
750+
if (tableEntry.getPartitionNum() <= TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getDefaultLong()) {
751+
getTableEntryLocationFromRemote(connection, key, tableEntry);
752+
} else {
753+
ObPartitionEntry partitionEntry = new ObPartitionEntry();
754+
tableEntry.setPartitionEntry(partitionEntry);
755+
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
756+
}
752757
} else {
753758
// get location info
754759
getTableEntryLocationFromRemote(connection, key, tableEntry);
@@ -915,6 +920,7 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
915920
ps.setString(1, key.getTenantName());
916921
ps.setString(2, key.getDatabaseName());
917922
ps.setString(3, key.getTableName());
923+
ps.setString(4, key.getTenantName());
918924
rs = ps.executeQuery();
919925
partitionEntry = getPartitionLocationFromResultSet(tableEntry, rs, partitionEntry);
920926
} catch (Exception e) {
@@ -1279,28 +1285,55 @@ private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tab
12791285
} else {
12801286
tabletLsIdMap.put(partitionId, INVALID_LS_ID); // non-partitioned table
12811287
}
1288+
ObPartitionLocationInfo partitionLocationInfo = partitionEntry
1289+
.getPartitionInfo(partitionId);
1290+
ObPartitionLocation location = partitionLocationInfo.getPartitionLocation();
1291+
if (location == null) {
1292+
partitionLocationInfo.rwLock.writeLock().lock();
1293+
try {
1294+
location = partitionLocationInfo.getPartitionLocation();
1295+
if (location == null) {
1296+
location = new ObPartitionLocation();
1297+
partitionLocationInfo.updateLocation(location, lsId);
1298+
}
1299+
} finally {
1300+
partitionLocationInfo.rwLock.writeLock().unlock();
1301+
}
1302+
}
1303+
if (!replica.isValid()) {
1304+
RUNTIME
1305+
.warn(format(
1306+
"Replica is invalid; continuing. Replica=%s, PartitionId/TabletId=%d, TableId=%d",
1307+
replica, partitionId, tableEntry.getTableId()));
1308+
continue;
1309+
}
1310+
location.addReplicaLocation(replica);
1311+
1312+
if (partitionLocationInfo.initialized.compareAndSet(false, true)) {
1313+
partitionLocationInfo.initializationLatch.countDown();
1314+
}
12821315
} else {
12831316
partitionId = rs.getLong("partition_id");
12841317
if (tableEntry.isPartitionTable()
1285-
&& null != tableEntry.getPartitionInfo().getSubPartDesc()) {
1318+
&& null != tableEntry.getPartitionInfo().getSubPartDesc()) {
12861319
partitionId = ObPartIdCalculator.getPartIdx(partitionId, tableEntry
1287-
.getPartitionInfo().getSubPartDesc().getPartNum());
1320+
.getPartitionInfo().getSubPartDesc().getPartNum());
12881321
}
1289-
}
1290-
if (!replica.isValid()) {
1291-
RUNTIME
1292-
.warn(format(
1293-
"replica is invalid, continue, replica=%s, partitionId/tabletId=%d, tableId=%d",
1294-
replica, partitionId, tableEntry.getTableId()));
1295-
continue;
1296-
}
1297-
ObPartitionLocation location = partitionLocation.get(partitionId);
1322+
if (!replica.isValid()) {
1323+
RUNTIME
1324+
.warn(format(
1325+
"replica is invalid, continue, replica=%s, partitionId/tabletId=%d, tableId=%d",
1326+
replica, partitionId, tableEntry.getTableId()));
1327+
continue;
1328+
}
1329+
ObPartitionLocation location = partitionLocation.get(partitionId);
12981330

1299-
if (location == null) {
1300-
location = new ObPartitionLocation();
1301-
partitionLocation.put(partitionId, location);
1331+
if (location == null) {
1332+
location = new ObPartitionLocation();
1333+
partitionLocation.put(partitionId, location);
1334+
}
1335+
location.addReplicaLocation(replica);
13021336
}
1303-
location.addReplicaLocation(replica);
13041337
}
13051338

13061339
if (ObGlobal.obVsnMajor() < 4) {

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public enum Property {
5959
TABLE_ENTRY_ACQUIRE_SOCKET_TIMEOUT("table.entry.acquire.socket.timeout", 3000L,
6060
"刷新TABLE地址的SOCKET超时时间"),
6161

62+
TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD("table.entry.location.refresh.threshold", 100L, "刷新TABLE ENTRY同时刷新LOCATION的阈值"),
63+
6264
TABLE_ENTRY_REFRESH_INTERVAL_BASE("table.entry.refresh.interval.base", 100L, "刷新TABLE地址的基础时间间隔"),
6365

6466
@Deprecated

src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public abstract class AbstractObTableClient extends AbstractTable {
5050
protected long tableEntryRefreshIntervalBase = TABLE_ENTRY_REFRESH_INTERVAL_BASE
5151
.getDefaultLong();
5252

53+
protected long tableEntryLocationRefreshThreshold = TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD.getDefaultLong();
54+
5355
protected long tableEntryRefreshIntervalCeiling = TABLE_ENTRY_REFRESH_INTERVAL_CEILING
5456
.getDefaultLong();
5557

0 commit comments

Comments
 (0)