Skip to content

Commit 5df41bd

Browse files
authored
adapt for distributedExecute (#290)
* adapt for distributedExecute * fix for compile * fix for ci test * get server capacity in odp mode * fix for reviews
1 parent 7739773 commit 5df41bd

File tree

6 files changed

+177
-59
lines changed

6 files changed

+177
-59
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,7 +1390,8 @@ public TableEntry refreshTableLocationByTabletId(TableEntry tableEntry, String t
13901390
tableEntryAcquireSocketTimeout,
13911391
serverAddressPriorityTimeout,
13921392
serverAddressCachingTimeout,
1393-
sysUA
1393+
sysUA,
1394+
!getServerCapacity().isSupportDistributedExecute() /* withLsId */
13941395
);
13951396

13961397
tableEntry.prepareForWeakRead(serverRoster.getServerLdcLocation());
@@ -4245,13 +4246,20 @@ public ObServerRoute getRoute(boolean readonly) {
42454246
}
42464247
}
42474248

4248-
private ObTableServerCapacity getServerCapacity() {
4249-
if (tableRoster.isEmpty()) {
4250-
throw new IllegalStateException("client is not initialized and obTable is empty");
4249+
public ObTableServerCapacity getServerCapacity() {
4250+
if (isOdpMode()) {
4251+
if (odpTable == null) {
4252+
throw new IllegalStateException("client is not initialized and obTable is empty");
4253+
}
4254+
return odpTable.getServerCapacity();
4255+
} else {
4256+
if (tableRoster == null || tableRoster.isEmpty()) {
4257+
throw new IllegalStateException("client is not initialized and obTable is empty");
4258+
}
4259+
Iterator<ObTable> iterator = tableRoster.values().iterator();
4260+
ObTable firstObTable = iterator.next();
4261+
return firstObTable.getServerCapacity();
42514262
}
4252-
Iterator<ObTable> iterator = tableRoster.values().iterator();
4253-
ObTable firstObTable = iterator.next();
4254-
return firstObTable.getServerCapacity();
42554263
}
42564264

42574265
public void setOdpAddr(String odpAddr) {

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,25 @@ public class LocationUtil {
189189
+ " AND A.tenant_name = ? "
190190
+ " AND A.database_name = ? "
191191
+ " AND A.table_name = ?;";
192-
192+
private static final String PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_WITHOUT_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ "
193+
+ " A.tablet_id as tablet_id, "
194+
+ " A.svr_ip as svr_ip, "
195+
+ " A.sql_port as sql_port, "
196+
+ " A.table_id as table_id, "
197+
+ " A.role as role, "
198+
+ " A.replica_num as replica_num, "
199+
+ " A.part_num as part_num, "
200+
+ " (SELECT B.svr_port FROM oceanbase.__all_server B WHERE A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port) as svr_port, "
201+
+ " (SELECT B.status FROM oceanbase.__all_server B WHERE A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port) as status, "
202+
+ " (SELECT B.stop_time FROM oceanbase.__all_server B WHERE A.svr_ip = B.svr_ip AND A.sql_port = B.inner_port) as stop_time, "
203+
+ " A.spare1 as replica_type "
204+
+ "FROM "
205+
+ " oceanbase.__all_virtual_proxy_schema A "
206+
+ "WHERE "
207+
+ " A.tablet_id = ? "
208+
+ " AND A.tenant_name = ? "
209+
+ " AND A.database_name = ? "
210+
+ " AND A.table_name = ?;";
193211
private static final String PROXY_FIRST_PARTITION_SQL_V4 = "SELECT /*+READ_CONSISTENCY(WEAK)*/ part_id, part_name, tablet_id, high_bound_val, sub_part_num "
194212
+ "FROM oceanbase.__all_virtual_proxy_partition "
195213
+ "WHERE tenant_name = ? and table_id = ? LIMIT ?;";
@@ -499,7 +517,8 @@ public static TableEntry loadTableEntryLocationWithPriority(final ServerRoster s
499517
final long socketTimeout,
500518
final long priorityTimeout,
501519
final long cachingTimeout,
502-
final ObUserAuth sysUA)
520+
final ObUserAuth sysUA,
521+
boolean withLsId)
503522
throws ObTableEntryRefreshException {
504523

505524
return callTableEntryRefreshWithPriority(serverRoster, priorityTimeout, cachingTimeout,
@@ -512,7 +531,7 @@ TableEntry execute(ObServerAddr obServerAddr) throws ObTableEntryRefreshExceptio
512531
TableEntry execute(Connection connection)
513532
throws ObTablePartitionLocationRefreshException {
514533
return getTableEntryLocationFromRemote(connection, key, tableEntry,
515-
tabletId);
534+
tabletId, withLsId);
516535
}
517536
});
518537
}
@@ -793,10 +812,14 @@ private static TableEntry getTableEntryFromRemote(Connection connection, TableEn
793812
}
794813

795814
// Note: This code is applicable only for refreshing locations based on tablet ID in version 4.x
796-
private static String genLocationSQLByTabletId() {
815+
private static String genLocationSQLByTabletId(boolean withLsId) {
797816
String sql = null;
798817
if (ObGlobal.obVsnMajor() >= 4) {
799-
sql = PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_V4;
818+
if (withLsId) {
819+
sql = PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_V4;
820+
} else {
821+
sql = PROXY_LOCATION_SQL_PARTITION_BY_TABLETID_WITHOUT_V4;
822+
}
800823
} else {
801824
throw new FeatureNotSupportedException("not support ob version less than 4");
802825
}
@@ -863,12 +886,13 @@ private static String genLocationSQLByOffset(TableEntry tableEntry, int offset,
863886
public static TableEntry getTableEntryLocationFromRemote(Connection connection,
864887
TableEntryKey key,
865888
TableEntry tableEntry,
866-
Long tabletId)
889+
Long tabletId,
890+
boolean withLsId)
867891
throws ObTablePartitionLocationRefreshException {
868892
PreparedStatement ps = null;
869893
ResultSet rs = null;
870894
ObPartitionEntry partitionEntry = tableEntry.getPartitionEntry();
871-
String sql = genLocationSQLByTabletId();
895+
String sql = genLocationSQLByTabletId(withLsId);
872896
ObPartitionLocationInfo partitionLocationInfo = partitionEntry.getPartitionInfo(tabletId);
873897
// return quickly if recently refreshed
874898
if (System.currentTimeMillis() - partitionLocationInfo.getLastUpdateTime()
@@ -877,13 +901,20 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
877901
}
878902
try {
879903
ps = connection.prepareStatement(sql);
880-
ps.setString(1, key.getTenantName());
881-
ps.setLong(2, tabletId);
882-
ps.setString(3, key.getTenantName());
883-
ps.setString(4, key.getDatabaseName());
884-
ps.setString(5, key.getTableName());
904+
if (withLsId) {
905+
ps.setString(1, key.getTenantName());
906+
ps.setLong(2, tabletId);
907+
ps.setString(3, key.getTenantName());
908+
ps.setString(4, key.getDatabaseName());
909+
ps.setString(5, key.getTableName());
910+
} else {
911+
ps.setLong(1, tabletId);
912+
ps.setString(2, key.getTenantName());
913+
ps.setString(3, key.getDatabaseName());
914+
ps.setString(4, key.getTableName());
915+
}
885916
rs = ps.executeQuery();
886-
getPartitionLocationFromResultSetByTablet(tableEntry, rs, partitionEntry, tabletId);
917+
getPartitionLocationFromResultSetByTablet(tableEntry, rs, partitionEntry, tabletId, withLsId);
887918
} catch (Exception e) {
888919
RUNTIME.error(LCD.convert("01-00010"), key, tableEntry, e);
889920
throw new ObTablePartitionLocationRefreshException(format(
@@ -1217,7 +1248,8 @@ private static TableEntry getTableEntryFromResultSet(TableEntryKey key, ResultSe
12171248
private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableEntry tableEntry,
12181249
ResultSet rs,
12191250
ObPartitionEntry partitionEntry,
1220-
long tabletId)
1251+
long tabletId,
1252+
boolean withLsId)
12211253
throws SQLException,
12221254
ObTablePartitionLocationRefreshException {
12231255

@@ -1232,7 +1264,7 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE
12321264
ReplicaLocation replica = buildReplicaLocation(rs);
12331265
long partitionId = (ObGlobal.obVsnMajor() >= 4) ? rs.getLong("tablet_id") : rs
12341266
.getLong("partition_id");
1235-
long lsId = ObGlobal.obVsnMajor() >= 4 ? rs.getLong("ls_id") : INVALID_LS_ID;
1267+
long lsId = ObGlobal.obVsnMajor() >= 4 && withLsId ? rs.getLong("ls_id") : INVALID_LS_ID;
12361268
if (rs.wasNull() && ObGlobal.obVsnMajor() >= 4) {
12371269
lsId = INVALID_LS_ID; // For non-partitioned table
12381270
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.*;
3939
import java.util.concurrent.atomic.AtomicReference;
4040

41+
import static com.alipay.oceanbase.rpc.constant.Constants.INVALID_TABLET_ID;
4142
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
4243

4344
public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
@@ -112,9 +113,10 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
112113
throws Exception {
113114
ObTableParam obTableParam = partIdWithObTable.getRight();
114115
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
115-
116+
long partitionId = client.getServerCapacity().isSupportSecondaryPartition() ?
117+
INVALID_TABLET_ID : obTableParam.getPartitionId();
116118
// refresh request info
117-
queryRequest.setPartitionId(obTableParam.getPartitionId());
119+
queryRequest.setPartitionId(partitionId);
118120
queryRequest.setTableId(obTableParam.getTableId());
119121
if (operationTimeout > 0) {
120122
asyncRequest.setTimeout(operationTimeout);

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,20 @@
3333
import java.util.Map;
3434
import java.util.concurrent.atomic.AtomicReference;
3535

36+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.INVALID_TABLET_ID;
37+
3638
public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult {
3739
private static final Logger logger = TableClientLoggerFactory
3840
.getLogger(ObTableClientQueryStreamResult.class);
3941

4042
protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> partIdWithObTable)
4143
throws Exception {
44+
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ?
45+
INVALID_TABLET_ID : partIdWithObTable.getRight().getPartitionId();
4246
ObTableQueryRequest request = new ObTableQueryRequest();
4347
request.setTableName(tableName);
4448
request.setTableQuery(tableQuery);
45-
request.setPartitionId(partIdWithObTable.getRight().getPartitionId());
49+
request.setPartitionId(partitionId);
4650
request.setTableId(partIdWithObTable.getRight().getTableId());
4751
request.setEntityType(entityType);
4852
if (operationTimeout > 0) {

0 commit comments

Comments
 (0)