Skip to content

Commit a4f407c

Browse files
authored
set and use runtimeBatchExecutor in ObTableClient rebase to obkv_params2 (#190)
* add pool parameter to set in batch operation * ObTableClient getPool when execute Hbase batch request * rebase from obkv_params2 * remove useless dependency
1 parent 2d40237 commit a4f407c

24 files changed

+283
-293
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableBatchOps.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,12 @@ void preCheck() {
193193
ObTableOperationType lastType = operations.get(0).getOperationType();
194194
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
195195
throw new FeatureNotSupportedException(
196-
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
196+
"returnOneResult is not supported in this Observer version ["
197+
+ ObGlobal.obVsnString() + "]");
197198
} else if (returnOneResult
198-
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
199-
|| lastType == ObTableOperationType.PUT
200-
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
199+
&& !(this.tableBatchOps.getObTableBatchOperation().isSameType() && (lastType == ObTableOperationType.INSERT
200+
|| lastType == ObTableOperationType.PUT
201+
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
201202
throw new IllegalArgumentException(
202203
"returnOneResult only support multi-insert/put/replace/del");
203204
}

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ public static boolean isLsOpSupport() {
8888

8989
// todo: use OB_VERSION_4_3_4_0 after observer upgrade version
9090
public static boolean isReturnOneResultSupport() {
91-
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0 || OB_VERSION >= OB_VERSION_4_3_3_0;
91+
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0
92+
|| OB_VERSION >= OB_VERSION_4_3_4_0;
9293
}
9394

9495
public static final long OB_VERSION_4_2_1_0 = calcVersion(4, (short) 2, (byte) 1, (byte) 0);

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2995,7 +2995,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
29952995
request.getTableName(),
29962996
((ObTableBatchOperationRequest) request).getBatchOperation(), this);
29972997
batchOps.setEntityType(request.getEntityType());
2998-
return new ObClusterTableBatchOps(batchOps).executeInternal();
2998+
return new ObClusterTableBatchOps(runtimeBatchExecutor, batchOps).executeInternal();
29992999
} else if (request instanceof ObTableQueryAndMutateRequest) {
30003000
ObTableQueryAndMutate tableQueryAndMutate = ((ObTableQueryAndMutateRequest) request)
30013001
.getTableQueryAndMutate();

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,19 @@
3838

3939
public class ObTableConnection {
4040

41-
private static final Logger LOGGER = TableClientLoggerFactory
42-
.getLogger(ObTableConnection.class);
41+
private static final Logger LOGGER = TableClientLoggerFactory
42+
.getLogger(ObTableConnection.class);
4343
private ObBytesString credential;
44-
private long tenantId = 1; //默认值切勿不要随意改动
44+
private long tenantId = 1; //默认值切勿不要随意改动
4545
private Connection connection;
4646
private final ObTable obTable;
47-
private long uniqueId; // as trace0 in rpc header
48-
private AtomicLong sequence; // as trace1 in rpc header
49-
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
50-
private AtomicBoolean isExpired = new AtomicBoolean(false);
47+
private long uniqueId; // as trace0 in rpc header
48+
private AtomicLong sequence; // as trace1 in rpc header
49+
private AtomicBoolean isReConnecting = new AtomicBoolean(false); // indicate is re-connecting or not
50+
private AtomicBoolean isExpired = new AtomicBoolean(false);
5151
private LocalDateTime lastConnectionTime;
5252
private boolean loginWithConfigs = false;
53+
5354
public static long ipToLong(String strIp) {
5455
String[] ip = strIp.split("\\.");
5556
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
@@ -69,10 +70,10 @@ public void setExpired(boolean expired) {
6970
isExpired.set(expired);
7071
}
7172

72-
7373
public void enableLoginWithConfigs() {
7474
loginWithConfigs = true;
7575
}
76+
7677
/*
7778
* Ob table connection.
7879
*/

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125125
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127-
"routed to the wrong server: " + response.getMessage());
127+
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
129129
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
130130
throw new ObTableNeedFetchAllException(errMessage);
@@ -141,16 +141,16 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
141141
}
142142
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
143143
String errMessage = TraceUtil.formatTraceMessage(conn, request,
144-
"routed to the wrong server: " + response.getMessage());
144+
"routed to the wrong server: " + response.getMessage());
145145
logger.warn(errMessage);
146146
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
147147
throw new ObTableNeedFetchAllException(errMessage);
148148
} else if (needFetchPartial(resultCode.getRcode())) {
149149
throw new ObTableRoutingWrongException(errMessage);
150150
} else {
151-
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn.getObTable()
152-
.getPort(), response.getHeader().getTraceId1(), response.getHeader()
153-
.getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
151+
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
152+
.getObTable().getPort(), response.getHeader().getTraceId1(), response
153+
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
154154
}
155155
}
156156

@@ -190,25 +190,26 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
190190
// schema changed
191191
private boolean needFetchAll(int errorCode, int pcode) {
192192
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
193-
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194-
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195-
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
196-
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
193+
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
194+
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
195+
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
196+
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
197197
}
198+
198199
private boolean needFetchPartial(int errorCode) {
199200
return errorCode == ResultCodes.OB_LOCATION_LEADER_NOT_EXIST.errorCode
200-
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
201-
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
202-
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
203-
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
204-
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
205-
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
206-
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
207-
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
208-
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
209-
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
210-
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
211-
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
212-
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
201+
|| errorCode == ResultCodes.OB_NOT_MASTER.errorCode
202+
|| errorCode == ResultCodes.OB_RS_NOT_MASTER.errorCode
203+
|| errorCode == ResultCodes.OB_RS_SHUTDOWN.errorCode
204+
|| errorCode == ResultCodes.OB_RPC_SEND_ERROR.errorCode
205+
|| errorCode == ResultCodes.OB_RPC_POST_ERROR.errorCode
206+
|| errorCode == ResultCodes.OB_PARTITION_NOT_EXIST.errorCode
207+
|| errorCode == ResultCodes.OB_LOCATION_NOT_EXIST.errorCode
208+
|| errorCode == ResultCodes.OB_PARTITION_IS_STOPPED.errorCode
209+
|| errorCode == ResultCodes.OB_PARTITION_IS_BLOCKED.errorCode
210+
|| errorCode == ResultCodes.OB_SERVER_IS_INIT.errorCode
211+
|| errorCode == ResultCodes.OB_SERVER_IS_STOPPING.errorCode
212+
|| errorCode == ResultCodes.OB_TRANS_RPC_TIMEOUT.errorCode
213+
|| errorCode == ResultCodes.OB_NO_READABLE_REPLICA.errorCode;
213214
}
214215
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,12 @@ public BatchOperation setReturnOneResult(boolean returnOneResult) {
160160
public BatchOperationResult execute() throws Exception {
161161
if (returnOneResult && !ObGlobal.isReturnOneResultSupport()) {
162162
throw new FeatureNotSupportedException(
163-
"returnOneResult is not supported in this Observer version [" + ObGlobal.obVsnString() +"]");
163+
"returnOneResult is not supported in this Observer version ["
164+
+ ObGlobal.obVsnString() + "]");
164165
} else if (returnOneResult
165-
&& !(isSameType && (lastType == ObTableOperationType.INSERT
166-
|| lastType == ObTableOperationType.PUT
167-
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
166+
&& !(isSameType && (lastType == ObTableOperationType.INSERT
167+
|| lastType == ObTableOperationType.PUT
168+
|| lastType == ObTableOperationType.REPLACE || lastType == ObTableOperationType.DEL))) {
168169
throw new IllegalArgumentException(
169170
"returnOneResult only support multi-insert/put/replace/del");
170171
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/ResultCodes.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,7 @@ public enum ResultCodes {
358358
OB_CLUSTER_NO_MATCH(-4666), //
359359
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
360360
OB_ERR_ZONE_NOT_EMPTY(-4668), //
361-
OB_USE_DUP_FOLLOW_AFTER_DML(-4686),
362-
OB_LS_NOT_EXIST(-4719), //
361+
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
363362
OB_TABLET_NOT_EXIST(-4725), //
364363
OB_ERR_PARSER_INIT(-5000), //
365364
OB_ERR_PARSE_SQL(-5001), //

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
139139
}
140140
subObTable = client
141141
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
142-
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(), false,
143-
route).getRight().getObTable();
142+
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
143+
false, route).getRight().getObTable();
144144
}
145145
}
146146
if (client.isOdpMode()) {

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ public long getTtlUs() {
373373
public void setTtlUs(long ttlUs) {
374374
this.ttlUs = ttlUs;
375375
}
376-
376+
377377
public String getConfigsStr() {
378378
return configsStr;
379379
}

src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public abstract class AbstractObTable extends AbstractTable {
4141

4242
protected int nettyBlockingWaitInterval = NETTY_BLOCKING_WAIT_INTERVAL.getDefaultInt();
4343

44-
protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
44+
protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
4545

4646
/*
4747
* Get ob table connect try times.
@@ -165,5 +165,7 @@ public int getNettyBlockingWaitInterval() {
165165
/*
166166
* Get connection max expired time
167167
*/
168-
public long getConnMaxExpiredTime() { return maxConnExpiredTime; }
168+
public long getConnMaxExpiredTime() {
169+
return maxConnExpiredTime;
170+
}
169171
}

0 commit comments

Comments
 (0)