Skip to content

Commit 01537f2

Browse files
authored
Log performance optimization, scan timeout, and secondary routing (#182)
* 日志性能优化、scan超时时间以及二次路由 * throw error when reroute meet server switch leader not ready
1 parent cb7d138 commit 01537f2

File tree

21 files changed

+1323
-48
lines changed

21 files changed

+1323
-48
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
218218
return tableClientQuery.setMaxResultSize(maxResultSize);
219219
}
220220

221+
@Override
222+
public TableQuery setOperationTimeout(long operationTimeout) {
223+
tableClientQuery.setOperationTimeout(operationTimeout);
224+
return this;
225+
}
226+
221227
/**
222228
* Clear.
223229
*/

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 99 additions & 23 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2323
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
@@ -111,9 +112,7 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
111112
*/
112113
@Override
113114
public ObPayload newPayload(ObRpcPacketHeader header) {
114-
throw new ObTableRoutingWrongException(
115-
"Receive rerouting response packet. "
116-
+ "Java client is not supported and need to Refresh table router entry");
115+
return new ObTableApiMove();
117116
}
118117
}, //
119118
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
@@ -161,9 +160,7 @@ public static ObTablePacketCode valueOf(short value) {
161160
case Pcodes.OB_TABLE_API_LS_EXECUTE:
162161
return OB_TABLE_API_LS_EXECUTE;
163162
case Pcodes.OB_TABLE_API_MOVE:
164-
throw new ObTableRoutingWrongException(
165-
"Receive rerouting response packet. "
166-
+ "Java client is not supported and need to Refresh table router entry");
163+
return OB_TABLE_API_MOVE;
167164
case Pcodes.OB_ERROR_PACKET:
168165
return OB_ERROR_PACKET;
169166
}

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
122122
ObRpcResultCode resultCode = new ObRpcResultCode();
123123
resultCode.decode(buf);
124124
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
125-
if (response.getHeader().isRoutingWrong()) {
125+
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
126126
String errMessage = TraceUtil.formatTraceMessage(conn, request,
127127
"routed to the wrong server: " + response.getMessage());
128128
logger.warn(errMessage);
@@ -139,7 +139,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
139139
throw new ObTableNeedFetchAllException(errMessage);
140140
}
141141
}
142-
if (resultCode.getRcode() != 0) {
142+
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
143143
String errMessage = TraceUtil.formatTraceMessage(conn, request,
144144
"routed to the wrong server: " + response.getMessage());
145145
logger.warn(errMessage);
@@ -186,7 +186,6 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
186186
InvokeCallback invokeCallback) {
187187
return new ObClientFuture(request.getId());
188188
}
189-
190189

191190
// schema changed
192191
private boolean needFetchAll(int errorCode, int pcode) {

src/main/java/com/alipay/oceanbase/rpc/location/model/ObReplicaType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.alipay.oceanbase.rpc.location.model;
1919

20+
import java.util.Objects;
21+
2022
/**
2123
* ObReplicaType(副本类型)
2224
*
@@ -72,6 +74,10 @@ static public ObReplicaType getReplicaType(int idx) {
7274
}
7375
}
7476

77+
public int getIndex() {
78+
return this.index;
79+
}
80+
7581
/*
7682
* whether the replica is readable.
7783
*/
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*-
2+
* #%L
3+
* OBKV Table Client Framework
4+
* %%
5+
* Copyright (C) 2021 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
package com.alipay.oceanbase.rpc.location.model;
18+
19+
import com.alipay.oceanbase.rpc.ObTableClient;
20+
21+
22+
import java.util.concurrent.ConcurrentLinkedQueue;
23+
import java.util.concurrent.Semaphore;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.locks.Lock;
26+
import java.util.concurrent.locks.ReentrantLock;
27+
28+
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
29+
import org.slf4j.Logger;
30+
31+
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.getLogger;
32+
33+
public class RouteTableRefresher extends Thread{
34+
35+
private static final Logger logger = getLogger(RouteTableRefresher.class);
36+
37+
private volatile AtomicBoolean isFinished = new AtomicBoolean(false); // Thread end flag
38+
39+
private final Semaphore semaphore = new Semaphore(0);
40+
41+
private volatile ConcurrentLinkedQueue<ObPair<String, Boolean>> refreshTableTasks; // Task refresh queue
42+
43+
ObTableClient client;
44+
45+
private final Lock lock = new ReentrantLock(); // Ensure the atomicity of the AddIfAbsent operation.
46+
47+
public RouteTableRefresher(ObTableClient client){
48+
this.client = client;
49+
}
50+
51+
public void finish() {
52+
isFinished.set(true);
53+
}
54+
55+
@Override
56+
public void run() {
57+
refreshTableTasks = new ConcurrentLinkedQueue<>();
58+
while (!isFinished.get()) {
59+
try {
60+
semaphore.acquire(); // A semaphore is associated with a task; it ensures that only one task is processed at a time.
61+
logger.info("Thread name {}, id{} acquire semaphore, begin execute route refresher", currentThread().getName(), currentThread().getId());
62+
} catch (InterruptedException e) {
63+
logger.info("Thread name {}, id {} is interrupted", currentThread().getName(), currentThread().getId());
64+
}
65+
ObPair<String, Boolean> refreshTableTask = refreshTableTasks.peek();
66+
if (refreshTableTask != null && refreshTableTask.getRight()) {
67+
String tableName = refreshTableTask.getLeft();
68+
try {
69+
logger.info("backgroundRefreshTableTask run refresh, table name {}", tableName);
70+
TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, true, false, false);
71+
client.getTableLocations().put(refreshTableTask.getLeft(), tableEntry);
72+
} catch (Exception e) {
73+
String message = "RefreshTableBackground run meet exception" + e.getMessage();
74+
logger.warn(message);
75+
}
76+
}
77+
refreshTableTasks.poll();
78+
}
79+
}
80+
81+
public void addTableIfAbsent(String tableName, Boolean isRefreshing){
82+
lock.lock();
83+
if (!refreshTableTasks.contains(new ObPair<>(tableName, isRefreshing))) {
84+
logger.info("add table {}, is refreshing {} to refresh task.", tableName, isRefreshing);
85+
refreshTableTasks.add(new ObPair<>(tableName,isRefreshing));
86+
}
87+
lock.unlock();
88+
}
89+
90+
public void triggerRefreshTable() {
91+
semaphore.release();
92+
}
93+
}

src/main/java/com/alipay/oceanbase/rpc/property/AbstractPropertyAware.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public long parseToLong(String key, long defaultV) {
6060
}
6161
}
6262

63-
public boolean parseToBoolean(String key) {
63+
public boolean parseToBoolean(String key) throws Exception {
64+
if (System.getProperty(OB_TABLE_CLIENT_PREFIX + key) == null && getProperty(key) == null){
65+
throw new Exception();
66+
}
6467
return Boolean.parseBoolean(System.getProperty(OB_TABLE_CLIENT_PREFIX + key,
6568
getProperty(key)));
6669
}

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public enum Property {
102102
RUNTIME_BATCH_MAX_WAIT("runtime.batch.max.wait", 3000L, "批量执行请求的超时时间"),
103103

104104
// [ObTableClient][LOG]
105-
SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", 10L, "记录到 MONITOR 日志中的慢操作的运行时间阈值"),
105+
SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", -1L, "记录到 MONITOR 日志中的慢操作的运行时间阈值"),
106106

107107
/*
108108
* property in [`ObTable`]
@@ -134,7 +134,7 @@ public enum Property {
134134
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
135135

136136
// [ObTable][OTHERS]
137-
SERVER_ENABLE_REROUTING("server.enable.rerouting", "false", "开启server端的重定向回复功能"),
137+
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
138138

139139
/*
140140
* other config
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
2+
3+
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
4+
import com.alipay.oceanbase.rpc.util.Serialization;
5+
import io.netty.buffer.ByteBuf;
6+
7+
import java.net.InetAddress;
8+
import java.net.UnknownHostException;
9+
import java.nio.ByteBuffer;
10+
import java.nio.ByteOrder;
11+
import java.util.Arrays;
12+
13+
public class ObAddr extends AbstractPayload {
14+
15+
private ObAddrVersion obAddrVersion;
16+
17+
private byte[] ip;
18+
private int port;
19+
20+
public ObAddr(){
21+
this.obAddrVersion = ObAddrVersion.ObAddrIPV4;
22+
this.ip = new byte[16];
23+
this.port = 0;
24+
}
25+
26+
public String ipToString() {
27+
if (isIPv4()){
28+
return getIPv4().getHostAddress();
29+
}
30+
return getIPv6().getHostAddress();
31+
}
32+
33+
public InetAddress getIPv4(){
34+
if (isIPv6()){
35+
return null;
36+
}
37+
try {
38+
return InetAddress.getByAddress(Arrays.copyOf(ip, 4));
39+
} catch (UnknownHostException e){
40+
// 需要查看对应的错误吗进行处理
41+
}
42+
return null;
43+
}
44+
45+
public InetAddress getIPv6() {
46+
if (isIPv6()){
47+
return null;
48+
}
49+
try {
50+
return InetAddress.getByAddress(ip);
51+
} catch (UnknownHostException e){
52+
// 需要查看对应的错误吗进行处理
53+
}
54+
return null;
55+
}
56+
57+
public boolean isIPv4() {
58+
return obAddrVersion == ObAddrVersion.ObAddrIPV4;
59+
}
60+
61+
public boolean isIPv6() {
62+
return obAddrVersion == ObAddrVersion.ObAddrIPV6;
63+
}
64+
65+
public int getPort(){
66+
return port;
67+
}
68+
69+
@Override
70+
public long getPayloadContentSize() {
71+
return 0;
72+
}
73+
74+
@Override
75+
public byte[] encode() {
76+
byte[] bytes = new byte[(int) getPayloadSize()];
77+
int idx = 0;
78+
79+
// 0. encode header
80+
idx = encodeHeader(bytes, idx);
81+
System.arraycopy(Serialization.encodeI8(obAddrVersion.getValue()), 0, bytes, idx, Serialization.getNeedBytes(obAddrVersion.getValue()));
82+
83+
ByteBuffer buffer = ByteBuffer.wrap(ip).order(ByteOrder.BIG_ENDIAN);
84+
int ip1 = buffer.getInt(0);
85+
int ip2 = buffer.getInt(4);
86+
int ip3 = buffer.getInt(8);
87+
int ip4 = buffer.getInt(12);
88+
System.arraycopy(Serialization.encodeVi32(ip1), 0, bytes, idx, Serialization.getNeedBytes(ip1));
89+
System.arraycopy(Serialization.encodeVi32(ip2), 0, bytes, idx, Serialization.getNeedBytes(ip2));
90+
System.arraycopy(Serialization.encodeVi32(ip3), 0, bytes, idx, Serialization.getNeedBytes(ip3));
91+
System.arraycopy(Serialization.encodeVi32(ip4), 0, bytes, idx, Serialization.getNeedBytes(ip4));
92+
93+
System.arraycopy(Serialization.encodeVi32(port), 0, bytes, idx, Serialization.getNeedBytes(port));
94+
return bytes;
95+
}
96+
97+
@Override
98+
public ObAddr decode(ByteBuf buf) {
99+
super.decode(buf);
100+
this.obAddrVersion = ObAddrVersion.fromValue(Serialization.decodeI8(buf));
101+
//decode ip addr
102+
103+
int ip1, ip2, ip3, ip4;
104+
ip1 = Serialization.decodeVi32(buf);
105+
ip2 = Serialization.decodeVi32(buf);
106+
ip3 = Serialization.decodeVi32(buf);
107+
ip4 = Serialization.decodeVi32(buf);
108+
ByteBuffer ipBuffer = ByteBuffer.wrap(ip).order(ByteOrder.BIG_ENDIAN);
109+
ipBuffer.putInt(0, ip1);
110+
ipBuffer.putInt(4, ip2);
111+
ipBuffer.putInt(8, ip3);
112+
ipBuffer.putInt(12, ip4);
113+
114+
this.port = Serialization.decodeVi32(buf);
115+
116+
return this;
117+
}
118+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;
2+
3+
public enum ObAddrVersion {
4+
ObAddrIPV4((byte) 4),
5+
ObAddrIPV6((byte) 6);
6+
7+
private final byte value;
8+
9+
ObAddrVersion(byte value) {
10+
this.value = value;
11+
}
12+
13+
public byte getValue() {
14+
return value;
15+
}
16+
17+
public static ObAddrVersion fromValue(int value){
18+
for (ObAddrVersion obAddrVersion : values()) {
19+
if (obAddrVersion.value == value) {
20+
return obAddrVersion;
21+
}
22+
}
23+
return ObAddrIPV4; //默认使用IPV4, 或者抛异常。
24+
}
25+
}

0 commit comments

Comments
 (0)