Skip to content

Commit 2e283a1

Browse files
committed
fix review
1 parent df7606e commit 2e283a1

File tree

4 files changed

+110
-24
lines changed

4 files changed

+110
-24
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,7 +1400,18 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
14001400
try {
14011401
// if table entry is exist we just need to refresh table locations
14021402
if (tableEntry != null && !fetchAll) {
1403-
// do nothing
1403+
if (ObGlobal.obVsnMajor() >= 4) {
1404+
// do nothing
1405+
} else {
1406+
// 3.x still proactively refreshes all locations
1407+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
1408+
tableEntryKey,//
1409+
tableEntry,//
1410+
tableEntryAcquireConnectTimeout,//
1411+
tableEntryAcquireSocketTimeout,//
1412+
serverAddressPriorityTimeout, //
1413+
serverAddressCachingTimeout, sysUA);
1414+
}
14041415
} else {
14051416
// if table entry is not exist we should fetch partition info and table locations
14061417
tableEntry = loadTableEntryWithPriority(serverRoster, //
@@ -1839,10 +1850,23 @@ public ObTable addTable(ObServerAddr addr){
18391850
public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry tableEntry,
18401851
long partId, boolean waitForRefresh,
18411852
ObServerRoute route) throws Exception {
1853+
ReplicaLocation replica = null;
18421854
long tabletId = getTabletIdByPartId(tableEntry, partId);
1843-
ObPartitionLocationInfo obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1855+
ObPartitionLocationInfo obPartitionLocationInfo = null;
1856+
if (ObGlobal.obVsnMajor() >= 4) {
18441857

1845-
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
1858+
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1859+
1860+
replica = getPartitionLocation(obPartitionLocationInfo, route);
1861+
} else {
1862+
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
1863+
route);
1864+
replica = partitionReplica.getRight();
1865+
}
1866+
if (replica == null) {
1867+
RUNTIME.error("Cannot get replica by partId: " + partId);
1868+
throw new ObTableGetException("Cannot get replica by partId: " + partId);
1869+
}
18461870
ObServerAddr addr = replica.getAddr();
18471871
ObTable obTable = tableRoster.get(addr);
18481872

@@ -1855,8 +1879,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18551879
logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
18561880
}
18571881

1858-
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1859-
replica = getPartitionLocation(obPartitionLocationInfo, route);
1882+
if (ObGlobal.obVsnMajor() >= 4) {
1883+
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1884+
replica = getPartitionLocation(obPartitionLocationInfo, route);
1885+
} else {
1886+
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
1887+
replica = getPartitionReplica(tableEntry, partId, route).getRight();
1888+
}
1889+
18601890
addr = replica.getAddr();
18611891
obTable = tableRoster.get(addr);
18621892

@@ -1865,8 +1895,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18651895
throw new ObTableGetException("Cannot get table by addr: " + addr);
18661896
}
18671897
}
1868-
1869-
ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
1898+
ObTableParam param = null;
1899+
if (ObGlobal.obVsnMajor() >= 4) {
1900+
param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
1901+
} else {
1902+
param.setPartId(partId);
1903+
param.setTableId(tableEntry.getTableId());
1904+
param.setPartitionId(partId);
1905+
}
18701906
addr.recordAccess();
18711907
return new ObPair<>(tabletId, param);
18721908
}

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -735,11 +735,16 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
735735
}
736736
}
737737
}
738-
739-
// only set empty partitionEntry
740-
ObPartitionEntry partitionEntry = new ObPartitionEntry();
741-
tableEntry.setPartitionEntry(partitionEntry);
742-
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
738+
739+
if (ObGlobal.obVsnMajor() >= 4) {
740+
// only set empty partitionEntry
741+
ObPartitionEntry partitionEntry = new ObPartitionEntry();
742+
tableEntry.setPartitionEntry(partitionEntry);
743+
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
744+
} else {
745+
// get location info
746+
getTableEntryLocationFromRemote(connection, key, tableEntry);
747+
}
743748

744749
if (!initialized) {
745750
if (BOOT.isInfoEnabled()) {
@@ -774,6 +779,7 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
774779
return tableEntry;
775780
}
776781

782+
// Note: This code is applicable only for refreshing locations based on tablet ID in version 4.x
777783
private static String genLocationSQLByTabletId() {
778784
String sql = null;
779785
if (ObGlobal.obVsnMajor() >= 4) {
@@ -890,6 +896,37 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
890896
PreparedStatement ps = null;
891897
ResultSet rs = null;
892898
ObPartitionEntry partitionEntry = new ObPartitionEntry();
899+
long partitionNum = tableEntry.getPartitionNum();
900+
int epoch = (int) ((partitionNum / MAX_TABLET_NUMS_EPOCH) + 1);
901+
for (int i = 0; i < epoch; i++) {
902+
try {
903+
int offset = i * MAX_TABLET_NUMS_EPOCH;
904+
// // This code is executed only in version 3.x
905+
String sql = genLocationSQLByOffset(tableEntry, offset, MAX_TABLET_NUMS_EPOCH);
906+
ps = connection.prepareStatement(sql);
907+
ps.setString(1, key.getTenantName());
908+
ps.setString(2, key.getDatabaseName());
909+
ps.setString(3, key.getTableName());
910+
rs = ps.executeQuery();
911+
partitionEntry = getPartitionLocationFromResultSet(tableEntry, rs, partitionEntry);
912+
} catch (Exception e) {
913+
RUNTIME.error(LCD.convert("01-00010"), key, partitionNum, tableEntry, e);
914+
throw new ObTablePartitionLocationRefreshException(format(
915+
"fail to get partition location entry from remote entryKey = %s partNum = %d tableEntry =%s "
916+
+ "offset =%d epoch =%d", key, partitionNum, tableEntry, i, epoch), e);
917+
} finally {
918+
try {
919+
if (null != rs) {
920+
rs.close();
921+
}
922+
if (null != ps) {
923+
ps.close();
924+
}
925+
} catch (SQLException e) {
926+
// ignore
927+
}
928+
}
929+
} // end for
893930
tableEntry.setPartitionEntry(partitionEntry);
894931
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
895932
return tableEntry;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

20+
import com.alipay.oceanbase.rpc.ObGlobal;
2021
import com.alipay.oceanbase.rpc.ObTableClient;
2122
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2223
import com.alipay.oceanbase.rpc.exception.*;
2324
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
2425
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
26+
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2527
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2628
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2729
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -336,10 +338,7 @@ public boolean next() throws Exception {
336338

337339
} catch (Exception e) {
338340
if (e instanceof ObTableNeedFetchAllException) {
339-
// Adjust the start key and refresh the expectant
340-
this.tableQuery.adjustStartKey(currentStartKey);
341341
setExpectant(refreshPartition(tableQuery, tableName));
342-
343342
// Reset the iterator to start over
344343
it = expectant.entrySet().iterator();
345344
referPartition.clear(); // Clear the referPartition if needed

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,20 @@ public boolean next() throws Exception {
231231
} catch (Exception e) {
232232
if (e instanceof ObTableNeedFetchAllException) {
233233
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
234-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
235-
.adjustStartKey(currentStartKey);
236-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
237-
.getTableQuery(), realTableName));
238-
setEnd(true);
234+
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false, false, false);
235+
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
236+
if (ObGlobal.obVsnMajor() >= 4
237+
&& entry.isPartitionTable()
238+
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
239+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
240+
.adjustStartKey(currentStartKey);
241+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
242+
.getTableQuery(), realTableName));
243+
setEnd(true);
244+
} else {
245+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
246+
.getTableQuery(), realTableName));
247+
}
239248
} else {
240249
throw e;
241250
}
@@ -263,10 +272,15 @@ public boolean next() throws Exception {
263272
} catch (Exception e) {
264273
if (e instanceof ObTableNeedFetchAllException) {
265274
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
266-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
267-
.adjustStartKey(currentStartKey);
268-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
269-
.getTableQuery(), realTableName));
275+
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false, false, false);
276+
if (ObGlobal.obVsnMajor() >= 4
277+
&& tableEntry.isPartitionTable()
278+
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
279+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
280+
.adjustStartKey(currentStartKey);
281+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
282+
.getTableQuery(), realTableName));
283+
}
270284
it = expectant.entrySet().iterator();
271285
retryTimes++;
272286
if (retryTimes > client.getTableEntryRefreshTryTimes()) {

0 commit comments

Comments
 (0)