Skip to content

Commit fb03d26

Browse files
committed
fix review
1 parent a46e2b0 commit fb03d26

File tree

4 files changed

+113
-24
lines changed

4 files changed

+113
-24
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,7 +1399,18 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, bo
13991399
try {
14001400
// if table entry is exist we just need to refresh table locations
14011401
if (tableEntry != null && !fetchAll) {
1402-
// do nothing
1402+
if (ObGlobal.obVsnMajor() >= 4) {
1403+
// do nothing
1404+
} else {
1405+
// 3.x still proactively refreshes all locations
1406+
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
1407+
tableEntryKey,//
1408+
tableEntry,//
1409+
tableEntryAcquireConnectTimeout,//
1410+
tableEntryAcquireSocketTimeout,//
1411+
serverAddressPriorityTimeout, //
1412+
serverAddressCachingTimeout, sysUA);
1413+
}
14031414
} else {
14041415
// if table entry is not exist we should fetch partition info and table locations
14051416
tableEntry = loadTableEntryWithPriority(serverRoster, //
@@ -1829,10 +1840,23 @@ public ObTable addTable(ObServerAddr addr){
18291840
public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry tableEntry,
18301841
long partId, boolean waitForRefresh,
18311842
ObServerRoute route) throws Exception {
1843+
ReplicaLocation replica = null;
18321844
long tabletId = getTabletIdByPartId(tableEntry, partId);
1833-
ObPartitionLocationInfo obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1845+
ObPartitionLocationInfo obPartitionLocationInfo = null;
1846+
if (ObGlobal.obVsnMajor() >= 4) {
18341847

1835-
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
1848+
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1849+
1850+
replica = getPartitionLocation(obPartitionLocationInfo, route);
1851+
} else {
1852+
ObPair<Long, ReplicaLocation> partitionReplica = getPartitionReplica(tableEntry, partId,
1853+
route);
1854+
replica = partitionReplica.getRight();
1855+
}
1856+
if (replica == null) {
1857+
RUNTIME.error("Cannot get replica by partId: " + partId);
1858+
throw new ObTableGetException("Cannot get replica by partId: " + partId);
1859+
}
18361860
ObServerAddr addr = replica.getAddr();
18371861
ObTable obTable = tableRoster.get(addr);
18381862

@@ -1845,8 +1869,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18451869
logger.info("Server addr {} is expired, refreshing tableEntry.", addr);
18461870
}
18471871

1848-
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1849-
replica = getPartitionLocation(obPartitionLocationInfo, route);
1872+
if (ObGlobal.obVsnMajor() >= 4) {
1873+
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1874+
replica = getPartitionLocation(obPartitionLocationInfo, route);
1875+
} else {
1876+
tableEntry = getOrRefreshTableEntry(tableName, true, waitForRefresh, false);
1877+
replica = getPartitionReplica(tableEntry, partId, route).getRight();
1878+
}
1879+
18501880
addr = replica.getAddr();
18511881
obTable = tableRoster.get(addr);
18521882

@@ -1855,8 +1885,14 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
18551885
throw new ObTableGetException("Cannot get table by addr: " + addr);
18561886
}
18571887
}
1858-
1859-
ObTableParam param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
1888+
ObTableParam param = null;
1889+
if (ObGlobal.obVsnMajor() >= 4) {
1890+
param = createTableParam(obTable, tableEntry, obPartitionLocationInfo, partId, tabletId);
1891+
} else {
1892+
param.setPartId(partId);
1893+
param.setTableId(tableEntry.getTableId());
1894+
param.setPartitionId(partId);
1895+
}
18601896
addr.recordAccess();
18611897
return new ObPair<>(tabletId, param);
18621898
}

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -735,11 +735,16 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
735735
}
736736
}
737737
}
738-
739-
// only set empty partitionEntry
740-
ObPartitionEntry partitionEntry = new ObPartitionEntry();
741-
tableEntry.setPartitionEntry(partitionEntry);
742-
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
738+
739+
if (ObGlobal.obVsnMajor() >= 4) {
740+
// only set empty partitionEntry
741+
ObPartitionEntry partitionEntry = new ObPartitionEntry();
742+
tableEntry.setPartitionEntry(partitionEntry);
743+
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
744+
} else {
745+
// get location info
746+
getTableEntryLocationFromRemote(connection, key, tableEntry);
747+
}
743748

744749
if (!initialized) {
745750
if (BOOT.isInfoEnabled()) {
@@ -774,6 +779,7 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
774779
return tableEntry;
775780
}
776781

782+
// Note: This code is applicable only for refreshing locations based on tablet ID in version 4.x
777783
private static String genLocationSQLByTabletId() {
778784
String sql = null;
779785
if (ObGlobal.obVsnMajor() >= 4) {
@@ -890,6 +896,37 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
890896
PreparedStatement ps = null;
891897
ResultSet rs = null;
892898
ObPartitionEntry partitionEntry = new ObPartitionEntry();
899+
long partitionNum = tableEntry.getPartitionNum();
900+
int epoch = (int) ((partitionNum / MAX_TABLET_NUMS_EPOCH) + 1);
901+
for (int i = 0; i < epoch; i++) {
902+
try {
903+
int offset = i * MAX_TABLET_NUMS_EPOCH;
904+
// // This code is executed only in version 3.x
905+
String sql = genLocationSQLByOffset(tableEntry, offset, MAX_TABLET_NUMS_EPOCH);
906+
ps = connection.prepareStatement(sql);
907+
ps.setString(1, key.getTenantName());
908+
ps.setString(2, key.getDatabaseName());
909+
ps.setString(3, key.getTableName());
910+
rs = ps.executeQuery();
911+
partitionEntry = getPartitionLocationFromResultSet(tableEntry, rs, partitionEntry);
912+
} catch (Exception e) {
913+
RUNTIME.error(LCD.convert("01-00010"), key, partitionNum, tableEntry, e);
914+
throw new ObTablePartitionLocationRefreshException(format(
915+
"fail to get partition location entry from remote entryKey = %s partNum = %d tableEntry =%s "
916+
+ "offset =%d epoch =%d", key, partitionNum, tableEntry, i, epoch), e);
917+
} finally {
918+
try {
919+
if (null != rs) {
920+
rs.close();
921+
}
922+
if (null != ps) {
923+
ps.close();
924+
}
925+
} catch (SQLException e) {
926+
// ignore
927+
}
928+
}
929+
} // end for
893930
tableEntry.setPartitionEntry(partitionEntry);
894931
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
895932
return tableEntry;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

20+
import com.alipay.oceanbase.rpc.ObGlobal;
2021
import com.alipay.oceanbase.rpc.ObTableClient;
2122
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2223
import com.alipay.oceanbase.rpc.exception.*;
2324
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
2425
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
26+
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2527
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2628
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2729
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -323,10 +325,7 @@ public boolean next() throws Exception {
323325

324326
} catch (Exception e) {
325327
if (e instanceof ObTableNeedFetchAllException) {
326-
// Adjust the start key and refresh the expectant
327-
this.tableQuery.adjustStartKey(currentStartKey);
328328
setExpectant(refreshPartition(tableQuery, tableName));
329-
330329
// Reset the iterator to start over
331330
it = expectant.entrySet().iterator();
332331
referPartition.clear(); // Clear the referPartition if needed

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.alipay.oceanbase.rpc.stream;
1919

20+
import com.alipay.oceanbase.rpc.ObGlobal;
2021
import com.alipay.oceanbase.rpc.ObTableClient;
2122
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2223
import com.alipay.oceanbase.rpc.exception.ObTableException;
2324
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
2425
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
26+
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2527
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2628
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2729
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -201,11 +203,21 @@ public boolean next() throws Exception {
201203
referToLastStreamResult(lastEntry.getValue());
202204
} catch (Exception e) {
203205
if (e instanceof ObTableNeedFetchAllException) {
204-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
205-
.adjustStartKey(currentStartKey);
206-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
207-
.getTableQuery(), tableName));
208-
setEnd(true);
206+
207+
TableEntry entry = client.getOrRefreshTableEntry(tableName, false, false, false);
208+
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
209+
if (ObGlobal.obVsnMajor() >= 4
210+
&& entry.isPartitionTable()
211+
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
212+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
213+
.adjustStartKey(currentStartKey);
214+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
215+
.getTableQuery(), tableName));
216+
setEnd(true);
217+
} else {
218+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
219+
.getTableQuery(), tableName));
220+
}
209221
} else {
210222
throw e;
211223
}
@@ -232,10 +244,15 @@ public boolean next() throws Exception {
232244
referToNewPartition(entry.getValue());
233245
} catch (Exception e) {
234246
if (e instanceof ObTableNeedFetchAllException) {
235-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
236-
.adjustStartKey(currentStartKey);
237-
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
238-
.getTableQuery(), tableName));
247+
TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, false, false, false);
248+
if (ObGlobal.obVsnMajor() >= 4
249+
&& tableEntry.isPartitionTable()
250+
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
251+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
252+
.adjustStartKey(currentStartKey);
253+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
254+
.getTableQuery(), tableName));
255+
}
239256
it = expectant.entrySet().iterator();
240257
retryTimes++;
241258
if (retryTimes > client.getTableEntryRefreshTryTimes()) {

0 commit comments

Comments
 (0)