Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/com/actiontech/dble/config/model/SystemConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ private SystemConfig() {
//unit: ms
private long releaseTimeout = 10L;

private int appendTraceId = 1;


public int getAppendTraceId() {
return appendTraceId;
}

public void setAppendTraceId(int appendTraceId) {
this.appendTraceId = appendTraceId;
}

public int getEnableAsyncRelease() {
return enableAsyncRelease;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@

import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.route.util.RouterUtil;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.server.trace.ITraceResult;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

public class SlowQueryLogEntry {
private TraceResult trace;
private ITraceResult trace;
private long timeStamp;
private String sql;
private UserName user;
private String clientIp;
private long connID;

SlowQueryLogEntry(String sql, TraceResult traceResult, UserName user, String clientIp, long connID) {
SlowQueryLogEntry(String sql, ITraceResult traceResult, UserName user, String clientIp, long connID) {
this.timeStamp = System.currentTimeMillis();
this.sql = RouterUtil.getFixedSql(sql);
this.trace = traceResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.DailyRotateLogStore;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.server.trace.ITraceResult;
import com.actiontech.dble.services.BusinessService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -136,9 +136,9 @@ public void run() {
};
}

public void putSlowQueryLog(ShardingService service, TraceResult log) {
if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
public void putSlowQueryLog(BusinessService service, ITraceResult log, String executeSql) {
if (log.isCompleted() && log.getOverAllMilliSecond() >= SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
final boolean enQueue = queue.offer(logEntry);
if (!enQueue) {
//abort
Expand All @@ -147,4 +147,15 @@ public void putSlowQueryLog(ShardingService service, TraceResult log) {
}
}
}


public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String executeSql) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
final boolean enQueue = queue.offer(logEntry);
if (!enQueue) {
//abort
String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}";
LOGGER.warn(errorMsg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ private ManagerParseOnOff() {
public static final int CUSTOM_MYSQL_HA = 3;
public static final int CAP_CLIENT_FOUND_ROWS = 4;


public static final int APPEND_TRACE_ID = 5;

public static int parse(String stmt, int offset) {
int i = offset;
for (; i < stmt.length(); i++) {
Expand Down Expand Up @@ -64,6 +67,9 @@ private static int aCheck(String stmt, int offset) {
if (prefix.startsWith("ALERT") && (stmt.length() == offset + 5 || ParseUtil.isEOF(stmt, offset + 5))) {
return ALERT;
}
if (prefix.startsWith("APPENDTRACEID") && (stmt.length() == offset + 13 || ParseUtil.isEOF(stmt, offset + 13))) {
return APPEND_TRACE_ID;
}
}
return OTHER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.server.SessionStage;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.server.trace.RwTraceResult;
import com.actiontech.dble.server.trace.TraceRecord;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.rwsplit.*;
import com.actiontech.dble.singleton.RouteService;
import com.actiontech.dble.util.StringUtil;
Expand All @@ -16,6 +22,8 @@
import java.io.IOException;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RWSplitNonBlockingSession {

Expand All @@ -24,6 +32,9 @@ public class RWSplitNonBlockingSession {
private volatile BackendConnection conn;
private final RWSplitService rwSplitService;
private PhysicalDbGroup rwGroup;
private volatile RwTraceResult traceResult = new RwTraceResult();

private volatile SessionStage sessionStage = SessionStage.Init;

public RWSplitNonBlockingSession(RWSplitService service) {
this.rwSplitService = service;
Expand Down Expand Up @@ -62,6 +73,10 @@ public void execute(Boolean master, byte[] originPacket, Callback callback, bool
if (handler == null) return;
PhysicalDbInstance instance = rwGroup.rwSelect(canRunOnMaster(master), isWriteStatistical(writeStatistical), localRead);
checkDest(!instance.isReadInstance());
endRoute();
setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY);
setTraceSimpleHandler((ResponseHandler) handler);
traceResult.setDBInstance(instance);
instance.getConnection(rwSplitService.getSchema(), handler, null, false);
} catch (IOException e) {
LOGGER.warn("select conn error", e);
Expand All @@ -78,6 +93,10 @@ private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback)
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select bind conn[id={}]", conn.getId());
}
endRoute();
setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY);
setTraceSimpleHandler(handler);
traceResult.setDBInstance((PhysicalDbInstance) conn.getInstance());
// for ps needs to send master
if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) {
long statementId = ByteUtil.readUB4(originPacket, 5);
Expand Down Expand Up @@ -204,4 +223,105 @@ public RWSplitService getService() {
public BackendConnection getConn() {
return conn;
}


public void setRequestTime() {
sessionStage = SessionStage.Read_SQL;
long requestTime = 0;

if (SlowQueryLog.getInstance().isEnableSlowLog()) {
requestTime = System.nanoTime();
traceResult.setVeryStartPrepare(requestTime);
}

}

public void startProcess() {
sessionStage = SessionStage.Parse_SQL;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime()));
}
}

public void endParse() {
sessionStage = SessionStage.Route_Calculation;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.ready();
// traceResult.setRouteStart(new TraceRecord(System.nanoTime()));
}
}


public void endRoute() {
sessionStage = SessionStage.Prepare_to_Push;
}


public void setPreExecuteEnd(RwTraceResult.SqlTraceType type) {
sessionStage = SessionStage.Execute_SQL;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setType(type);
traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime()));
traceResult.clearConnReceivedMap();
traceResult.clearConnFlagMap();
}
}

public void setTraceSimpleHandler(ResponseHandler simpleHandler) {
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setSimpleHandler(simpleHandler);
}
}


public void setResponseTime(boolean isSuccess) {
sessionStage = SessionStage.Finished;
long responseTime = 0;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
responseTime = System.nanoTime();
traceResult.setVeryEnd(responseTime);
if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) {
SlowQueryLog.getInstance().putSlowQueryLog(this.rwSplitService, (RwTraceResult) traceResult);
traceResult = new RwTraceResult();
}
}
}

public void setBackendResponseEndTime(MySQLResponseService service) {
sessionStage = SessionStage.First_Node_Fetched_Result;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
TraceRecord record = new TraceRecord(System.nanoTime());
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
String key = String.valueOf(service.getConnection().getId());
connMap.put(key, record);
traceResult.addToConnFinishedMap(responseHandler, connMap);
}
}

}



public void setBackendResponseTime(MySQLResponseService service) {
sessionStage = SessionStage.Fetching_Result;
long responseTime = 0;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
ResponseHandler responseHandler = service.getResponseHandler();
String key = String.valueOf(service.getConnection().getId());
if (responseHandler != null && traceResult.addToConnFlagMap(key) == null) {
responseTime = System.nanoTime();
TraceRecord record = new TraceRecord(responseTime);
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
connMap.put(key, record);
traceResult.addToConnReceivedMap(responseHandler, connMap);
}
}


}



}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.slow.SlowQueryLogProcessor;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.server.trace.ITraceResult;
import com.actiontech.dble.services.BusinessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -84,7 +84,11 @@ public void setFlushSize(int flushSize) {
this.flushSize = flushSize;
}

public void putSlowQueryLog(ShardingService service, TraceResult log) {
processor.putSlowQueryLog(service, log);
public void putSlowQueryLog(BusinessService service, ITraceResult log) {
processor.putSlowQueryLog(service, log, service.getExecuteSql());
}

public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String sql) {
processor.putSlowQueryLogForce(service, log, sql);
}
}
29 changes: 29 additions & 0 deletions src/main/java/com/actiontech/dble/server/trace/ITraceResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016-2023 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/

package com.actiontech.dble.server.trace;

import java.util.List;

/**
* @author dcy
* Create Date: 2025-04-17
*/
public interface ITraceResult {
public enum SqlTraceType {
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, RWSPLIT_QUERY;
}

boolean isCompleted();

RwTraceResult.SqlTraceType getType();

List<String[]> genLogResult();

double getOverAllMilliSecond();

String getOverAllSecond();
}
Loading
Loading