Skip to content

Commit 9a71482

Browse files
committed
merge master into get_partition_meta
2 parents 5691945 + a53ba60 commit 9a71482

File tree

18 files changed

+1436
-429
lines changed

18 files changed

+1436
-429
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 264 additions & 116 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,31 +122,32 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
122122
ObRpcResultCode resultCode = new ObRpcResultCode();
123123
resultCode.decode(buf);
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125-
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
125+
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127127
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
129129
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130-
throw new ObTableNeedFetchAllException(errMessage);
130+
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
131131
} else if (needFetchPartial(resultCode.getRcode())) {
132-
throw new ObTableRoutingWrongException(errMessage);
132+
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
133133
} else {
134134
// Encountered an unexpected RoutingWrong error code,
135135
// possibly due to the client error code version being behind the observer's version.
136136
// Attempting a full refresh here
137137
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
138138
logger.warn("get unexpected error code: {}", response.getMessage());
139-
throw new ObTableNeedFetchAllException(errMessage);
139+
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
140140
}
141141
}
142-
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
142+
if (resultCode.getRcode() != 0
143+
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
143144
String errMessage = TraceUtil.formatTraceMessage(conn, request,
144145
"routed to the wrong server: " + response.getMessage());
145146
logger.warn(errMessage);
146147
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
147-
throw new ObTableNeedFetchAllException(errMessage);
148+
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
148149
} else if (needFetchPartial(resultCode.getRcode())) {
149-
throw new ObTableRoutingWrongException(errMessage);
150+
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
150151
} else {
151152
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
152153
.getObTable().getPort(), response.getHeader().getTraceId1(), response
@@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
193194
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194195
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195196
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
197+
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
198+
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
196199
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
197200
}
198201

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 289 additions & 113 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
2424
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2525

26-
import java.util.HashMap;
2726
import java.util.LinkedHashMap;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.locks.Lock;
2930

3031
import static com.google.common.base.Preconditions.checkArgument;
3132

@@ -54,7 +55,9 @@ public class TableEntry {
5455
// partition location
5556
private TableEntryKey tableEntryKey = null;
5657
private volatile ObPartitionEntry partitionEntry = null;
57-
58+
59+
public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();
60+
5861
/*
5962
* Is valid.
6063
*/
@@ -227,8 +230,6 @@ public void prepare() throws IllegalArgumentException {
227230
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
228231
+ tableEntryKey);
229232
partitionInfo.prepare();
230-
checkArgument(partitionEntry != null,
231-
"partition table partition entry is not ready. key" + tableEntryKey);
232233
}
233234
}
234235

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,23 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
2426

2527
public class ObPartitionEntry {
2628
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();
2729

2830
// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
2931
private Map<Long, Long> tabletLsIdMap = new HashMap<>();
32+
33+
// tabelt id -> (PartitionLocation, LsId)
34+
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();
35+
3036

37+
public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
38+
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
39+
}
40+
3141
public Map<Long, ObPartitionLocation> getPartitionLocation() {
3242
return partitionLocation;
3343
}
@@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
3949
this.partitionLocation = partitionLocation;
4050
}
4151

52+
public Map<Long, Long> getTabletLsIdMap() {
53+
return tabletLsIdMap;
54+
}
55+
56+
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
57+
this.tabletLsIdMap = tabletLsIdMap;
58+
}
59+
60+
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
61+
4262
/*
4363
* Get partition location with part id.
4464
*/
@@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
86106
public String toString() {
87107
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
88108
}
89-
90-
public Map<Long, Long> getTabletLsIdMap() {
91-
return tabletLsIdMap;
92-
}
93-
94-
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
95-
this.tabletLsIdMap = tabletLsIdMap;
96-
}
97-
98-
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
99109
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.location.model.partition;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.locks.ReentrantReadWriteLock;
23+
24+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
25+
26+
public class ObPartitionLocationInfo {
27+
private ObPartitionLocation partitionLocation = null;
28+
private Long tabletLsId = OB_INVALID_ID;
29+
private Long lastUpdateTime = 0L;
30+
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
31+
public AtomicBoolean initialized = new AtomicBoolean(false);
32+
public final CountDownLatch initializationLatch = new CountDownLatch(1);
33+
34+
public ObPartitionLocation getPartitionLocation() {
35+
rwLock.readLock().lock();
36+
try {
37+
return partitionLocation;
38+
} finally {
39+
rwLock.readLock().unlock();
40+
}
41+
}
42+
43+
public void updateLocation(ObPartitionLocation newLocation, Long tabletLsId) {
44+
rwLock.writeLock().lock();
45+
try {
46+
this.partitionLocation = newLocation;
47+
this.tabletLsId = tabletLsId;
48+
this.lastUpdateTime = System.currentTimeMillis();
49+
} finally {
50+
rwLock.writeLock().unlock();
51+
}
52+
}
53+
54+
public Long getTabletLsId() {
55+
rwLock.readLock().lock();
56+
try {
57+
return tabletLsId;
58+
} finally {
59+
rwLock.readLock().unlock();
60+
}
61+
}
62+
63+
public Long getLastUpdateTime() {
64+
rwLock.readLock().lock();
65+
try {
66+
return lastUpdateTime;
67+
} finally {
68+
rwLock.readLock().unlock();
69+
}
70+
}
71+
}

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public enum Property {
137137
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
138138

139139
// [ObTable][OTHERS]
140-
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
140+
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),
141141

142142
/*
143143
* other config

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,9 @@ public enum ResultCodes {
358358
OB_CLUSTER_NO_MATCH(-4666), //
359359
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
360360
OB_ERR_ZONE_NOT_EMPTY(-4668), //
361-
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
361+
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
362+
OB_LS_NOT_EXIST(-4719), //
363+
OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
362364
OB_TABLET_NOT_EXIST(-4725), //
363365
OB_ERR_PARSER_INIT(-5000), //
364366
OB_ERR_PARSE_SQL(-5001), //

0 commit comments

Comments
 (0)