Skip to content

Commit 828c1a4

Browse files
committed
add NeedTabletId flag and set/get interface
1 parent c8df6da commit 828c1a4

File tree

10 files changed

+102
-29
lines changed

10 files changed

+102
-29
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public static boolean isHBasePutPerfSupport() {
125125
return OB_VERSION >= OB_VERSION_4_4_1_0;
126126
}
127127

128+
public static boolean isDistributeNeedTabletIdSupport() {
129+
return OB_VERSION >= OB_VERSION_4_3_5_5 && OB_VERSION < OB_VERSION_4_4_0_0
130+
|| OB_VERSION >= OB_VERSION_4_4_1_0;
131+
}
132+
128133
/*-------------------------------------------- OB_VERSION --------------------------------------------*/
129134

130135
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
@@ -143,6 +148,8 @@ public static boolean isHBasePutPerfSupport() {
143148

144149
public static final long OB_VERSION_4_3_5_3 = calcVersion(4, (short) 3, (byte) 5, (byte) 3);
145150

151+
public static final long OB_VERSION_4_3_5_5 = calcVersion(4, (short) 3, (byte) 5, (byte) 5);
152+
146153
public static final long OB_VERSION_4_4_0_0 = calcVersion(4, (short) 4, (byte) 0, (byte) 0);
147154

148155
public static final long OB_VERSION_4_4_1_0 = calcVersion(4, (short) 4, (byte) 1, (byte) 0);

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,17 +2107,19 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab
21072107
throw new ObTableRoutingWrongException();
21082108
}
21092109
} else if (result != null && result.isRoutingWrong() && !isOdpMode()) {
2110-
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
2111-
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta());
21122110
if (result.isNeedRefreshMeta()) {
21132111
tableRoute.refreshMeta(tableName);
21142112
}
2113+
long tabletId = INVALID_TABLET_ID;
21152114
if (request instanceof ObTableAbstractOperationRequest) {
2116-
long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
2115+
tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
21172116
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
21182117
} else if (request instanceof ObHbaseRequest) {
2119-
tableRoute.refreshTabletLocationBatch(tableName);
2118+
tabletId = ((ObHbaseRequest) request).getTabletId();
2119+
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
21202120
}
2121+
logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}",
2122+
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta(), tabletId);
21212123
}
21222124
return result;
21232125
}
@@ -2337,16 +2339,22 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23372339
}
23382340

23392341
// Check if partIdMapObTable size is greater than 1
2342+
boolean needTabletId = false;
23402343
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();
23412344
if (partIdMapObTable.size() > 1 && !isDistributedExecuteSupported) {
23422345
throw new ObTablePartitionConsistentException(
23432346
"query and mutate must be a atomic operation");
23442347
}
2348+
if (isDistributedExecuteSupported) {
2349+
needTabletId = request.getNeedTabletId();
2350+
} else {
2351+
needTabletId = true;
2352+
}
23452353
// Proceed with the operation
23462354
Map.Entry<Long, ObTableParam> entry = partIdMapObTable.entrySet().iterator().next();
23472355
ObTableParam tableParam = entry.getValue();
23482356
request.setTableId(tableParam.getTableId());
2349-
long partitionId = isDistributedExecuteSupported ? INVALID_TABLET_ID : tableParam.getPartitionId();
2357+
long partitionId = needTabletId ? tableParam.getPartitionId() : INVALID_TABLET_ID;
23502358
routeTabletId = tableParam.getPartitionId();
23512359
request.setPartitionId(partitionId);
23522360
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
@@ -2424,23 +2432,18 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception {
24242432
} else {
24252433
Row row = new Row();
24262434
// get the first cell from the first cfRows to route
2427-
// use the first table in tablegroup to route
2428-
String realTableName = null;
24292435
if (request.getCfRows().isEmpty()) {
24302436
throw new ObTableUnexpectedException("no cf rows");
24312437
}
2432-
if (request.getCfRows().size() > 1) {
2433-
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2434-
} else {
2435-
realTableName = request.getCfRows().get(0).getRealTableName();
2436-
}
2438+
String realTableName = request.getCfRows().get(0).getRealTableName();
24372439
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
24382440
row.add("K", request.getKeys().get(keyIdx).getValue());
24392441
row.add("Q", request.getCfRows().get(0).getCells().get(0).getQ().getValue());
24402442
row.add("T", request.getCfRows().get(0).getCells().get(0).getT().getValue());
24412443
ObTableParam tableParam = tableRoute.getTableParam(realTableName, row);
24422444
ObTable obTable = tableParam.getObTable();
24432445
request.setTimeout(obTable.getObTableOperationTimeout());
2446+
request.setTabletId(tableParam.getTabletId());
24442447
return executeWithRetry(obTable, request, realTableName);
24452448
}
24462449
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class BatchOperation {
4949
boolean hasCheckAndInsUp = false;
5050
boolean hasGet = false;
5151
boolean serverCanRetry = false;
52+
boolean needTabletId = false;
5253
ObTableOperationType lastType = ObTableOperationType.INVALID;
5354
boolean isSameType = true;
5455
protected ObTableEntityType entityType = ObTableEntityType.KV;
@@ -186,6 +187,10 @@ public void setServerCanRetry(boolean canRetry) {
186187
this.serverCanRetry = canRetry;
187188
}
188189

190+
public void setNeedTabletId(boolean needTabletId) {
191+
this.needTabletId = needTabletId;
192+
}
193+
189194
public BatchOperation setIsAtomic(boolean isAtomic) {
190195
this.isAtomic = isAtomic;
191196
return this;
@@ -319,6 +324,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
319324
batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient) client);
320325
batchOps.setEntityType(entityType);
321326
batchOps.setServerCanRetry(serverCanRetry);
327+
batchOps.setNeedTabletId(needTabletId);
322328
for (Object operation : operations) {
323329
if (operation instanceof CheckAndInsUp) {
324330
checkAndInsUpCnt++;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@
3333
/*
3434
OB_SERIALIZE_MEMBER(ObHbaseRequest,
3535
credential_,
36+
table_name_,
37+
tablet_id_,
3638
option_flag_,
3739
op_type_,
38-
table_name_,
3940
keys_,
4041
cf_rows_);
4142
*/
@@ -47,6 +48,7 @@
4748
public class ObHbaseRequest extends AbstractPayload implements Credentialable {
4849
protected ObBytesString credential;
4950
protected String tableName; // HBase tableName, OceanBase tablegroup_name
51+
protected long tabletId; // do not serialize
5052
protected ObTableHbaseReqFlag optionFlag = new ObTableHbaseReqFlag();
5153
protected ObTableOperationType opType;
5254
protected List<ObObj> keys = new ArrayList<>();
@@ -186,6 +188,14 @@ public boolean getServerCanRetry() {
186188
return optionFlag.getFlagServerCanRetry();
187189
}
188190

191+
public void setTabletId(long tabletId) {
192+
this.tabletId = tabletId;
193+
}
194+
195+
public long getTabletId() {
196+
return tabletId;
197+
}
198+
189199
public ObBytesString getCredential() {
190200
return credential;
191201
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ public boolean getServerCanRetry() {
216216
return option_flag.isServerCanRetry();
217217
}
218218

219+
public void setNeedTabletId(boolean needTabletId) {
220+
option_flag.setNeedTabletId(needTabletId);
221+
}
222+
223+
public boolean getNeedTabletId() {
224+
return option_flag.isNeedTabletId();
225+
}
226+
219227
/*
220228
* Is returning affected entity.
221229
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public enum ObTableOptionFlag {
2525
RETURNING_ROWKEY(1 << 0),
2626
USE_PUT(1 << 1),
2727
RETURN_ONE_RES(1 << 2),
28-
SERVER_CAN_RETRY(1 << 3);
28+
SERVER_CAN_RETRY(1 << 3),
29+
DIS_NEED_TABLET_ID(1 << 4);
2930

3031
private int value;
3132
private static Map<Integer, ObTableOptionFlag> map = new HashMap<Integer, ObTableOptionFlag>();
@@ -116,4 +117,16 @@ public void setServerCanRetry(boolean serverCanRetry) {
116117
this.value &= ~(SERVER_CAN_RETRY.value);
117118
}
118119
}
120+
121+
public boolean isNeedTabletId() {
122+
return (this.value & DIS_NEED_TABLET_ID.value) != 0;
123+
}
124+
125+
public void setNeedTabletId(boolean needTabletId) {
126+
if (needTabletId) {
127+
this.value |= DIS_NEED_TABLET_ID.value;
128+
} else {
129+
this.value &= ~(DIS_NEED_TABLET_ID.value);
130+
}
131+
}
119132
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
175175
throw new ObTableRoutingWrongException();
176176
}
177177
} else if (result != null && result.isRoutingWrong()) {
178-
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
179-
subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta());
180178
TableEntry tableEntry = result.isNeedRefreshMeta() ?
181179
client.getOrRefreshTableEntry(indexTableName, true) :
182180
client.getOrRefreshTableEntry(indexTableName, false);
183181
long tabletId = client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft());
184182
client.refreshTableLocationByTabletId(indexTableName, tabletId);
183+
logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}",
184+
subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta(), tabletId);
185185
}
186186
}
187187
client.resetExecuteContinuousFailureCount(indexTableName);

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
114114
throws Exception {
115115
ObTableParam obTableParam = partIdWithObTable.getRight();
116116
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
117-
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
117+
long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID;
118118
// refresh request info
119119
queryRequest.setPartitionId(partitionId);
120120
queryRequest.setTableId(obTableParam.getTableId());
@@ -141,7 +141,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTablePa
141141
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
142142

143143
// refresh request info
144-
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
144+
long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID;
145145
queryRequest.setPartitionId(partitionId);
146146
queryRequest.setTableId(obTableParam.getTableId());
147147

@@ -161,7 +161,7 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
161161
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
162162

163163
// refresh request info
164-
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
164+
long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID;
165165
queryRequest.setPartitionId(partitionId);
166166
queryRequest.setTableId(obTableParam.getTableId());
167167

@@ -440,6 +440,14 @@ private boolean isDistributeScan() {
440440
return allowDistributeScan && client.getServerCapacity().isSupportDistributedExecute();
441441
}
442442

443+
private boolean needTabletId(ObTableQueryRequest queryRequest) {
444+
if (isDistributeScan()) {
445+
return queryRequest.getNeedTabletId();
446+
} else {
447+
return true;
448+
}
449+
}
450+
443451
public void setAllowDistributeScan(boolean allowDistributeScan) {
444452
this.allowDistributeScan = allowDistributeScan;
445453
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult {
4444

4545
protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> partIdWithObTable)
4646
throws Exception {
47-
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
48-
: partIdWithObTable.getRight().getPartitionId();
4947
ObTableQueryRequest request = new ObTableQueryRequest();
5048
request.setTableName(tableName);
5149
request.setTableQuery(tableQuery);
50+
long partitionId = needTabletId(request) ? partIdWithObTable.getRight().getPartitionId() : INVALID_TABLET_ID;
5251
request.setPartitionId(partitionId);
5352
request.setTableId(partIdWithObTable.getRight().getTableId());
5453
request.setEntityType(entityType);
@@ -117,4 +116,12 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
117116
throws Exception {
118117
return buildPartitions(client, tableQuery, tableName);
119118
}
119+
120+
private boolean needTabletId(ObTableQueryRequest request) {
121+
if (client.getServerCapacity().isSupportDistributedExecute()) {
122+
return request.getNeedTabletId();
123+
} else {
124+
return true;
125+
}
126+
}
120127
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class ObTableClientLSBatchOpsImpl extends AbstractTableBatchOps {
6969
private boolean returningAffectedEntity = false;
7070
private boolean needAllProp = false;
7171
private boolean serverCanRetry = false;
72+
private boolean needTabletId = false;
7273
private List<ObTableSingleOp> batchOperation;
7374

7475
/*
@@ -553,16 +554,14 @@ public void partitionExecute(ObTableSingleOpResult[] results,
553554
long tableId = 0;
554555
long originPartId = 0;
555556
long operationTimeout = 0;
556-
List<Long> tabletIds = new ArrayList<>();
557557
ObTable subObTable = null;
558558

559559
boolean isFirstEntry = true;
560560
// list ( index list for tablet op 1, index list for tablet op 2, ...)
561561
List<List<ObPair<Integer, ObTableSingleOp>>> lsOperationWithIndexList = new ArrayList<>();
562562
for (final Map.Entry<Long, ObPair<ObTableParam, BatchIdxOperationPairList>> tabletOperation : tabletOperationsMap.entrySet()) {
563563
ObTableParam tableParam = tabletOperation.getValue().getLeft();
564-
long tabletId = obTableClient.getServerCapacity().isSupportDistributedExecute() ?
565-
INVALID_TABLET_ID : tableParam.getPartitionId();
564+
long tabletId = needTabletId() ? tableParam.getPartitionId() : INVALID_TABLET_ID;
566565
List<ObPair<Integer, ObTableSingleOp>> tabletOperationWithIndexList = tabletOperation.getValue().getRight();
567566
lsOperationWithIndexList.add(tabletOperationWithIndexList);
568567
List<ObTableSingleOp> singleOps = new ArrayList<>();
@@ -572,7 +571,6 @@ public void partitionExecute(ObTableSingleOpResult[] results,
572571
ObTableTabletOp tableTabletOp = new ObTableTabletOp();
573572
tableTabletOp.setSingleOperations(singleOps);
574573
tableTabletOp.setTabletId(tabletId);
575-
tabletIds.add(tabletId);
576574

577575
tableLsOp.addTabletOperation(tableTabletOp);
578576

@@ -649,13 +647,13 @@ public void partitionExecute(ObTableSingleOpResult[] results,
649647
}
650648
} else if (result != null && result.isRoutingWrong() && !obTableClient.isOdpMode()) {
651649
// retry successfully in server and need to refresh client cache
652-
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
653-
subObTable.getIp(), subObTable.getPort(), realTableName, result.isNeedRefreshMeta());
650+
long tabletId = tableLsOp.getTabletOperations().get(0).getTabletId();
651+
logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}",
652+
subObTable.getIp(), subObTable.getPort(), realTableName, result.isNeedRefreshMeta(), tabletId);
654653
if (result.isNeedRefreshMeta()) {
655654
obTableClient.getOrRefreshTableEntry(realTableName, true);
656655
}
657-
// TODO: 如果是不需要全部刷新地址的错误,全部刷新地址会降低效率。如何确定出错的 tablet_id 并刷新?
658-
obTableClient.refreshTabletLocationBatch(realTableName);
656+
obTableClient.refreshTableLocationByTabletId(realTableName, tabletId);
659657
}
660658
subLSOpResult = (ObTableLSOpResult) result;
661659
obTableClient.resetExecuteContinuousFailureCount(realTableName);
@@ -1011,4 +1009,17 @@ public void setServerCanRetry(boolean canRetry) {
10111009
public boolean getServerCanRetry() {
10121010
return this.serverCanRetry;
10131011
}
1012+
1013+
public void setNeedTabletId(boolean needTabletId) {
1014+
this.needTabletId = needTabletId;
1015+
}
1016+
1017+
private boolean needTabletId() {
1018+
if (obTableClient.getServerCapacity().isSupportDistributedExecute()) {
1019+
return needTabletId;
1020+
} else {
1021+
// 不开分布式需要tablet_id
1022+
return true;
1023+
}
1024+
}
10141025
}

0 commit comments

Comments
 (0)