Skip to content

Commit 47ad913

Browse files
JackShi148maochongxinGroundWuWeiXinChanmiyuan-ljr
authored
Get partition meta for HBase data synchronization and Table batch single-partition operation (#178)
* git reset to no test util commit * add ObFetchPartitionMetaType in request, pass re-fetch meta * pass all situations to get ODP partition meta * fix LSBatch in HBase mode, pass getPartition self-testing * correct format * revert LSBatch negateHbaseTimestamp * revert batch atomic change * correct substr test * compatible with table master branch * update addScanRange test with partition * update odp rpc lock timeout, add odp range query * add odp refresh lock timeout into initProperties * remove getPartition with only row key values * delete useless getStart and getEnd interface in Partition * format code * remove testing output * bugfix hbase mode using odp mode does not add row key; format code * add exception dealing in fetching odp partition meta * correct test case * do not calculate partition_id in ODP mode when using index * do not calculate partition_id in ODP mode when using index query * fetch the latest partition information in ocp mode everytime using getPartition * add refresh flag in getPartition to get latest table and partition information * Optimize SQL for refreshing table location information * partical refresh * Fix infinite loop caused by removed reference * fix lsop retry fail * Remove unnecessary comments and format code * Fix frequent refresh lock failures due to short refresh interval * Fix frequent refresh lock failures due to short refresh interval * add result code -4723 * correct test; getPartition with rowKey do not need to add rowKey before * correct test cases * fix batch in odp mode after merge * bugfix query with part_id * fix review * add -4138 * fix review: add ut for byteutil * fix lsop refresh location * fix: correct reverse-scan results affected by retry logic * set ODP mode retry number to 3 * revert retry time * feature: Optimize partial refresh and add retry for executeMutation/query (#213) * remove lock in refreshTableLocationByTabletId * fix refresh sql * fix refresh interval * add retry logic for common query * add retry logic for executeMutation * add retry logic for executeMutation * add retry logic for ObTableQueryAndMutateRequest * fix: correct ineffective retry logic * retry logic for ObTableClient execute * fix * fix * opt for errorMsg when execute batch in the scene of table not exist * fix ls batch errMsg * update pom.xml * remove some unused dependencies in pom.xml * fix 3.x null exception (#214) * fix * fix regress * [fix] add threshold for refresh table entry with location (#220) * add threshold for refresh table entry with location * [Fix] remove TABLE_ENTRY_LOCATION_REFRESH_THRESHOLD in Property * [Fix] fix compile error --------- Co-authored-by: shenyunlong.syl <[email protected]> * Fix the issue where the tablet_id is incorrect when the partition table has a single partition (#226) * [fix] 3.x compatible (#223) * fix * fix * [fix] test case stuck in await (#227) * fix * fix * refine * partition=1 error fix * fix after merge and correct some test cases * fix getPartition after merge * add logic id into partition * fix global index route refresh wrong when route need refresh (#228) * Fix global route refresh wrong (#231) * fix global index route refresh wrong when route need refresh * fix global index route wrong * revert unused commit * revert ls batch, do not fetch odp partition meta in ls batch * [Chore] remove the use of sofa common log * modify version control for direct-load partition (#236) * param (#221) * [fix] multi-cf retry table not exist (#229) * fix global index route wrong when need partial route refresh (#232) * fix global index route wrong when need partial route refresh * Ensure PartitionInfo is Exposed Only After Leader is Found During Single Shard Refresh --------- Co-authored-by: maochongxin <[email protected]> * feature: hbase compatible 2.x (#242) * support hbase scan renewLease (#211) * hbase tablename bug fix (#237) * hbase empty family scan error (#239) * hbase tablename bug fix * hbase empty family scan error --------- Co-authored-by: stuBirdFly <[email protected]> * everytime force new if flag is true * add comments * fix -5150 error and try to refresh route after relogin * fix getAllPartitionInternal * fix -5150 tenent not exists * remove useless import * revert global index test case --------- Co-authored-by: maochongxin <[email protected]> Co-authored-by: GroundWu <[email protected]> Co-authored-by: WeiXinChan <[email protected]> Co-authored-by: miyuan-ljr <[email protected]> Co-authored-by: shenyunlong.syl <[email protected]> Co-authored-by: stuBirdFly <[email protected]> Co-authored-by: medcll <[email protected]>
1 parent ca68517 commit 47ad913

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+3834
-97
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package com.alipay.oceanbase.rpc;
1919

20+
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
2021
import com.alipay.oceanbase.rpc.mutation.Row;
22+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2123
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2224
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
2325
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
@@ -152,6 +154,28 @@ public TableQuery addScanRange(Object[] start, boolean startEquals, Object[] end
152154
return this;
153155
}
154156

157+
@Override
158+
public TableQuery addScanRange(Object start, Object end) {
159+
if (start instanceof Partition) {
160+
Long startPartitionId = ((Partition) start).getPartitionId();
161+
Long endPartitionId = ((Partition) end).getPartitionId();
162+
if (!startPartitionId.equals(endPartitionId)) {
163+
throw new IllegalArgumentException(
164+
"The partition id must be the same for start and end partition in scan range");
165+
}
166+
Long startPartId = ((Partition) start).getPartId();
167+
Long endPartId = ((Partition) end).getPartId();
168+
if (!startPartId.equals(endPartId)) {
169+
throw new IllegalArgumentException(
170+
"The logic part id must be the same for start and end partition in scan range");
171+
}
172+
tableClientQuery.setPartId(startPartId);
173+
start = ObObj.getMin();
174+
end = ObObj.getMax();
175+
}
176+
return addScanRange(new Object[] { start }, true, new Object[] { end }, true);
177+
}
178+
155179
/**
156180
* Add scan range starts with.
157181
*/

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 443 additions & 5 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
26+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObFetchPartitionMetaResult;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
2829
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
@@ -106,6 +107,15 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
106107
return new ObTableLSOpResult();
107108
}
108109
}, //
110+
OB_TABLE_API_PART_META_QUERY(Pcodes.OB_TABLE_API_PART_META_QUERY) {
111+
/**
112+
* New payload.
113+
*/
114+
@Override
115+
public ObPayload newPayload(ObRpcPacketHeader header) {
116+
return new ObFetchPartitionMetaResult();
117+
}
118+
}, //
109119
OB_TABLE_API_MOVE(Pcodes.OB_TABLE_API_MOVE) {
110120
/**
111121
* New payload.
@@ -159,6 +169,8 @@ public static ObTablePacketCode valueOf(short value) {
159169
return OB_TABLE_API_DIRECT_LOAD;
160170
case Pcodes.OB_TABLE_API_LS_EXECUTE:
161171
return OB_TABLE_API_LS_EXECUTE;
172+
case Pcodes.OB_TABLE_API_PART_META_QUERY:
173+
return OB_TABLE_API_PART_META_QUERY;
162174
case Pcodes.OB_TABLE_API_MOVE:
163175
return OB_TABLE_API_MOVE;
164176
case Pcodes.OB_ERROR_PACKET:

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
122122
ObRpcResultCode resultCode = new ObRpcResultCode();
123123
resultCode.decode(buf);
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125-
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
125+
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127127
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
@@ -139,7 +139,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
139139
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
140140
}
141141
}
142-
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
142+
if (resultCode.getRcode() != 0
143+
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
143144
String errMessage = TraceUtil.formatTraceMessage(conn, request,
144145
"routed to the wrong server: " + response.getMessage());
145146
logger.warn(errMessage);
@@ -193,23 +194,25 @@ private boolean needFetchAll(int errorCode, int pcode) {
193194
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194195
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195196
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
197+
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
198+
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
196199
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
197200
}
198201

199202
private boolean needFetchPartial(int errorCode) {
200203
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
201-
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
202-
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
203-
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
204-
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
205-
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
206-
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
207-
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
208-
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
209-
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
210-
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
211-
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
212-
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
213-
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
204+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
205+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
206+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
207+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
208+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
209+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
210+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
211+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
212+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
213+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
214+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
215+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
216+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
214217
}
215218
}

src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public static ObTableException convertToObTableException(String host, int port,
8181
+ "]" + "[" + resultCodes.name() + "]"
8282
+ "[" + errMsg + "]" + "[" + server + "]"
8383
+ "[" + trace + "]", resultCodes.errorCode);
84+
} else if (resultCodes.errorCode == OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
85+
return new ObTablePartitionChangeException("[" + String.valueOf(resultCodes.errorCode)
86+
+ "]" + "[" + resultCodes.name() + "]" + "["
87+
+ errMsg + "]" + "[" + server + "]" + "["
88+
+ trace + "]", resultCodes.errorCode);
8489
} else {
8590
// [errCode][errCodeName][errMsg][server][trace]
8691
return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.alibaba.fastjson.JSONException;
2222
import com.alibaba.fastjson.JSONObject;
2323
import com.alibaba.fastjson.parser.ParserConfig;
24-
import com.alipay.oceanbase.rpc.ObClusterTableBatchOps;
2524
import com.alipay.oceanbase.rpc.ObGlobal;
2625
import com.alipay.oceanbase.rpc.constant.Constants;
2726
import com.alipay.oceanbase.rpc.exception.*;

src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
2424
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2525

26-
import java.util.HashMap;
2726
import java.util.LinkedHashMap;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.locks.Lock;
2930

3031
import static com.google.common.base.Preconditions.checkArgument;
3132

@@ -46,14 +47,17 @@ public class TableEntry {
4647
private ObPartitionInfo partitionInfo = null;
4748
private volatile long refreshTimeMills;
4849
private volatile long refreshAllTimeMills;
50+
private volatile long odpRefreshTimeMills;
4951
private Map<String, Integer> rowKeyElement = null;
5052

5153
// table location
5254
private TableLocation tableLocation = null;
5355
// partition location
5456
private TableEntryKey tableEntryKey = null;
5557
private volatile ObPartitionEntry partitionEntry = null;
56-
58+
59+
public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();
60+
5761
/*
5862
* Is valid.
5963
*/
@@ -156,6 +160,10 @@ public long getRefreshAllTimeMills() {
156160
return refreshAllTimeMills;
157161
}
158162

163+
public long getOdpRefreshTimeMills() {
164+
return odpRefreshTimeMills;
165+
}
166+
159167
/*
160168
* Set refresh time mills.
161169
*/
@@ -170,6 +178,10 @@ public void setRefreshAllTimeMills(long refreshAllTimeMills) {
170178
this.refreshAllTimeMills = refreshAllTimeMills;
171179
}
172180

181+
public void setOdpRefreshTimeMills(long odpRefreshTimeMills) {
182+
this.odpRefreshTimeMills = odpRefreshTimeMills;
183+
}
184+
173185
public Map<String, Integer> getRowKeyElement() {
174186
return rowKeyElement;
175187
}
@@ -220,8 +232,6 @@ public void prepare() throws IllegalArgumentException {
220232
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
221233
+ tableEntryKey);
222234
partitionInfo.prepare();
223-
checkArgument(partitionEntry != null,
224-
"partition table partition entry is not ready. key" + tableEntryKey);
225235
}
226236
}
227237

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObHashPartDesc.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
2626
import com.alipay.oceanbase.rpc.util.RandomUtil;
27+
import com.alipay.oceanbase.rpc.util.Serialization;
2728
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
2829
import com.alipay.oceanbase.rpc.mutation.Row;
2930
import org.apache.commons.lang.builder.ToStringBuilder;

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObKeyPartDesc.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2626
import com.alipay.oceanbase.rpc.util.ObHashUtils;
27+
import com.alipay.oceanbase.rpc.util.Serialization;
2728
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
2829
import org.apache.commons.lang.builder.ToStringBuilder;
2930
import org.slf4j.Logger;

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,23 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
2426

2527
public class ObPartitionEntry {
2628
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();
2729

2830
// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
2931
private Map<Long, Long> tabletLsIdMap = new HashMap<>();
32+
33+
// tabelt id -> (PartitionLocation, LsId)
34+
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();
35+
3036

37+
public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
38+
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
39+
}
40+
3141
public Map<Long, ObPartitionLocation> getPartitionLocation() {
3242
return partitionLocation;
3343
}
@@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
3949
this.partitionLocation = partitionLocation;
4050
}
4151

52+
public Map<Long, Long> getTabletLsIdMap() {
53+
return tabletLsIdMap;
54+
}
55+
56+
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
57+
this.tabletLsIdMap = tabletLsIdMap;
58+
}
59+
60+
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
61+
4262
/*
4363
* Get partition location with part id.
4464
*/
@@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
86106
public String toString() {
87107
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
88108
}
89-
90-
public Map<Long, Long> getTabletLsIdMap() {
91-
return tabletLsIdMap;
92-
}
93-
94-
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
95-
this.tabletLsIdMap = tabletLsIdMap;
96-
}
97-
98-
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
99109
}

0 commit comments

Comments
 (0)