Skip to content

Commit c692b0b

Browse files
committed
git reset to no test util commit
1 parent 94aeb82 commit c692b0b

File tree

57 files changed

+4583
-1282
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+4583
-1282
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package com.alipay.oceanbase.rpc;
1919

20+
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
2021
import com.alipay.oceanbase.rpc.mutation.Row;
22+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2123
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2224
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
2325
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
@@ -152,6 +154,28 @@ public TableQuery addScanRange(Object[] start, boolean startEquals, Object[] end
152154
return this;
153155
}
154156

157+
@Override
158+
public TableQuery addScanRange(Object start, Object end) {
159+
if (start instanceof Partition) {
160+
Long startPartitionId = ((Partition) start).getPartitionId();
161+
Long endPartitionId = ((Partition) end).getPartitionId();
162+
if (!startPartitionId.equals(endPartitionId)) {
163+
throw new IllegalArgumentException(
164+
"The partition id must be the same for start and end partition in scan range");
165+
}
166+
Long startPartId = ((Partition) start).getPartId();
167+
Long endPartId = ((Partition) end).getPartId();
168+
if (!startPartId.equals(endPartId)) {
169+
throw new IllegalArgumentException(
170+
"The logic part id must be the same for start and end partition in scan range");
171+
}
172+
tableClientQuery.setPartId(startPartId);
173+
start = ObObj.getMin();
174+
end = ObObj.getMax();
175+
}
176+
return addScanRange(new Object[] { start }, true, new Object[] { end }, true);
177+
}
178+
155179
/**
156180
* Add scan range starts with.
157181
*/

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 584 additions & 103 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2323
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObFetchPartitionMetaResult;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
@@ -105,6 +106,15 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
105106
return new ObTableLSOpResult();
106107
}
107108
}, //
109+
OB_TABLE_API_PART_META_QUERY(Pcodes.OB_TABLE_API_PART_META_QUERY) {
110+
/**
111+
* New payload.
112+
*/
113+
@Override
114+
public ObPayload newPayload(ObRpcPacketHeader header) {
115+
return new ObFetchPartitionMetaResult();
116+
}
117+
}, //
108118
OB_TABLE_API_MOVE(Pcodes.OB_TABLE_API_MOVE) {
109119
/**
110120
* New payload.
@@ -160,6 +170,8 @@ public static ObTablePacketCode valueOf(short value) {
160170
return OB_TABLE_API_DIRECT_LOAD;
161171
case Pcodes.OB_TABLE_API_LS_EXECUTE:
162172
return OB_TABLE_API_LS_EXECUTE;
173+
case Pcodes.OB_TABLE_API_PART_META_QUERY:
174+
return OB_TABLE_API_PART_META_QUERY;
163175
case Pcodes.OB_TABLE_API_MOVE:
164176
throw new ObTableRoutingWrongException(
165177
"Receive rerouting response packet. "

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,8 @@
2929
import com.alipay.remoting.*;
3030
import com.alipay.remoting.exception.RemotingException;
3131
import io.netty.buffer.ByteBuf;
32-
import io.netty.util.internal.ResourcesUtil;
3332
import org.slf4j.Logger;
3433

35-
import javax.xml.transform.Result;
36-
3734
import static com.alipay.oceanbase.rpc.protocol.packet.ObCompressType.INVALID_COMPRESSOR;
3835
import static com.alipay.oceanbase.rpc.protocol.packet.ObCompressType.NONE_COMPRESSOR;
3936

@@ -127,18 +124,18 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
127124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
128125
if (response.getHeader().isRoutingWrong()) {
129126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
130-
"routed to the wrong server: " + response.getMessage());
127+
"routed to the wrong server: " + response.getMessage());
131128
logger.warn(errMessage);
132129
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
133130
throw new ObTableNeedFetchAllException(errMessage);
134-
} else if (needFetchPartical(resultCode.getRcode())) {
131+
} else if (needFetchPartial(resultCode.getRcode())) {
135132
throw new ObTableRoutingWrongException(errMessage);
136133
} else {
137134
// Encountered an unexpected RoutingWrong error code,
138135
// possibly due to the client error code version being behind the observer's version.
139136
// Attempting a full refresh here
140137
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
141-
logger.warn("get unexpected error code: " + response.getMessage());
138+
logger.warn("get unexpected error code: {}", response.getMessage());
142139
throw new ObTableNeedFetchAllException(errMessage);
143140
}
144141
}
@@ -185,31 +182,27 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
185182
// schema changed
186183
private boolean needFetchAll(int errorCode, int pcode) {
187184
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
188-
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
189-
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
190-
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
191-
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode)
192-
// Theoretically, this code should not be executed since there is no corresponding handling within the ODP.
193-
// However, the observer has flagged the following three error codes.
194-
// Adding this handling as a precautionary measure.
195-
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode
196-
|| errorCode == ResultCodes.OB_USE_DUP_FOLLOW_AFTER_DML.errorCode
197-
|| errorCode == ResultCodes.OB_TRANS_STMT_NEED_RETRY.errorCode;
185+
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
186+
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
187+
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
188+
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
198189
}
199-
private boolean needFetchPartical(int errorCode) {
190+
191+
private boolean needFetchPartial(int errorCode) {
200192
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
201-
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
202-
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
203-
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
204-
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
205-
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
206-
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
207-
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
208-
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
209-
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
210-
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
211-
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
212-
|| errorCode == ResultCodes.OB_TENANT_NOT_IN_SERVER.errorCode
213-
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode;
193+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
194+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
195+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
196+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
197+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
198+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
199+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
200+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
201+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
202+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
203+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
204+
|| errorCode == ResultCodes.OB_TENANT_NOT_IN_SERVER.errorCode
205+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
206+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
214207
}
215208
}

src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alipay.oceanbase.rpc.exception.ObTableException;
2222
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
2323
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
24+
import com.alipay.oceanbase.rpc.mutation.Row;
2425
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
@@ -58,7 +59,7 @@ public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
5859
this.checkExists = check_exists;
5960
}
6061

61-
public Object[] getRowKey() {
62+
public Row getRowKey() {
6263
return insUp.getRowKey();
6364
}
6465

@@ -85,15 +86,15 @@ public MutationResult execute() throws Exception {
8586

8687
TableQuery query = client.query(tableName);
8788
query.setFilter(filter);
88-
Object[] rowKey = getRowKey();
89+
Row rowKey = getRowKey();
8990
List<ObNewRange> ranges = new ArrayList<>();
9091
ObNewRange range = new ObNewRange();
91-
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey()));
92-
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey()));
92+
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey().getValues()));
93+
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey().getValues()));
9394
ranges.add(range);
9495
query.getObTableQuery().setKeyRanges(ranges);
9596
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
96-
insUp.getRowKey(), insUp.getColumns(), insUp.getValues());
97+
insUp.getRowKey().getValues(), insUp.getColumns(), insUp.getValues());
9798

9899
return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
99100
}

src/main/java/com/alipay/oceanbase/rpc/exception/ExceptionUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public static ObTableException convertToObTableException(String host, int port,
8181
+ "]" + "[" + resultCodes.name() + "]"
8282
+ "[" + errMsg + "]" + "[" + server + "]"
8383
+ "[" + trace + "]", resultCodes.errorCode);
84+
} else if (resultCodes.errorCode == OB_ERR_KV_ROUTE_ENTRY_EXPIRE.errorCode) {
85+
return new ObTablePartitionChangeException("[" + String.valueOf(resultCodes.errorCode)
86+
+ "]" + "[" + resultCodes.name() + "]" + "["
87+
+ errMsg + "]" + "[" + server + "]" + "["
88+
+ trace + "]", resultCodes.errorCode);
8489
} else {
8590
// [errCode][errCodeName][errMsg][server][trace]
8691
return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.fastjson.JSONException;
2222
import com.alibaba.fastjson.JSONObject;
2323
import com.alibaba.fastjson.parser.ParserConfig;
24+
import com.alipay.oceanbase.rpc.ObClusterTableBatchOps;
2425
import com.alipay.oceanbase.rpc.ObGlobal;
2526
import com.alipay.oceanbase.rpc.constant.Constants;
2627
import com.alipay.oceanbase.rpc.exception.*;
@@ -1293,27 +1294,48 @@ private static ObPartitionInfo parsePartitionInfo(ResultSet rs)
12931294
}
12941295

12951296
// get list partition column types here
1296-
List<ObColumn> orderedPartedColumns1 = null;
1297+
// List<ObColumn> orderedPartedColumns1 = null;
1298+
// if (null != info.getFirstPartDesc()) {
1299+
// if (info.getFirstPartDesc().getPartFuncType().isListPart()
1300+
// || info.getFirstPartDesc().getPartFuncType().isRangePart()) {
1301+
// orderedPartedColumns1 = getOrderedPartColumns(info.getPartColumns(),
1302+
// info.getFirstPartDesc());
1303+
// }
1304+
// }
1305+
//
1306+
// List<ObColumn> orderedPartedColumns2 = null;
1307+
// if (null != info.getSubPartDesc()) {
1308+
// if (info.getSubPartDesc().getPartFuncType().isListPart()
1309+
// || info.getSubPartDesc().getPartFuncType().isRangePart()) {
1310+
// orderedPartedColumns2 = getOrderedPartColumns(info.getPartColumns(),
1311+
// info.getSubPartDesc());
1312+
// }
1313+
// }
1314+
1315+
// set the property of first part and sub part
1316+
List<ObColumn> firstPartColumns = new ArrayList<ObColumn>(), subPartColumns = new ArrayList<ObColumn>();
12971317
if (null != info.getFirstPartDesc()) {
1298-
if (info.getFirstPartDesc().getPartFuncType().isListPart()
1299-
|| info.getFirstPartDesc().getPartFuncType().isRangePart()) {
1300-
orderedPartedColumns1 = getOrderedPartColumns(info.getPartColumns(),
1301-
info.getFirstPartDesc());
1318+
for (String partColumnNames : info.getFirstPartDesc().getOrderedPartColumnNames()) {
1319+
for (ObColumn curColumn : info.getPartColumns()) {
1320+
if (curColumn.getColumnName().equalsIgnoreCase(partColumnNames)) {
1321+
firstPartColumns.add(curColumn);
1322+
break;
1323+
}
1324+
}
13021325
}
13031326
}
1304-
1305-
List<ObColumn> orderedPartedColumns2 = null;
13061327
if (null != info.getSubPartDesc()) {
1307-
if (info.getSubPartDesc().getPartFuncType().isListPart()
1308-
|| info.getSubPartDesc().getPartFuncType().isRangePart()) {
1309-
orderedPartedColumns2 = getOrderedPartColumns(info.getPartColumns(),
1310-
info.getSubPartDesc());
1328+
for (String partColumnNames : info.getSubPartDesc().getOrderedPartColumnNames()) {
1329+
for (ObColumn curColumn : info.getPartColumns()) {
1330+
if (curColumn.getColumnName().equalsIgnoreCase(partColumnNames)) {
1331+
subPartColumns.add(curColumn);
1332+
break;
1333+
}
1334+
}
13111335
}
13121336
}
1313-
1314-
// set the property of first part and sub part
1315-
setPartDescProperty(info.getFirstPartDesc(), info.getPartColumns(), orderedPartedColumns1);
1316-
setPartDescProperty(info.getSubPartDesc(), info.getPartColumns(), orderedPartedColumns2);
1337+
setPartDescProperty(info.getFirstPartDesc(), firstPartColumns, firstPartColumns);
1338+
setPartDescProperty(info.getSubPartDesc(), subPartColumns, subPartColumns);
13171339

13181340
return info;
13191341
}

src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class TableEntry {
4646
private ObPartitionInfo partitionInfo = null;
4747
private volatile long refreshTimeMills;
4848
private volatile long refreshAllTimeMills;
49+
private volatile long odpRefreshTimeMills;
4950
private Map<String, Integer> rowKeyElement = null;
5051

5152
// table location
@@ -154,6 +155,8 @@ public long getRefreshAllTimeMills() {
154155
return refreshAllTimeMills;
155156
}
156157

158+
public long getOdpRefreshTimeMills() { return odpRefreshTimeMills; }
159+
157160
/*
158161
* Set refresh time mills.
159162
*/
@@ -168,6 +171,10 @@ public void setRefreshAllTimeMills(long refreshAllTimeMills) {
168171
this.refreshAllTimeMills = refreshAllTimeMills;
169172
}
170173

174+
public void setOdpRefreshTimeMills(long odpRefreshTimeMills) {
175+
this.odpRefreshTimeMills = odpRefreshTimeMills;
176+
}
177+
171178
public Map<String, Integer> getRowKeyElement() {
172179
return rowKeyElement;
173180
}

0 commit comments

Comments
 (0)