Skip to content

Commit e5e8687

Browse files
authored
Merge pull request #204 from oceanbase/retry_batchops_merge_master
Enhance Client Support for Partition Splitting
2 parents 29c367a + 64926a4 commit e5e8687

File tree

19 files changed

+1297
-370
lines changed

19 files changed

+1297
-370
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 189 additions & 81 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,31 +122,32 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
122122
ObRpcResultCode resultCode = new ObRpcResultCode();
123123
resultCode.decode(buf);
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125-
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
125+
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127127
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
129129
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130-
throw new ObTableNeedFetchAllException(errMessage);
130+
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
131131
} else if (needFetchPartial(resultCode.getRcode())) {
132-
throw new ObTableRoutingWrongException(errMessage);
132+
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
133133
} else {
134134
// Encountered an unexpected RoutingWrong error code,
135135
// possibly due to the client error code version being behind the observer's version.
136136
// Attempting a full refresh here
137137
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
138138
logger.warn("get unexpected error code: {}", response.getMessage());
139-
throw new ObTableNeedFetchAllException(errMessage);
139+
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
140140
}
141141
}
142-
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
142+
if (resultCode.getRcode() != 0
143+
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
143144
String errMessage = TraceUtil.formatTraceMessage(conn, request,
144145
"routed to the wrong server: " + response.getMessage());
145146
logger.warn(errMessage);
146147
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
147-
throw new ObTableNeedFetchAllException(errMessage);
148+
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
148149
} else if (needFetchPartial(resultCode.getRcode())) {
149-
throw new ObTableRoutingWrongException(errMessage);
150+
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
150151
} else {
151152
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
152153
.getObTable().getPort(), response.getHeader().getTraceId1(), response
@@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
193194
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194195
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195196
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
197+
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
198+
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
196199
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
197200
}
198201

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 283 additions & 116 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
2424
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2525

26-
import java.util.HashMap;
2726
import java.util.LinkedHashMap;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.locks.Lock;
2930

3031
import static com.google.common.base.Preconditions.checkArgument;
3132

@@ -53,7 +54,9 @@ public class TableEntry {
5354
// partition location
5455
private TableEntryKey tableEntryKey = null;
5556
private volatile ObPartitionEntry partitionEntry = null;
56-
57+
58+
public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();
59+
5760
/*
5861
* Is valid.
5962
*/
@@ -218,8 +221,6 @@ public void prepare() throws IllegalArgumentException {
218221
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
219222
+ tableEntryKey);
220223
partitionInfo.prepare();
221-
checkArgument(partitionEntry != null,
222-
"partition table partition entry is not ready. key" + tableEntryKey);
223224
}
224225
}
225226

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,23 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
2426

2527
public class ObPartitionEntry {
2628
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();
2729

2830
// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
2931
private Map<Long, Long> tabletLsIdMap = new HashMap<>();
32+
33+
// tabelt id -> (PartitionLocation, LsId)
34+
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();
35+
3036

37+
public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
38+
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
39+
}
40+
3141
public Map<Long, ObPartitionLocation> getPartitionLocation() {
3242
return partitionLocation;
3343
}
@@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
3949
this.partitionLocation = partitionLocation;
4050
}
4151

52+
public Map<Long, Long> getTabletLsIdMap() {
53+
return tabletLsIdMap;
54+
}
55+
56+
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
57+
this.tabletLsIdMap = tabletLsIdMap;
58+
}
59+
60+
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
61+
4262
/*
4363
* Get partition location with part id.
4464
*/
@@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
86106
public String toString() {
87107
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
88108
}
89-
90-
public Map<Long, Long> getTabletLsIdMap() {
91-
return tabletLsIdMap;
92-
}
93-
94-
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
95-
this.tabletLsIdMap = tabletLsIdMap;
96-
}
97-
98-
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
99109
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.location.model.partition;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.locks.ReentrantReadWriteLock;
23+
24+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
25+
26+
public class ObPartitionLocationInfo {
27+
private ObPartitionLocation partitionLocation = null;
28+
private Long tabletLsId = OB_INVALID_ID;
29+
private Long lastUpdateTime = 0L;
30+
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
31+
public AtomicBoolean initialized = new AtomicBoolean(false);
32+
public final CountDownLatch initializationLatch = new CountDownLatch(1);
33+
34+
public ObPartitionLocation getPartitionLocation() {
35+
rwLock.readLock().lock();
36+
try {
37+
return partitionLocation;
38+
} finally {
39+
rwLock.readLock().unlock();
40+
}
41+
}
42+
43+
public void updateLocation(ObPartitionLocation newLocation) {
44+
this.partitionLocation = newLocation;
45+
this.lastUpdateTime = System.currentTimeMillis();
46+
}
47+
48+
public Long getTabletLsId() {
49+
return tabletLsId;
50+
}
51+
52+
public void setTabletLsId(Long tabletLsId) {
53+
this.tabletLsId = tabletLsId;
54+
}
55+
56+
public Long getLastUpdateTime() {
57+
rwLock.readLock().lock();
58+
try {
59+
return lastUpdateTime;
60+
} finally {
61+
rwLock.readLock().unlock();
62+
}
63+
}
64+
}

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObRangePartDesc.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public ObRangePartDesc() {
6666
public List<ObObjType> getOrderedCompareColumnTypes() {
6767
return orderedCompareColumnTypes;
6868
}
69-
private List<Long> completeWorks;
69+
70+
private List<Long> completeWorks;
7071

7172
/*
7273
* Set ordered compare column types.
@@ -299,20 +300,20 @@ public int getBoundsIdx(boolean isScan, Row rowKey) {
299300
try {
300301
List<Object> evalParams = evalRowKeyValues(rowKey);
301302
List<Comparable> comparableElement = super.initComparableElementByTypes(evalParams,
302-
this.orderedCompareColumns);
303+
this.orderedCompareColumns);
303304
ObPartitionKey searchKey = ObPartitionKey.getInstance(orderedCompareColumns,
304-
comparableElement);
305+
comparableElement);
305306

306307
int pos = upperBound(this.bounds, new ObComparableKV<ObPartitionKey, Long>(searchKey,
307-
(long) -1));
308+
(long) -1));
308309
if (pos >= this.bounds.size()) {
309310
if (isScan) {
310311
// if range is bigger than rangeMax while scanning
311312
// we just scan until last range
312313
return this.bounds.size() - 1;
313314
}
314315
throw new ArrayIndexOutOfBoundsException("Table has no partition for value in "
315-
+ this.getPartExpr());
316+
+ this.getPartExpr());
316317
} else {
317318
return pos;
318319
}

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public enum Property {
134134
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
135135

136136
// [ObTable][OTHERS]
137-
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
137+
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),
138138

139139
/*
140140
* other config

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,9 @@ public enum ResultCodes {
358358
OB_CLUSTER_NO_MATCH(-4666), //
359359
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
360360
OB_ERR_ZONE_NOT_EMPTY(-4668), //
361-
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
361+
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
362+
OB_LS_NOT_EXIST(-4719), //
363+
OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
362364
OB_TABLET_NOT_EXIST(-4725), //
363365
OB_ERR_PARSER_INIT(-5000), //
364366
OB_ERR_PARSE_SQL(-5001), //

0 commit comments

Comments
 (0)