Skip to content

Commit 255281c

Browse files
shenyunlongIHEII
authored andcommitted
[feature] Add slow_query_monitor_threshold property
1. change Monitor syncAppender to AsyncAppender 2. add slow_query_threshold property 3. fix some monitor log format problem 4. fix setRuntimeRetryTimes(0) is not works Link: https://code.alibaba-inc.com/oceanbase/obkv-table-client-java/codereview/11966579 * use async appender to print monitor log * [feature] Add slow_query_threshold property * use async appender to print monitor log * [feature] Add slow_query_monitor_threshold property
1 parent 4b7531c commit 255281c

File tree

9 files changed

+416
-245
lines changed

9 files changed

+416
-245
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 54 additions & 125 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginRequest;
2222
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
2323
import com.alipay.oceanbase.rpc.table.ObTable;
24-
import com.alipay.oceanbase.rpc.util.ObBytesString;
25-
import com.alipay.oceanbase.rpc.util.Security;
26-
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
27-
import com.alipay.oceanbase.rpc.util.TraceUtil;
24+
import com.alipay.oceanbase.rpc.util.*;
2825
import com.alipay.remoting.Connection;
2926
import org.slf4j.Logger;
3027

@@ -99,9 +96,11 @@ private synchronized boolean connect() throws Exception {
9996
}
10097
}
10198
String endpoint = obTable.getIp() + ":" + obTable.getPort();
102-
MONITOR.info(logMessage("", "CONNECT", endpoint, System.currentTimeMillis() - start));
99+
MONITOR.info(logMessage(null, "CONNECT", endpoint, System.currentTimeMillis() - start));
103100

104101
if (tries >= maxTryTimes) {
102+
LOGGER.warn("connect failed after max " + maxTryTimes + " tries "
103+
+ TraceUtil.formatIpPort(obTable));
105104
throw new ObTableServerConnectException("connect failed after max " + maxTryTimes
106105
+ " tries " + TraceUtil.formatIpPort(obTable),
107106
cause);
@@ -156,6 +155,8 @@ private synchronized void login() throws Exception {
156155
MONITOR.info(logMessage(formatTraceMessage(request), "LOGIN", endpoint,
157156
System.currentTimeMillis() - start));
158157
if (tries >= maxTryTimes) {
158+
LOGGER.warn("login failed after max " + maxTryTimes + " tries "
159+
+ TraceUtil.formatIpPort(obTable));
159160
throw new ObTableServerConnectException("login failed after max " + maxTryTimes
160161
+ " tries " + TraceUtil.formatIpPort(obTable),
161162
cause);
@@ -291,8 +292,11 @@ private String logMessage(String traceId, String methodName, String endpoint, lo
291292
}
292293

293294
StringBuilder stringBuilder = new StringBuilder();
294-
stringBuilder.append(traceId).append(",").append(methodName).append(",").append(endpoint)
295-
.append(",").append(executeTime);
295+
if (traceId != null) {
296+
stringBuilder.append(traceId).append(" - ");
297+
}
298+
stringBuilder.append(methodName).append(",").append(endpoint).append(",")
299+
.append(executeTime);
296300
return stringBuilder.toString();
297301
}
298302

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ public enum Property {
125125
// that is, flush the buffer from high_watermark(1M) to low_watermark(512k) by default.
126126
// So before throw Exception when buffer is full, sleep 1ms by default for the scenario
127127
// when a big package is blocking the buffer but the server is OK.
128-
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间");
128+
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
129+
130+
SLOW_QUERY_MONITOR_THRESHOLD("slow.query.monitor.threshold", 10L, "记录到 MONITOR 日志中的慢操作的运行时间阈值");
129131

130132
private final String key;
131133
private final Object defaultV;

src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTableClient.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public abstract class AbstractObTableClient extends AbstractTable {
9292
.getDefaultInt();
9393
protected int rpcLoginTimeout = RPC_LOGIN_TIMEOUT
9494
.getDefaultInt();
95+
protected long slowQueryMonitorThreshold = SLOW_QUERY_MONITOR_THRESHOLD
96+
.getDefaultLong();
9597

9698
@Deprecated
9799
/*
@@ -361,6 +363,8 @@ public long getRuntimeMaxWait() {
361363
*/
362364
public void setRuntimeMaxWait(long runtimeMaxWait) {
363365
this.runtimeMaxWait = runtimeMaxWait;
366+
this.properties.put(RUNTIME_MAX_WAIT.getKey(), String.valueOf(runtimeMaxWait));
367+
364368
}
365369

366370
/*
@@ -388,10 +392,8 @@ public void setRuntimeRetryInterval(int runtimeRetryInterval) {
388392
* Set runtime retry times.
389393
*/
390394
public void setRuntimeRetryTimes(int runtimeRetryTimes) {
391-
// ignore the illegal param
392-
if (runtimeRetryTimes >= 1) {
393-
this.runtimeRetryTimes = runtimeRetryTimes;
394-
}
395+
this.properties.put(RUNTIME_RETRY_TIMES.getKey(), String.valueOf(runtimeRetryTimes));
396+
this.runtimeRetryTimes = runtimeRetryTimes;
395397
}
396398

397399
/*
@@ -464,6 +466,7 @@ public long getRuntimeBatchMaxWait() {
464466
* Set runtime batch max wait.
465467
*/
466468
public void setRuntimeBatchMaxWait(long runtimeBatchMaxWait) {
469+
this.properties.put(RUNTIME_BATCH_MAX_WAIT.getKey(), String.valueOf(runtimeBatchMaxWait));
467470
this.runtimeBatchMaxWait = runtimeBatchMaxWait;
468471
}
469472

@@ -480,4 +483,20 @@ public ExecutorService getRuntimeBatchExecutor() {
480483
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
481484
this.runtimeBatchExecutor = runtimeBatchExecutor;
482485
}
486+
487+
/*
488+
* Get slow query threshold.
489+
*/
490+
public long getslowQueryMonitorThreshold() {
491+
return slowQueryMonitorThreshold;
492+
}
493+
494+
/*
495+
* Set slow query threshold.
496+
*/
497+
public void setslowQueryMonitorThreshold(long slowQueryMonitorThreshold) {
498+
this.properties.put(SLOW_QUERY_MONITOR_THRESHOLD.getKey(),
499+
String.valueOf(slowQueryMonitorThreshold));
500+
this.slowQueryMonitorThreshold = slowQueryMonitorThreshold;
501+
}
483502
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 9 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
2828
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
2929
import com.alipay.oceanbase.rpc.threadlocal.ThreadLocalMap;
30+
import com.alipay.oceanbase.rpc.util.MonitorUtil;
3031
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
3132
import org.slf4j.Logger;
3233

3334
import java.util.*;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.TimeUnit;
3637

37-
import static com.alipay.oceanbase.rpc.ObTableClient.buildParamsString;
3838
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
3939
import static com.alipay.oceanbase.rpc.util.TraceUtil.formatTraceMessage;
4040

@@ -392,9 +392,6 @@ public void partitionExecute(ObTableOperationResult[] results,
392392

393393
List<ObTableOperationResult> subObTableOperationResults = subObTableBatchOperationResult
394394
.getResults();
395-
String endpoint = subObTable.getIp() + ":" + subObTable.getPort();
396-
logMessage0(formatTraceMessage(subRequest), tableName, "BATCH-partitionExecute-", endpoint,
397-
subOperations, partId, subObTableOperationResults.size(), endExecute - startExecute);
398395

399396
if (subObTableOperationResults.size() < subOperations.getTableOperations().size()) {
400397
// only one result when it across failed
@@ -430,49 +427,13 @@ public void partitionExecute(ObTableOperationResult[] results,
430427
results[subOperationWithIndexList.get(i).getLeft()] = subObTableOperationResult;
431428
}
432429
}
430+
String endpoint = subObTable.getIp() + ":" + subObTable.getPort();
431+
MonitorUtil.info(subRequest, subObTable.getDatabase(), tableName,
432+
"BATCH-partitionExecute-", endpoint, subOperations, partId,
433+
subObTableOperationResults.size(), endExecute - startExecute,
434+
obTableClient.getslowQueryMonitorThreshold());
433435
}
434436

435-
private void logMessage0(String traceId, String tableName, String methodName, String endpoint, ObTableBatchOperation subOperations,
436-
long partId, int resultSize, long executeTime) {
437-
List<ObTableOperation> ops = subOperations.getTableOperations();
438-
for (ObTableOperation op : ops) {
439-
List<Object> rowKeys = new ArrayList<>();
440-
ObTableOperationType type = op.getOperationType();
441-
ObITableEntity entity = op.getEntity();
442-
if (entity != null) {
443-
long rowkeySize = entity.getRowKeySize();
444-
ObRowKey rowKey = entity.getRowKey();
445-
if (rowKey != null && rowkeySize != 0) {
446-
for (int i = 0; i < rowkeySize; i++) {
447-
ObObj obObj = entity.getRowKeyValue(i);
448-
if (obObj != null) {
449-
rowKeys.add(obObj.getValue());
450-
}
451-
}
452-
}
453-
}
454-
MONITOR.info(logMessage(traceId, tableName, methodName+type+"-"+partId, endpoint, rowKeys, resultSize, executeTime));
455-
}
456-
}
457-
458-
private String logMessage(String traceId, String tableName, String methodName, String endpoint,
459-
List<Object> rowKeys, int resultSize, long executeTime) {
460-
if (org.apache.commons.lang.StringUtils.isNotBlank(endpoint)) {
461-
endpoint = endpoint.replaceAll(",", "#");
462-
}
463-
464-
String argsValue = buildParamsString(rowKeys);
465-
466-
StringBuilder stringBuilder = new StringBuilder();
467-
stringBuilder.append(traceId).append(",").append(obTableClient.getDatabase()).append(",")
468-
.append(tableName).append(",").append(methodName).append(",").append(endpoint)
469-
.append(",").append(argsValue).append(",").append(",").append(resultSize).append(",")
470-
.append(0).append(",").append(executeTime).append(",").append(executeTime);
471-
return stringBuilder.toString();
472-
}
473-
474-
;
475-
476437
/*
477438
* Execute internal.
478439
*/
@@ -557,28 +518,14 @@ public void doTask() {
557518
batchOperationResult.addResult(obTableOperationResult);
558519
}
559520

560-
MONITOR.info(logMessage(formatTraceMessage(batchOperationResult), tableName, "BATCH", "",
521+
MonitorUtil.info(batchOperationResult, obTableClient.getDatabase(), tableName, "BATCH", "",
561522
obTableOperationResults.length, getTableTime - start, System.currentTimeMillis()
562-
- getTableTime));
523+
- getTableTime,
524+
obTableClient.getslowQueryMonitorThreshold());
563525

564526
return batchOperationResult;
565527
}
566528

567-
private String logMessage(String traceId, String tableName, String methodName, String endpoint,
568-
int resultSize, long routeTableTime, long executeTime) {
569-
if (org.apache.commons.lang.StringUtils.isNotBlank(endpoint)) {
570-
endpoint = endpoint.replaceAll(",", "#");
571-
}
572-
573-
StringBuilder stringBuilder = new StringBuilder();
574-
stringBuilder.append(traceId).append(",").append(obTableClient.getDatabase())
575-
.append(tableName).append(",").append(methodName).append(",").append(endpoint)
576-
.append(",").append(",").append(",").append(resultSize).append(",")
577-
.append(routeTableTime).append(",").append(executeTime).append(",")
578-
.append(routeTableTime + executeTime);
579-
return stringBuilder.toString();
580-
}
581-
582529
/*
583530
* clear batch operations1
584531
*/

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,13 @@
2626
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
2727
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
2828
import com.alipay.oceanbase.rpc.table.api.TableQuery;
29+
import com.alipay.oceanbase.rpc.util.MonitorUtil;
2930

3031
import java.util.ArrayList;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
3435

35-
import static com.alipay.oceanbase.rpc.ObTableClient.buildParamsString;
36-
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.MONITOR;
37-
import static com.alipay.oceanbase.rpc.util.TraceUtil.formatTraceMessage;
38-
3936
public class ObTableClientQueryImpl extends AbstractTableQueryImpl {
4037

4138
private final String tableName;
@@ -118,15 +115,14 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception {
118115
}
119116
final long startTime = System.currentTimeMillis();
120117
Map<Long, ObPair<Long, ObTable>> partitionObTables = new HashMap<Long, ObPair<Long, ObTable>>();
121-
List<Object> params = new ArrayList<>();
122118
// fill a whole range if no range is added explicitly.
123119
if (tableQuery.getKeyRanges().isEmpty()) {
124120
tableQuery.addKeyRange(ObNewRange.getWholeRange());
125121
}
126122
if (obTableClient.isOdpMode()) {
127123
if (tableQuery.getScanRangeColumns().isEmpty()) {
128-
if (tableQuery.getIndexName() != null &&
129-
!tableQuery.getIndexName().equalsIgnoreCase("primary")) {
124+
if (tableQuery.getIndexName() != null
125+
&& !tableQuery.getIndexName().equalsIgnoreCase("primary")) {
130126
throw new ObTableException("key range columns must be specified when use index");
131127
}
132128
}
@@ -141,12 +137,10 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception {
141137
Object[] end = new Object[endKeySize];
142138
for (int i = 0; i < startKeySize; i++) {
143139
start[i] = startKey.getObj(i).getValue();
144-
params.add(start[i]);
145140
}
146141

147142
for (int i = 0; i < endKeySize; i++) {
148143
end[i] = endKey.getObj(i).getValue();
149-
params.add(end[i]);
150144
}
151145
ObBorderFlag borderFlag = rang.getBorderFlag();
152146
List<ObPair<Long, ObTable>> pairs = obTableClient.getTables(tableName, start,
@@ -160,7 +154,8 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception {
160154

161155
StringBuilder stringBuilder = new StringBuilder();
162156
for (Map.Entry<Long, ObPair<Long, ObTable>> entry : partitionObTables.entrySet()) {
163-
stringBuilder.append("#").append(entry.getValue().getRight().getIp()).append(":").append(entry.getValue().getRight().getPort());
157+
stringBuilder.append("#").append(entry.getValue().getRight().getIp()).append(":")
158+
.append(entry.getValue().getRight().getPort());
164159
}
165160
String endpoint = stringBuilder.toString();
166161
long getTableTime = System.currentTimeMillis();
@@ -175,32 +170,14 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception {
175170
obTableClientQueryStreamResult.setReadConsistency(obTableClient.getReadConsistency());
176171
obTableClientQueryStreamResult.init();
177172

178-
MONITOR.info(logMessage(formatTraceMessage(obTableClientQueryStreamResult), tableName, "QUERY",
179-
endpoint, params, obTableClientQueryStreamResult, getTableTime - startTime, System.currentTimeMillis() - getTableTime));
180-
173+
MonitorUtil.info(obTableClientQueryStreamResult, obTableClient.getDatabase(), tableName,
174+
"QUERY", endpoint, tableQuery, obTableClientQueryStreamResult,
175+
getTableTime - startTime, System.currentTimeMillis() - getTableTime,
176+
obTableClient.getslowQueryMonitorThreshold());
181177

182178
return obTableClientQueryStreamResult;
183179
}
184180

185-
private String logMessage(String traceId, String tableName, String methodName, String endpoint,
186-
List<Object> params, ObTableClientQueryStreamResult result,
187-
long routeTableTime, long executeTime) {
188-
if (org.apache.commons.lang.StringUtils.isNotBlank(endpoint)) {
189-
endpoint = endpoint.replaceAll(",", "#");
190-
}
191-
192-
String argsValue = buildParamsString(params);
193-
194-
String res = String.valueOf(result.getCacheRows().size());
195-
196-
StringBuilder stringBuilder = new StringBuilder();
197-
stringBuilder.append(traceId).append(",").append(obTableClient.getDatabase()).append(",")
198-
.append(tableName).append(",").append(methodName).append(",").append(endpoint)
199-
.append(",").append(argsValue).append(",").append(res).append(",")
200-
.append(routeTableTime).append(",").append(executeTime);
201-
return stringBuilder.toString();
202-
}
203-
204181
/*
205182
* Clear.
206183
*/

0 commit comments

Comments
 (0)