Skip to content

Commit c5f4270

Browse files
authored
Merge pull request #201 from stuBirdFly/master
merge branch obkv_params2 into master
2 parents 93181fb + 559fbe9 commit c5f4270

31 files changed

+635
-298
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,12 @@ void preCheck() {
193193
ObTableOperationType lastType = operations.get(0).getOperationType();
194194
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
195195
throw new FeatureNotSupportedException(
196-
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
196+
"returnOneResult is not supported in this Observer version ["
197+
+ ObGlobal.obVsnString() + "]");
197198
} else if (returnOneResult
198-
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
199-
|| lastType == ObTableOperationType.PUT
200-
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
199+
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
200+
|| lastType == ObTableOperationType.PUT
201+
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
201202
throw new IllegalArgumentException(
202203
"returnOneResult only support multi-insert/put/replace/del");
203204
}

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ public static boolean isReturnOneResultSupport() {
9191
|| OB_VERSION >= OB_VERSION_4_3_4_0;
9292
}
9393

94-
public static final long OB_VERSION_4_2_1_0 = calcVersion(4, (short) 2, (byte) 1, (byte) 0);
9594

9695
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
9796

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,6 +1063,17 @@ public String getIndexTableName(final String dataTableName, final String indexNa
10631063
return indexTableName;
10641064
}
10651065

1066+
@Override
1067+
public void setRpcExecuteTimeout(int rpcExecuteTimeout) {
1068+
this.properties.put(RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcExecuteTimeout));
1069+
this.rpcExecuteTimeout = rpcExecuteTimeout;
1070+
for (ObTable obTable : tableRoster.values()) {
1071+
if (obTable != null) {
1072+
obTable.setObTableExecuteTimeout(rpcExecuteTimeout);
1073+
}
1074+
}
1075+
}
1076+
10661077
public String constructIndexTableName(final String dataTableName, final String indexName)
10671078
throws Exception {
10681079
// construct index table name
@@ -3012,7 +3023,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
30123023
request.getTableName(),
30133024
((ObTableBatchOperationRequest) request).getBatchOperation(), this);
30143025
batchOps.setEntityType(request.getEntityType());
3015-
return new ObClusterTableBatchOps(batchOps).executeInternal();
3026+
return new ObClusterTableBatchOps(runtimeBatchExecutor, batchOps).executeInternal();
30163027
} else if (request instanceof ObTableQueryAndMutateRequest) {
30173028
ObTableQueryAndMutate tableQueryAndMutate = ((ObTableQueryAndMutateRequest) request)
30183029
.getTableQueryAndMutate();

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,19 @@
3838

3939
public class ObTableConnection {
4040

41-
private static final Logger LOGGER = TableClientLoggerFactory
42-
.getLogger(ObTableConnection.class);
41+
private static final Logger LOGGER = TableClientLoggerFactory
42+
.getLogger(ObTableConnection.class);
4343
private ObBytesString credential;
44-
private long tenantId = 1; //默认值切勿不要随意改动
44+
private long tenantId = 1; //默认值切勿不要随意改动
4545
private Connection connection;
4646
private final ObTable obTable;
47-
private long uniqueId; // as trace0 in rpc header
48-
private AtomicLong sequence; // as trace1 in rpc header
49-
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
50-
private AtomicBoolean isExpired = new AtomicBoolean(false);
47+
private long uniqueId; // as trace0 in rpc header
48+
private AtomicLong sequence; // as trace1 in rpc header
49+
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
50+
private AtomicBoolean isExpired = new AtomicBoolean(false);
5151
private LocalDateTime lastConnectionTime;
5252
private boolean loginWithConfigs = false;
53+
5354
public static long ipToLong(String strIp) {
5455
String[] ip = strIp.split("\\.");
5556
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
@@ -69,10 +70,10 @@ public void setExpired(boolean expired) {
6970
isExpired.set(expired);
7071
}
7172

72-
7373
public void enableLoginWithConfigs() {
7474
loginWithConfigs = true;
7575
}
76+
7677
/*
7778
* Ob table connection.
7879
*/

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125125
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127-
"routed to the wrong server: " + response.getMessage());
127+
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
129129
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130130
throw new ObTableNeedFetchAllException(errMessage);
@@ -141,16 +141,16 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
141141
}
142142
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
143143
String errMessage = TraceUtil.formatTraceMessage(conn, request,
144-
"routed to the wrong server: " + response.getMessage());
144+
"routed to the wrong server: " + response.getMessage());
145145
logger.warn(errMessage);
146146
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
147147
throw new ObTableNeedFetchAllException(errMessage);
148148
} else if (needFetchPartial(resultCode.getRcode())) {
149149
throw new ObTableRoutingWrongException(errMessage);
150150
} else {
151-
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn.getObTable()
152-
.getPort(), response.getHeader().getTraceId1(), response.getHeader()
153-
.getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
151+
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
152+
.getObTable().getPort(), response.getHeader().getTraceId1(), response
153+
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
154154
}
155155
}
156156

@@ -190,25 +190,26 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
190190
// schema changed
191191
private boolean needFetchAll(int errorCode, int pcode) {
192192
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
193-
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194-
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195-
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
196-
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
193+
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194+
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195+
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
196+
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
197197
}
198+
198199
private boolean needFetchPartial(int errorCode) {
199200
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
200-
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
201-
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
202-
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
203-
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
204-
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
205-
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
206-
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
207-
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
208-
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
209-
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
210-
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
211-
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
212-
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
201+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
202+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
203+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
204+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
205+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
206+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
207+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
208+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
209+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
210+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
211+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
212+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
213+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
213214
}
214215
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,12 @@ public BatchOperation setReturnOneResult(boolean returnOneResult) {
160160
public BatchOperationResult execute() throws Exception {
161161
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
162162
throw new FeatureNotSupportedException(
163-
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
163+
"returnOneResult is not supported in this Observer version ["
164+
+ ObGlobal.obVsnString() + "]");
164165
} else if (returnOneResult
165-
&& !(isSameType && (lastType == ObTableOperationType.INSERT
166-
|| lastType == ObTableOperationType.PUT
167-
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
166+
&& !(isSameType && (lastType == ObTableOperationType.INSERT
167+
|| lastType == ObTableOperationType.PUT
168+
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
168169
throw new IllegalArgumentException(
169170
"returnOneResult only support multi-insert/put/replace/del");
170171
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,7 @@ public enum ResultCodes {
358358
OB_CLUSTER_NO_MATCH(-4666), //
359359
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
360360
OB_ERR_ZONE_NOT_EMPTY(-4668), //
361-
OB_USE_DUP_FOLLOW_AFTER_DML(-4686),
362-
OB_LS_NOT_EXIST(-4719), //
361+
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
363362
OB_TABLET_NOT_EXIST(-4725), //
364363
OB_ERR_PARSER_INIT(-5000), //
365364
OB_ERR_PARSE_SQL(-5001), //

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
139139
}
140140
subObTable = client
141141
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
142-
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), false,
143-
route).getRight().getObTable();
142+
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
143+
false, route).getRight().getObTable();
144144
}
145145
}
146146
if (client.isOdpMode()) {

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

20+
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
21+
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
22+
import com.alipay.oceanbase.rpc.table.ObKVParams;
2023
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2124
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
2225
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
@@ -58,12 +61,14 @@ public class ObTableQuery extends AbstractPayload {
5861
private long maxResultSize = -1;
5962
private ObHTableFilter hTableFilter;
6063

61-
private static final byte[] HTABLE_FILTER_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
64+
private static final byte[] HTABLE_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
6265
private boolean isHbaseQuery = false;
6366
private List<String> scanRangeColumns = new LinkedList<String>();
6467

6568
private List<ObTableAggregationSingle> aggregations = new LinkedList<>();
6669

70+
private ObKVParams obKVParams;
71+
6772
/*
6873
* Check filter.
6974
*/
@@ -149,8 +154,8 @@ public byte[] encode() {
149154
len = (int) hTableFilter.getPayloadSize();
150155
System.arraycopy(hTableFilter.encode(), 0, bytes, idx, len);
151156
} else {
152-
len = HTABLE_FILTER_DUMMY_BYTES.length;
153-
System.arraycopy(HTABLE_FILTER_DUMMY_BYTES, 0, bytes, idx, len);
157+
len = HTABLE_DUMMY_BYTES.length;
158+
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
154159
}
155160
idx += len;
156161

@@ -173,6 +178,16 @@ public byte[] encode() {
173178
idx += len;
174179
}
175180

181+
if (isHbaseQuery && obKVParams != null) {
182+
len = (int) obKVParams.getPayloadSize();
183+
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
184+
idx += len;
185+
} else {
186+
len = HTABLE_DUMMY_BYTES.length;
187+
System.arraycopy(HTABLE_DUMMY_BYTES, 0, bytes, idx, len);
188+
idx += len;
189+
}
190+
176191
return bytes;
177192
}
178193

@@ -230,6 +245,10 @@ public Object decode(ByteBuf buf) {
230245
String agg_column = Serialization.decodeVString(buf);
231246
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
232247
}
248+
if (isHbaseQuery) {
249+
obKVParams = new ObKVParams();
250+
this.obKVParams.decode(buf);
251+
}
233252
return this;
234253
}
235254

@@ -259,7 +278,12 @@ public long getPayloadContentSize() {
259278
if (isHbaseQuery) {
260279
contentSize += hTableFilter.getPayloadSize();
261280
} else {
262-
contentSize += HTABLE_FILTER_DUMMY_BYTES.length;
281+
contentSize += HTABLE_DUMMY_BYTES.length;
282+
}
283+
if (isHbaseQuery && obKVParams != null) {
284+
contentSize += obKVParams.getPayloadSize();
285+
} else {
286+
contentSize += HTABLE_DUMMY_BYTES.length;
263287
}
264288
contentSize += Serialization.getNeedBytes(scanRangeColumns.size());
265289
for (String scanRangeColumn : scanRangeColumns) {
@@ -466,4 +490,17 @@ public void setScanRangeColumns(String... scanRangeColumns) {
466490
public void setScanRangeColumns(List<String> scanRangeColumns) {
467491
this.scanRangeColumns = scanRangeColumns;
468492
}
493+
494+
// This interface is just for OBKV-Hbase
495+
public void setObKVParams(ObKVParams obKVParams) {
496+
if (!(obKVParams.getObParamsBase() instanceof ObHBaseParams)) {
497+
throw new FeatureNotSupportedException("only ObHBaseParams support currently");
498+
}
499+
this.isHbaseQuery = true;
500+
this.obKVParams = obKVParams;
501+
}
502+
503+
public ObKVParams getObKVParams() {
504+
return obKVParams;
505+
}
469506
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ public long getTtlUs() {
373373
public void setTtlUs(long ttlUs) {
374374
this.ttlUs = ttlUs;
375375
}
376-
376+
377377
public String getConfigsStr() {
378378
return configsStr;
379379
}

0 commit comments

Comments
 (0)