Skip to content

Commit bd246ea

Browse files
maochongxin, miyuan-ljr, shenyunlong, GroundWu
authored
route single partition refresh (#247)
* Optimize SQL for refreshing table location information * partial refresh * Fix infinite loop caused by removed reference * fix lsop retry fail * Remove unnecessary comments and format code * Fix frequent refresh lock failures due to short refresh interval * Fix frequent refresh lock failures due to short refresh interval * add result code -4723 * fix review * add -4138 * fix review: add ut for byteutil * fix lsop refresh location * Merge pull request #204 from oceanbase/retry_batchops_merge_master Enhance Client Support for Partition Splitting * feature: Optimize partial refresh and add retry for executeMutation/query (#213) * remove lock in refreshTableLocationByTabletId * fix refresh sql * fix refresh interval * add retry logic for common query * add retry logic for executeMutation * add retry logic for executeMutation * add retry logic for ObTableQueryAndMutateRequest * fix: correct ineffective retry logic * retry logic for ObTableClient execute * fix * fix * fix 3.x null exception (#214) * fix * fix regress * [fix] add threshold for refresh table entry with location (#220) * add threshold for refresh table entry with location * [Fix] remove TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD in Property * [Fix] fix compile error --------- Co-authored-by: shenyunlong.syl <[email protected]> * [fix] 3.x compatible (#223) * fix * fix * [fix] test case stuck in await (#227) * fix * fix * refine * Fix global route refresh wrong (#231) * fix global index route refresh wrong when route need refresh * fix global index route wrong * revert unused commit * fix global index route wrong when need partial route refresh (#232) * fix global index route wrong when need partial route refresh * Ensure PartitionInfo is Exposed Only After Leader is Found During Single Shard Refresh --------- Co-authored-by: maochongxin <[email protected]> * fix PartitionNumOneTest routed to the wrong server * add error code 5627 * fix extendType parse object to comparable * fix refresh location param * fix 
log; add retry interval for executeWithRetries * Add quick path: return quickly if recently refreshed * add log for batchOpsImpl * fix * fix table.entry.refresh.interval.wait * fix global index route error * fix * TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD -> 0 * add retry code =4242 * fix getPartitionLocation nullptr && resolve conflict error * fix resolve conflict error * fix getLs nullptr * fix dr nullptr * fix global index nullptr * fix unexpected addr expired * fix unexpected addr expired * Reduce log printing and optimize log output content; * fix update lsId if location exists * fix log * fix lsop -5200 with table_group * fix rpc.execute.timeout * fix log * fix log && fix getPartitionReplica return wrong part id --------- Co-authored-by: miyuan-ljr <[email protected]> Co-authored-by: shenyunlong.syl <[email protected]> Co-authored-by: GroundWu <[email protected]>
1 parent 5aa84e7 commit bd246ea

File tree

13 files changed

+835
-361
lines changed

13 files changed

+835
-361
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 307 additions & 125 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
165165
} else {
166166
String errMessage = TraceUtil.formatTraceMessage(conn, response,
167167
"receive unexpected command code: " + response.getCmdCode().value());
168-
throw new ObTableUnexpectedException(errMessage);
168+
throw new ObTableUnexpectedException(errMessage, resultCode.getRcode());
169169
}
170170

171171
payload.decode(buf);
@@ -196,6 +196,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
196196
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
197197
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
198198
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
199+
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
200+
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
199201
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
200202
}
201203

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 372 additions & 129 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,18 @@ public ObPartitionLocation getPartitionLocationWithPartId(long partId) {
6767
}
6868

6969
/*
70-
* Put partition location with part id.
70+
* Get partition location with tablet id.
7171
*/
72-
public ObPartitionLocation putPartitionLocationWithPartId(long partId,
73-
ObPartitionLocation ObpartitionLocation) {
74-
return partitionLocation.put(partId, ObpartitionLocation);
72+
public ObPartitionLocation getPartitionLocationWithTabletId(long tabletId) {
73+
return partitionLocation.get(tabletId);
7574
}
7675

7776
/*
78-
* Get partition location with tablet id.
77+
* Put partition location with part id.
7978
*/
80-
public ObPartitionLocation getPartitionLocationWithTabletId(long tabletId) {
81-
return partitionLocation.get(tabletId);
79+
public ObPartitionLocation putPartitionLocationWithPartId(long partId,
80+
ObPartitionLocation ObpartitionLocation) {
81+
return partitionLocation.put(partId, ObpartitionLocation);
8282
}
8383

8484
/*

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionLocationInfo.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.concurrent.CountDownLatch;
2121
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.locks.ReentrantLock;
2223
import java.util.concurrent.locks.ReentrantReadWriteLock;
2324

2425
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
@@ -30,7 +31,9 @@ public class ObPartitionLocationInfo {
3031
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
3132
public AtomicBoolean initialized = new AtomicBoolean(false);
3233
public final CountDownLatch initializationLatch = new CountDownLatch(1);
33-
34+
35+
public ReentrantLock refreshLock = new ReentrantLock();
36+
3437
public ObPartitionLocation getPartitionLocation() {
3538
rwLock.readLock().lock();
3639
try {

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public enum Property {
7171
TABLE_ENTRY_REFRESH_INTERNAL_CEILING("table.entry.refresh.internal.ceiling", 1600L,
7272
"刷新TABLE地址的最大时间间隔"),
7373

74-
TABLE_ENTRY_REFRESH_INTERVAL_WAIT("table.entry.refresh.interval.wait", false,
74+
TABLE_ENTRY_REFRESH_INTERVAL_WAIT("table.entry.refresh.interval.wait", true,
7575
"刷新TABLE地址时是否等待间隔时间"),
7676

7777
TABLE_ENTRY_REFRESH_LOCK_TIMEOUT("table.entry.refresh.lock.timeout", 4000L, "刷新TABLE地址的锁超时时间"),
@@ -113,7 +113,7 @@ public enum Property {
113113
// [ObTable][RPC]
114114
RPC_CONNECT_TRY_TIMES("rpc.connect.try.times", 3, "建立RPC连接的尝试次数"),
115115

116-
RPC_EXECUTE_TIMEOUT("rpc.execute.timeout", 3000, "执行RPC请求的socket超时时间"),
116+
RPC_EXECUTE_TIMEOUT("rpc.execute.timeout", 12000, "执行RPC请求的socket超时时间"),
117117

118118
RPC_LOGIN_TIMEOUT("rpc.login.timeout", 1000, "请求RPC登录的超时时间"),
119119

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,10 +1350,12 @@ public ObObjMeta getDefaultObjMeta() {
13501350
* Parse to comparable.
13511351
*/
13521352
@Override
1353-
public Comparable parseToComparable(Object o, ObCollationType ct)
1354-
throws IllegalArgumentException,
1355-
FeatureNotSupportedException {
1356-
throw new FeatureNotSupportedException("ObUnknownType is not supported .");
1353+
public Comparable parseToComparable(Object o, ObCollationType ct) throws IllegalArgumentException{
1354+
if (o instanceof Number) {
1355+
return parseLong(this, o, ct);
1356+
} else{
1357+
return parseTextToComparable(this, o, ct);
1358+
}
13571359
}
13581360

13591361
}, // Min, Max, NOP etc.

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

20+
import com.alipay.oceanbase.rpc.ObGlobal;
2021
import com.alipay.oceanbase.rpc.ObTableClient;
2122
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2223
import com.alipay.oceanbase.rpc.exception.*;
2324
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
2425
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
26+
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2527
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2628
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2729
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -44,6 +46,8 @@
4446
import java.util.concurrent.atomic.AtomicReference;
4547
import java.util.concurrent.locks.ReentrantLock;
4648

49+
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
50+
4751
public abstract class AbstractQueryStreamResult extends AbstractPayload implements
4852
QueryStreamResult {
4953

@@ -52,7 +56,6 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
5256
protected volatile boolean closed = false;
5357
protected volatile List<ObObj> row = null;
5458
protected volatile int rowIndex = -1;
55-
// 调整它的startKey
5659
protected ObTableQuery tableQuery;
5760
protected long operationTimeout = -1;
5861
protected String tableName;
@@ -68,7 +71,8 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
6871
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
6972
// ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
7073
public List<ObObj> currentStartKey;
71-
74+
protected ObTableClient client;
75+
7276
/*
7377
* Get pcode.
7478
*/
@@ -144,6 +148,11 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
144148
if (failedServerList != null) {
145149
route.setBlackList(failedServerList);
146150
}
151+
if (ObGlobal.obVsnMajor() >= 4) {
152+
TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName, false, false, false);
153+
client.refreshTableLocationByTabletId(tableEntry, indexTableName, client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft()));
154+
}
155+
147156
subObTable = client
148157
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
149158
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
@@ -154,7 +163,6 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
154163
result = subObTable.executeWithConnection(request, connectionRef);
155164
} else {
156165
result = subObTable.execute(request);
157-
158166
if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) {
159167
ObTableApiMove moveResponse = (ObTableApiMove) result;
160168
client.getRouteTableRefresher().addTableIfAbsent(indexTableName, true);
@@ -243,35 +251,30 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
243251
} else if (e instanceof ObTableException) {
244252
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e)
245253
.getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode)
246-
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request)
247-
.getObTableQueryRequest().getTableQuery().isHbaseQuery()) || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request)
248-
.getTableQuery().isHbaseQuery()))
254+
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery())
255+
|| (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()))
249256
&& client.getTableGroupInverted().get(indexTableName) != null) {
250257
// table not exists && hbase mode && table group exists , three condition both
251258
client.eraseTableGroupFromCache(tableName);
252259
}
253260
if (((ObTableException) e).isNeedRefreshTableEntry()) {
254261
needRefreshTableEntry = true;
255-
logger
256-
.warn(
257-
"tablename:{} partition id:{} stream query refresh table while meet Exception needing refresh, errorCode: {}",
258-
indexTableName, partIdWithIndex.getLeft(),
259-
((ObTableException) e).getErrorCode(), e);
260262
if (client.isRetryOnChangeMasterTimes()
261263
&& (tryTimes - 1) < client.getRuntimeRetryTimes()) {
262-
logger
263-
.warn(
264-
"tablename:{} partition id:{} stream query retry while meet Exception needing refresh, errorCode: {} , retry times {}",
265-
indexTableName, partIdWithIndex.getLeft(),
266-
((ObTableException) e).getErrorCode(), tryTimes, e);
267264
// tablet not exists, refresh table entry
268265
if (e instanceof ObTableNeedFetchAllException) {
269266
client.getOrRefreshTableEntry(indexTableName, true, true, true);
270267
throw e;
271268
}
272269
} else {
270+
String logMessage = String.format(
271+
"exhaust retry while meet NeedRefresh Exception, table name: %s, ls id: %d, batch ops refresh table, errorCode: %d",
272+
indexTableName,
273+
((ObTableException) e).getErrorCode()
274+
);
275+
logger.warn(logMessage, e);
273276
client.calculateContinuousFailure(indexTableName, e.getMessage());
274-
throw e;
277+
throw new ObTableRetryExhaustedException(logMessage, e);
275278
}
276279
} else {
277280
client.calculateContinuousFailure(indexTableName, e.getMessage());
@@ -349,10 +352,7 @@ public boolean next() throws Exception {
349352

350353
} catch (Exception e) {
351354
if (e instanceof ObTableNeedFetchAllException) {
352-
// Adjust the start key and refresh the expectant
353-
this.tableQuery.adjustStartKey(currentStartKey);
354355
setExpectant(refreshPartition(tableQuery, tableName));
355-
356356
// Reset the iterator to start over
357357
it = expectant.entrySet().iterator();
358358
referPartition.clear(); // Clear the referPartition if needed
@@ -389,8 +389,6 @@ public boolean next() throws Exception {
389389
protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
390390
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
391391
String indexName = tableQuery.getIndexName();
392-
String indexTableName = null;
393-
394392
if (!client.isOdpMode()) {
395393
indexTableName = client.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns(), false);
396394
}
@@ -442,12 +440,12 @@ protected void nextRow() {
442440

443441
protected void checkStatus() throws IllegalStateException {
444442
if (!initialized) {
445-
throw new IllegalStateException("table " + tableName
443+
throw new IllegalStateException("table " + indexTableName
446444
+ "query stream result is not initialized");
447445
}
448446

449447
if (closed) {
450-
throw new IllegalStateException("table " + tableName + " query stream result is closed");
448+
throw new IllegalStateException("table " + indexTableName + " query stream result is closed");
451449
}
452450
}
453451

@@ -576,9 +574,33 @@ public void init() throws Exception {
576574
return;
577575
}
578576
if (tableQuery.getBatchSize() == -1) {
579-
for (Map.Entry<Long, ObPair<Long, ObTableParam>> entry : expectant.entrySet()) {
580-
// mark the refer partition
581-
referToNewPartition(entry.getValue());
577+
if (!expectant.isEmpty()) {
578+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
579+
.iterator();
580+
int retryTimes = 0;
581+
while (it.hasNext()) {
582+
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
583+
try {
584+
// try access new partition, async will not remove useless expectant
585+
referToNewPartition(entry.getValue());
586+
} catch (Exception e) {
587+
if (e instanceof ObTableNeedFetchAllException) {
588+
setExpectant(refreshPartition(tableQuery, tableName));
589+
it = expectant.entrySet().iterator();
590+
retryTimes++;
591+
if (retryTimes > client.getRuntimeRetryTimes()) {
592+
RUNTIME.error("Fail to get refresh table entry response after {}",
593+
retryTimes);
594+
throw new ObTableRetryExhaustedException(
595+
"Fail to get refresh table entry response after " + retryTimes +
596+
"errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
597+
598+
}
599+
} else {
600+
throw e;
601+
}
602+
}
603+
}
582604
}
583605
expectant.clear();
584606
} else {
@@ -719,4 +741,19 @@ public ObReadConsistency getReadConsistency() {
719741
public void setReadConsistency(ObReadConsistency readConsistency) {
720742
this.readConsistency = readConsistency;
721743
}
744+
745+
/**
746+
* Get client.
747+
* @return client
748+
*/
749+
public ObTableClient getClient() {
750+
return client;
751+
}
752+
753+
/*
754+
* Set client.
755+
*/
756+
public void setClient(ObTableClient client) {
757+
this.client = client;
758+
}
722759
}

0 commit comments

Comments
 (0)