Skip to content

Commit bcbc9ef

Browse files
authored
Fix performance not recovery after disaster recovery (#385)
* add NeedTabletId flag and set/get interface * fix scan/get more result after meta refreshing
1 parent c8df6da commit bcbc9ef

File tree

10 files changed

+148
-33
lines changed

10 files changed

+148
-33
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ public static boolean isHBasePutPerfSupport() {
125125
return OB_VERSION >= OB_VERSION_4_4_1_0;
126126
}
127127

128+
public static boolean isDistributeNeedTabletIdSupport() {
129+
return OB_VERSION >= OB_VERSION_4_3_5_5 && OB_VERSION < OB_VERSION_4_4_0_0
130+
|| OB_VERSION >= OB_VERSION_4_4_1_0;
131+
}
132+
128133
/*-------------------------------------------- OB_VERSION --------------------------------------------*/
129134

130135
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
@@ -143,6 +148,8 @@ public static boolean isHBasePutPerfSupport() {
143148

144149
public static final long OB_VERSION_4_3_5_3 = calcVersion(4, (short) 3, (byte) 5, (byte) 3);
145150

151+
public static final long OB_VERSION_4_3_5_5 = calcVersion(4, (short) 3, (byte) 5, (byte) 5);
152+
146153
public static final long OB_VERSION_4_4_0_0 = calcVersion(4, (short) 4, (byte) 0, (byte) 0);
147154

148155
public static final long OB_VERSION_4_4_1_0 = calcVersion(4, (short) 4, (byte) 1, (byte) 0);

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,17 +2107,19 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab
21072107
throw new ObTableRoutingWrongException();
21082108
}
21092109
} else if (result != null && result.isRoutingWrong() && !isOdpMode()) {
2110-
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
2111-
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta());
21122110
if (result.isNeedRefreshMeta()) {
21132111
tableRoute.refreshMeta(tableName);
21142112
}
2113+
long tabletId = INVALID_TABLET_ID;
21152114
if (request instanceof ObTableAbstractOperationRequest) {
2116-
long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
2115+
tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
21172116
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
21182117
} else if (request instanceof ObHbaseRequest) {
2119-
tableRoute.refreshTabletLocationBatch(tableName);
2118+
tabletId = ((ObHbaseRequest) request).getTabletId();
2119+
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
21202120
}
2121+
logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}",
2122+
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta(), tabletId);
21212123
}
21222124
return result;
21232125
}
@@ -2337,16 +2339,22 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
23372339
}
23382340

23392341
// Check if partIdMapObTable size is greater than 1
2342+
boolean needTabletId = false;
23402343
boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute();
23412344
if (partIdMapObTable.size() > 1 && !isDistributedExecuteSupported) {
23422345
throw new ObTablePartitionConsistentException(
23432346
"query and mutate must be a atomic operation");
23442347
}
2348+
if (isDistributedExecuteSupported) {
2349+
needTabletId = request.getNeedTabletId();
2350+
} else {
2351+
needTabletId = true;
2352+
}
23452353
// Proceed with the operation
23462354
Map.Entry<Long, ObTableParam> entry = partIdMapObTable.entrySet().iterator().next();
23472355
ObTableParam tableParam = entry.getValue();
23482356
request.setTableId(tableParam.getTableId());
2349-
long partitionId = isDistributedExecuteSupported ? INVALID_TABLET_ID : tableParam.getPartitionId();
2357+
long partitionId = needTabletId ? tableParam.getPartitionId() : INVALID_TABLET_ID;
23502358
routeTabletId = tableParam.getPartitionId();
23512359
request.setPartitionId(partitionId);
23522360
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
@@ -2424,23 +2432,18 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception {
24242432
} else {
24252433
Row row = new Row();
24262434
// get the first cell from the first cfRows to route
2427-
// use the first table in tablegroup to route
2428-
String realTableName = null;
24292435
if (request.getCfRows().isEmpty()) {
24302436
throw new ObTableUnexpectedException("no cf rows");
24312437
}
2432-
if (request.getCfRows().size() > 1) {
2433-
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2434-
} else {
2435-
realTableName = request.getCfRows().get(0).getRealTableName();
2436-
}
2438+
String realTableName = request.getCfRows().get(0).getRealTableName();
24372439
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
24382440
row.add("K", request.getKeys().get(keyIdx).getValue());
24392441
row.add("Q", request.getCfRows().get(0).getCells().get(0).getQ().getValue());
24402442
row.add("T", request.getCfRows().get(0).getCells().get(0).getT().getValue());
24412443
ObTableParam tableParam = tableRoute.getTableParam(realTableName, row);
24422444
ObTable obTable = tableParam.getObTable();
24432445
request.setTimeout(obTable.getObTableOperationTimeout());
2446+
request.setTabletId(tableParam.getTabletId());
24442447
return executeWithRetry(obTable, request, realTableName);
24452448
}
24462449
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class BatchOperation {
4949
boolean hasCheckAndInsUp = false;
5050
boolean hasGet = false;
5151
boolean serverCanRetry = false;
52+
boolean needTabletId = false;
5253
ObTableOperationType lastType = ObTableOperationType.INVALID;
5354
boolean isSameType = true;
5455
protected ObTableEntityType entityType = ObTableEntityType.KV;
@@ -186,6 +187,10 @@ public void setServerCanRetry(boolean canRetry) {
186187
this.serverCanRetry = canRetry;
187188
}
188189

190+
public void setNeedTabletId(boolean needTabletId) {
191+
this.needTabletId = needTabletId;
192+
}
193+
189194
public BatchOperation setIsAtomic(boolean isAtomic) {
190195
this.isAtomic = isAtomic;
191196
return this;
@@ -319,6 +324,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
319324
batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient) client);
320325
batchOps.setEntityType(entityType);
321326
batchOps.setServerCanRetry(serverCanRetry);
327+
batchOps.setNeedTabletId(needTabletId);
322328
for (Object operation : operations) {
323329
if (operation instanceof CheckAndInsUp) {
324330
checkAndInsUpCnt++;

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@
3333
/*
3434
OB_SERIALIZE_MEMBER(ObHbaseRequest,
3535
credential_,
36+
table_name_,
37+
tablet_id_,
3638
option_flag_,
3739
op_type_,
38-
table_name_,
3940
keys_,
4041
cf_rows_);
4142
*/
@@ -47,6 +48,7 @@
4748
public class ObHbaseRequest extends AbstractPayload implements Credentialable {
4849
protected ObBytesString credential;
4950
protected String tableName; // HBase tableName, OceanBase tablegroup_name
51+
protected long tabletId; // do not serialize
5052
protected ObTableHbaseReqFlag optionFlag = new ObTableHbaseReqFlag();
5153
protected ObTableOperationType opType;
5254
protected List<ObObj> keys = new ArrayList<>();
@@ -186,6 +188,14 @@ public boolean getServerCanRetry() {
186188
return optionFlag.getFlagServerCanRetry();
187189
}
188190

191+
public void setTabletId(long tabletId) {
192+
this.tabletId = tabletId;
193+
}
194+
195+
public long getTabletId() {
196+
return tabletId;
197+
}
198+
189199
public ObBytesString getCredential() {
190200
return credential;
191201
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ public boolean getServerCanRetry() {
216216
return option_flag.isServerCanRetry();
217217
}
218218

219+
public void setNeedTabletId(boolean needTabletId) {
220+
option_flag.setNeedTabletId(needTabletId);
221+
}
222+
223+
public boolean getNeedTabletId() {
224+
return option_flag.isNeedTabletId();
225+
}
226+
219227
/*
220228
* Is returning affected entity.
221229
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public enum ObTableOptionFlag {
2525
RETURNING_ROWKEY(1 << 0),
2626
USE_PUT(1 << 1),
2727
RETURN_ONE_RES(1 << 2),
28-
SERVER_CAN_RETRY(1 << 3);
28+
SERVER_CAN_RETRY(1 << 3),
29+
DIS_NEED_TABLET_ID(1 << 4);
2930

3031
private int value;
3132
private static Map<Integer, ObTableOptionFlag> map = new HashMap<Integer, ObTableOptionFlag>();
@@ -116,4 +117,16 @@ public void setServerCanRetry(boolean serverCanRetry) {
116117
this.value &= ~(SERVER_CAN_RETRY.value);
117118
}
118119
}
120+
121+
public boolean isNeedTabletId() {
122+
return (this.value & DIS_NEED_TABLET_ID.value) != 0;
123+
}
124+
125+
public void setNeedTabletId(boolean needTabletId) {
126+
if (needTabletId) {
127+
this.value |= DIS_NEED_TABLET_ID.value;
128+
} else {
129+
this.value &= ~(DIS_NEED_TABLET_ID.value);
130+
}
131+
}
119132
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.atomic.AtomicReference;
4747
import java.util.concurrent.locks.ReentrantLock;
4848

49+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.INVALID_TABLET_ID;
4950
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
5051

5152
public abstract class AbstractQueryStreamResult extends AbstractPayload implements
@@ -175,13 +176,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
175176
throw new ObTableRoutingWrongException();
176177
}
177178
} else if (result != null && result.isRoutingWrong()) {
178-
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
179-
subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta());
180179
TableEntry tableEntry = result.isNeedRefreshMeta() ?
181180
client.getOrRefreshTableEntry(indexTableName, true) :
182181
client.getOrRefreshTableEntry(indexTableName, false);
183182
long tabletId = client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft());
184183
client.refreshTableLocationByTabletId(indexTableName, tabletId);
184+
logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}",
185+
subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta(), tabletId);
185186
}
186187
}
187188
client.resetExecuteContinuousFailureCount(indexTableName);
@@ -412,7 +413,7 @@ public boolean next() throws Exception {
412413
}
413414
}
414415

415-
protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
416+
protected Map<Long, ObPair<Long, ObTableParam>> buildAllPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
416417
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
417418
String indexName = tableQuery.getIndexName();
418419
if (!client.isOdpMode()) {
@@ -456,6 +457,40 @@ protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient cl
456457
return partitionObTables;
457458
}
458459

460+
protected Map<Long, ObPair<Long, ObTableParam>> buildFirstPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
461+
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
462+
String indexName = tableQuery.getIndexName();
463+
464+
if (!this.client.isOdpMode()) {
465+
indexTableName = client.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns(), false);
466+
}
467+
468+
if (tableQuery.getKeyRanges().isEmpty()) {
469+
throw new IllegalArgumentException("query ranges is empty");
470+
} else {
471+
ObNewRange range = tableQuery.getKeyRanges().get(0);
472+
ObRowKey startKey = range.getStartKey();
473+
int startKeySize = startKey.getObjs().size();
474+
Object[] start = new Object[startKeySize];
475+
476+
for (int i = 0; i < startKeySize; i++) {
477+
start[i] = startKey.getObj(i).isMinObj() || startKey.getObj(i).isMaxObj() ?
478+
startKey.getObj(i) : startKey.getObj(i).getValue();
479+
}
480+
481+
if (this.entityType == ObTableEntityType.HKV && client.isTableGroupName(tableName)) {
482+
indexTableName = client.tryGetTableNameFromTableGroupCache(tableName, false);
483+
}
484+
ObBorderFlag borderFlag = range.getBorderFlag();
485+
List<ObTableParam> params = this.client.getTableParams(indexTableName, tableQuery, start,
486+
borderFlag.isInclusiveStart(), start, borderFlag.isInclusiveEnd());
487+
488+
partitionObTables.put(INVALID_TABLET_ID, new ObPair<>(params.get(0).getPartId(), params.get(0)));
489+
}
490+
491+
return partitionObTables;
492+
}
493+
459494
protected void nextRow() {
460495
rowIndex = rowIndex + 1;
461496
row = cacheRows.poll();

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
114114
throws Exception {
115115
ObTableParam obTableParam = partIdWithObTable.getRight();
116116
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
117-
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
117+
long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID;
118118
// refresh request info
119119
queryRequest.setPartitionId(partitionId);
120120
queryRequest.setTableId(obTableParam.getTableId());
@@ -141,7 +141,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTablePa
141141
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
142142

143143
// refresh request info
144-
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
144+
long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID;
145145
queryRequest.setPartitionId(partitionId);
146146
queryRequest.setTableId(obTableParam.getTableId());
147147

@@ -161,7 +161,7 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
161161
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
162162

163163
// refresh request info
164-
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
164+
long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID;
165165
queryRequest.setPartitionId(partitionId);
166166
queryRequest.setTableId(obTableParam.getTableId());
167167

@@ -278,7 +278,11 @@ public boolean queryNewStreamResultInNext() throws Exception {
278278
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
279279
String tableName)
280280
throws Exception {
281-
return buildPartitions(client, tableQuery, tableName);
281+
if (isDistributeScan()) {
282+
return buildFirstPartitions(client, tableQuery, tableName);
283+
} else {
284+
return buildAllPartitions(client, tableQuery, tableName);
285+
}
282286
}
283287

284288
// This function is designed for HBase-type requests.
@@ -440,6 +444,14 @@ private boolean isDistributeScan() {
440444
return allowDistributeScan && client.getServerCapacity().isSupportDistributedExecute();
441445
}
442446

447+
private boolean needTabletId(ObTableQueryRequest queryRequest) {
448+
if (isDistributeScan()) {
449+
return queryRequest.getNeedTabletId();
450+
} else {
451+
return true;
452+
}
453+
}
454+
443455
public void setAllowDistributeScan(boolean allowDistributeScan) {
444456
this.allowDistributeScan = allowDistributeScan;
445457
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alipay.oceanbase.rpc.stream;
1919

20-
import com.alipay.oceanbase.rpc.ObTableClient;
2120
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2221
import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException;
2322
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
@@ -44,11 +43,10 @@ public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult {
4443

4544
protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> partIdWithObTable)
4645
throws Exception {
47-
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
48-
: partIdWithObTable.getRight().getPartitionId();
4946
ObTableQueryRequest request = new ObTableQueryRequest();
5047
request.setTableName(tableName);
5148
request.setTableQuery(tableQuery);
49+
long partitionId = needTabletId(request) ? partIdWithObTable.getRight().getPartitionId() : INVALID_TABLET_ID;
5250
request.setPartitionId(partitionId);
5351
request.setTableId(partIdWithObTable.getRight().getTableId());
5452
request.setEntityType(entityType);
@@ -115,6 +113,18 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
115113
protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery tableQuery,
116114
String tableName)
117115
throws Exception {
118-
return buildPartitions(client, tableQuery, tableName);
116+
if (client.getServerCapacity().isSupportDistributedExecute()) {
117+
return buildFirstPartitions(client, tableQuery, tableName);
118+
} else {
119+
return buildAllPartitions(client, tableQuery, tableName);
120+
}
121+
}
122+
123+
private boolean needTabletId(ObTableQueryRequest request) {
124+
if (client.getServerCapacity().isSupportDistributedExecute()) {
125+
return request.getNeedTabletId();
126+
} else {
127+
return true;
128+
}
119129
}
120130
}

0 commit comments

Comments
 (0)