diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java index a6b4de5635..3b2abf0277 100644 --- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java @@ -180,6 +180,17 @@ private SystemConfig() { //unit: ms private long releaseTimeout = 10L; + private int appendTraceId = 1; + + + public int getAppendTraceId() { + return appendTraceId; + } + + public void setAppendTraceId(int appendTraceId) { + this.appendTraceId = appendTraceId; + } + public int getEnableAsyncRelease() { return enableAsyncRelease; } diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java index 824b7855b9..d890e534bd 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java @@ -7,21 +7,21 @@ import com.actiontech.dble.config.model.user.UserName; import com.actiontech.dble.route.util.RouterUtil; -import com.actiontech.dble.server.trace.TraceResult; +import com.actiontech.dble.server.trace.ITraceResult; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; public class SlowQueryLogEntry { - private TraceResult trace; + private ITraceResult trace; private long timeStamp; private String sql; private UserName user; private String clientIp; private long connID; - SlowQueryLogEntry(String sql, TraceResult traceResult, UserName user, String clientIp, long connID) { + SlowQueryLogEntry(String sql, ITraceResult traceResult, UserName user, String clientIp, long connID) { this.timeStamp = System.currentTimeMillis(); this.sql = RouterUtil.getFixedSql(sql); this.trace = traceResult; diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java index b9a0daeb0d..b469ed9bbc 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java @@ -9,8 +9,8 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.DailyRotateLogStore; import com.actiontech.dble.server.status.SlowQueryLog; -import com.actiontech.dble.server.trace.TraceResult; -import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.server.trace.ITraceResult; +import com.actiontech.dble.services.BusinessService; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,9 +136,9 @@ public void run() { }; } - public void putSlowQueryLog(ShardingService service, TraceResult log) { - if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) { - SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId()); + public void putSlowQueryLog(BusinessService service, ITraceResult log, String executeSql) { + if (log.isCompleted() && log.getOverAllMilliSecond() >= SlowQueryLog.getInstance().getSlowTime()) { + SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId()); final boolean enQueue = queue.offer(logEntry); if (!enQueue) { //abort @@ -147,4 +147,15 @@ public void putSlowQueryLog(ShardingService service, TraceResult log) { } } } + + + public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String executeSql) { + SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId()); + final boolean enQueue = queue.offer(logEntry); + if (!enQueue) { + //abort + String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}"; + LOGGER.warn(errorMsg); + } + } } diff --git a/src/main/java/com/actiontech/dble/route/parser/ManagerParseOnOff.java b/src/main/java/com/actiontech/dble/route/parser/ManagerParseOnOff.java index 52bbc95a92..fa889031b8 100644 --- a/src/main/java/com/actiontech/dble/route/parser/ManagerParseOnOff.java +++ b/src/main/java/com/actiontech/dble/route/parser/ManagerParseOnOff.java @@ -19,6 +19,9 @@ private ManagerParseOnOff() { public static final int CUSTOM_MYSQL_HA = 3; public static final int CAP_CLIENT_FOUND_ROWS = 4; + + public static final int APPEND_TRACE_ID = 5; + public static int parse(String stmt, int offset) { int i = offset; for (; i < stmt.length(); i++) { @@ -64,6 +67,9 @@ private static int aCheck(String stmt, int offset) { if (prefix.startsWith("ALERT") && (stmt.length() == offset + 5 || ParseUtil.isEOF(stmt, offset + 5))) { return ALERT; } + if (prefix.startsWith("APPENDTRACEID") && (stmt.length() == offset + 13 || ParseUtil.isEOF(stmt, offset + 13))) { + return APPEND_TRACE_ID; + } } return OTHER; } diff --git a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java index 4181da8670..5ae19edaef 100644 --- a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java @@ -3,9 +3,15 @@ import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.mysql.ByteUtil; +import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.mysql.MySQLPacket; +import com.actiontech.dble.server.SessionStage; +import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.server.trace.RwTraceResult; +import com.actiontech.dble.server.trace.TraceRecord; +import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.services.rwsplit.*; import com.actiontech.dble.singleton.RouteService; import com.actiontech.dble.util.StringUtil; @@ -16,6 +22,8 @@ import java.io.IOException; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class RWSplitNonBlockingSession { @@ -24,6 +32,9 @@ public class RWSplitNonBlockingSession { private volatile BackendConnection conn; private final RWSplitService rwSplitService; private PhysicalDbGroup rwGroup; + private volatile RwTraceResult traceResult = new RwTraceResult(); + + private volatile SessionStage sessionStage = SessionStage.Init; public RWSplitNonBlockingSession(RWSplitService service) { this.rwSplitService = service; @@ -62,6 +73,10 @@ public void execute(Boolean master, byte[] originPacket, Callback callback, bool if (handler == null) return; PhysicalDbInstance instance = rwGroup.rwSelect(canRunOnMaster(master), isWriteStatistical(writeStatistical), localRead); checkDest(!instance.isReadInstance()); + endRoute(); + setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY); + setTraceSimpleHandler((ResponseHandler) handler); + traceResult.setDBInstance(instance); instance.getConnection(rwSplitService.getSchema(), handler, null, false); } catch (IOException e) { LOGGER.warn("select conn error", e); @@ -78,6 +93,10 @@ private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback) if (LOGGER.isDebugEnabled()) { LOGGER.debug("select bind conn[id={}]", conn.getId()); } + endRoute(); + setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY); + setTraceSimpleHandler(handler); + traceResult.setDBInstance((PhysicalDbInstance) conn.getInstance()); // for ps needs to send master if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) { long statementId = ByteUtil.readUB4(originPacket, 5); @@ -204,4 +223,105 @@ public RWSplitService getService() { public BackendConnection getConn() { return conn; } + + + public void setRequestTime() { + sessionStage = SessionStage.Read_SQL; + long requestTime = 0; + + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + requestTime = System.nanoTime(); + traceResult.setVeryStartPrepare(requestTime); + } + + } + + public void startProcess() { + sessionStage = SessionStage.Parse_SQL; + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime())); + } + } + + public void endParse() { + sessionStage = SessionStage.Route_Calculation; + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.ready(); + // traceResult.setRouteStart(new TraceRecord(System.nanoTime())); + } + } + + + public void endRoute() { + sessionStage = SessionStage.Prepare_to_Push; + } + + + public void setPreExecuteEnd(RwTraceResult.SqlTraceType type) { + sessionStage = SessionStage.Execute_SQL; + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.setType(type); + traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime())); + traceResult.clearConnReceivedMap(); + traceResult.clearConnFlagMap(); + } + } + + public void setTraceSimpleHandler(ResponseHandler simpleHandler) { + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + traceResult.setSimpleHandler(simpleHandler); + } + } + + + public void setResponseTime(boolean isSuccess) { + sessionStage = SessionStage.Finished; + long responseTime = 0; + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + responseTime = System.nanoTime(); + traceResult.setVeryEnd(responseTime); + if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) { + SlowQueryLog.getInstance().putSlowQueryLog(this.rwSplitService, (RwTraceResult) traceResult); + traceResult = new RwTraceResult(); + } + } + } + + public void setBackendResponseEndTime(MySQLResponseService service) { + sessionStage = SessionStage.First_Node_Fetched_Result; + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + ResponseHandler responseHandler = service.getResponseHandler(); + if (responseHandler != null) { + TraceRecord record = new TraceRecord(System.nanoTime()); + Map connMap = new ConcurrentHashMap<>(); + String key = String.valueOf(service.getConnection().getId()); + connMap.put(key, record); + traceResult.addToConnFinishedMap(responseHandler, connMap); + } + } + + } + + + + public void setBackendResponseTime(MySQLResponseService service) { + sessionStage = SessionStage.Fetching_Result; + long responseTime = 0; + if (SlowQueryLog.getInstance().isEnableSlowLog()) { + ResponseHandler responseHandler = service.getResponseHandler(); + String key = String.valueOf(service.getConnection().getId()); + if (responseHandler != null && traceResult.addToConnFlagMap(key) == null) { + responseTime = System.nanoTime(); + TraceRecord record = new TraceRecord(responseTime); + Map connMap = new ConcurrentHashMap<>(); + connMap.put(key, record); + traceResult.addToConnReceivedMap(responseHandler, connMap); + } + } + + + } + + + } diff --git a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java index 750df92ed3..069283249f 100644 --- a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java +++ b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java @@ -7,8 +7,8 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.slow.SlowQueryLogProcessor; -import com.actiontech.dble.server.trace.TraceResult; -import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.server.trace.ITraceResult; +import com.actiontech.dble.services.BusinessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +84,11 @@ public void setFlushSize(int flushSize) { this.flushSize = flushSize; } - public void putSlowQueryLog(ShardingService service, TraceResult log) { - processor.putSlowQueryLog(service, log); + public void putSlowQueryLog(BusinessService service, ITraceResult log) { + processor.putSlowQueryLog(service, log, service.getExecuteSql()); + } + + public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String sql) { + processor.putSlowQueryLogForce(service, log, sql); } } diff --git a/src/main/java/com/actiontech/dble/server/trace/ITraceResult.java b/src/main/java/com/actiontech/dble/server/trace/ITraceResult.java new file mode 100644 index 0000000000..b2c82935bd --- /dev/null +++ b/src/main/java/com/actiontech/dble/server/trace/ITraceResult.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2016-2023 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.server.trace; + +import java.util.List; + +/** + * @author dcy + * Create Date: 2025-04-17 + */ +public interface ITraceResult { + public enum SqlTraceType { + SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, RWSPLIT_QUERY; + } + + boolean isCompleted(); + + RwTraceResult.SqlTraceType getType(); + + List genLogResult(); + + double getOverAllMilliSecond(); + + String getOverAllSecond(); +} diff --git a/src/main/java/com/actiontech/dble/server/trace/RwTraceResult.java b/src/main/java/com/actiontech/dble/server/trace/RwTraceResult.java new file mode 100644 index 0000000000..e84642a009 --- /dev/null +++ b/src/main/java/com/actiontech/dble/server/trace/RwTraceResult.java @@ -0,0 +1,275 @@ +/* + * Copyright (C) 2016-2020 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.server.trace; + +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class RwTraceResult implements Cloneable, ITraceResult { + + + private static final Logger LOGGER = LoggerFactory.getLogger(RwTraceResult.class); + // private boolean prepareFinished = false; + private long veryStartPrepare; + private long veryStart; + private TraceRecord requestStartPrepare; + private TraceRecord requestStart; + private TraceRecord parseStartPrepare; //requestEnd + private TraceRecord parseStart; //requestEnd + // private TraceRecord routeStart; //parseEnd + // private TraceRecord preExecuteStart; //routeEnd + private TraceRecord preExecuteEnd; + + + private ResponseHandler simpleHandler = null; + private ConcurrentMap connFlagMap = new ConcurrentHashMap<>(); + private ConcurrentMap> connReceivedMap = new ConcurrentHashMap<>(); + private ConcurrentMap> connFinishedMap = new ConcurrentHashMap<>(); + private ConcurrentMap recordStartMap = new ConcurrentHashMap<>(); + private ConcurrentMap recordEndMap = new ConcurrentHashMap<>(); + private long veryEnd; + private SqlTraceType type = SqlTraceType.RWSPLIT_QUERY; + private PhysicalDbInstance dbInstance = null; + + public void setVeryStartPrepare(long veryStartPrepare) { + // prepareFinished = false; + this.veryStartPrepare = veryStartPrepare; + this.requestStartPrepare = new TraceRecord(veryStartPrepare); + } + + public void setDBInstance(PhysicalDbInstance dbInstance) { + this.dbInstance = dbInstance; + } + + public void setParseStartPrepare(TraceRecord parseStartPrepare) { + this.parseStartPrepare = parseStartPrepare; + } + + + public void setPreExecuteEnd(TraceRecord preExecuteEnd) { + this.preExecuteEnd = preExecuteEnd; + } + + + public void setSimpleHandler(ResponseHandler simpleHandler) { + this.simpleHandler = simpleHandler; + } + + + public void setType(SqlTraceType type) { + this.type = type; + } + + + public Boolean addToConnFlagMap(String item) { + return connFlagMap.putIfAbsent(item, true); + } + + public void clearConnFlagMap() { + connFlagMap.clear(); + } + + public void addToConnReceivedMap(ResponseHandler responseHandler, Map connMap) { + Map existReceivedMap = connReceivedMap.putIfAbsent(responseHandler, connMap); + if (existReceivedMap != null) { + existReceivedMap.putAll(connMap); + } + } + + public void clearConnReceivedMap() { + connReceivedMap.clear(); + } + + public void addToConnFinishedMap(ResponseHandler responseHandler, Map connMap) { + Map existReceivedMap = connFinishedMap.putIfAbsent(responseHandler, connMap); + if (existReceivedMap != null) { + existReceivedMap.putAll(connMap); + } + } + + public void addToRecordStartMap(ResponseHandler handler, TraceRecord traceRecord) { + recordStartMap.putIfAbsent(handler, traceRecord); + } + + public void addToRecordEndMap(ResponseHandler handler, TraceRecord traceRecord) { + recordEndMap.putIfAbsent(handler, traceRecord); + } + + public void setVeryEnd(long veryEnd) { + this.veryEnd = veryEnd; + } + + public void ready() { + // prepareFinished = true; + clear(); + veryStart = veryStartPrepare; + requestStart = requestStartPrepare; + parseStart = parseStartPrepare; + veryStartPrepare = 0; + requestStartPrepare = null; + parseStartPrepare = null; + } + + private void clear() { + veryStart = 0; + requestStart = null; + parseStart = null; + preExecuteEnd = null; + this.type = null; + simpleHandler = null; + connFlagMap.clear(); + for (Map connReceived : connReceivedMap.values()) { + connReceived.clear(); + } + connReceivedMap.clear(); + for (Map connReceived : connFinishedMap.values()) { + connReceived.clear(); + } + connFinishedMap.clear(); + recordStartMap.clear(); + recordEndMap.clear(); + veryEnd = 0; + } + + + @Override + public boolean isCompleted() { + return veryStart != 0 && veryEnd != 0 && connFlagMap.size() != 0 && connReceivedMap.size() == connFinishedMap.size() && recordStartMap.size() == recordEndMap.size(); + } + + @Override + public SqlTraceType getType() { + return this.type; + } + + @Override + public List genLogResult() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("start genLogResult"); + } + if (!isCompleted()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("collect info not in pairs,veryEnd:" + veryEnd + ",connFlagMap.size:" + connFlagMap.size() + + ",connReceivedMap.size:" + connReceivedMap.size() + ",connFinishedMap.size:" + connFinishedMap.size() + + ",recordStartMap.size:" + connReceivedMap.size() + ",recordEndMap.size:" + connFinishedMap.size()); + } + return null; + } + List lst = new ArrayList<>(); + lst.add(genLogRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp())); + lst.add(genLogRecord("Prepare_Push", parseStart.getTimestamp(), preExecuteEnd.getTimestamp())); + if (simpleHandler != null) { + if (genSimpleLogs(lst)) return null; + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("not support trace this query"); + } + return null; + } + + lst.add(genLogRecord("Group_Name", dbInstance.getDbGroup().getGroupName())); + lst.add(genLogRecord("Instance_Name", dbInstance.getName())); + lst.add(genLogRecord("Is_Master", String.valueOf(!dbInstance.isReadInstance()))); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("end genLogResult"); + } + return lst; + } + + + private boolean genSimpleLogs(List lst) { + Map connFetchStartMap = connReceivedMap.get(simpleHandler); + Map connFetchEndMap = connFinishedMap.get(simpleHandler); + List executeList = new ArrayList<>(connFetchStartMap.size()); + List fetchList = new ArrayList<>(connFetchStartMap.size()); + long minFetchStart = Long.MAX_VALUE; + long maxFetchEnd = 0; + for (Map.Entry fetchStart : connFetchStartMap.entrySet()) { + TraceRecord fetchStartRecord = fetchStart.getValue(); + minFetchStart = Math.min(minFetchStart, fetchStartRecord.getTimestamp()); + executeList.add(genLogRecord("First_Result_Fetch", preExecuteEnd.getTimestamp(), fetchStartRecord.getTimestamp())); + TraceRecord fetchEndRecord = connFetchEndMap.get(fetchStart.getKey()); + if (fetchEndRecord == null) { + LOGGER.debug("connection fetchEndRecord is null "); + return true; + } + fetchList.add(genLogRecord("Last_Result_Fetch", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp())); + maxFetchEnd = Math.max(maxFetchEnd, fetchEndRecord.getTimestamp()); + } + lst.addAll(executeList); + lst.addAll(fetchList); + lst.add(genLogRecord("Write_Client", minFetchStart, veryEnd)); + return false; + } + + private String[] genLogRecord(String operation, long start, long end) { + String[] readQuery = new String[2]; + readQuery[0] = operation; + readQuery[1] = nanoToSecond(end - start); + return readQuery; + } + + private String[] genLogRecord(String operation, String value) { + String[] readQuery = new String[2]; + readQuery[0] = operation; + readQuery[1] = value; + return readQuery; + } + + @Override + public double getOverAllMilliSecond() { + return (double) (veryEnd - veryStart) / 1000000; + } + + private String nanoToSecond(long nano) { + double milliSecond = (double) nano / 1000000000; + return String.format("%.6f", milliSecond); + } + + @Override + public String getOverAllSecond() { + return nanoToSecond(veryEnd - veryStart); + } + + @Override + public Object clone() { + RwTraceResult tr; + try { + tr = (RwTraceResult) super.clone(); + tr.simpleHandler = this.simpleHandler; + tr.connFlagMap = new ConcurrentHashMap<>(); + tr.connFlagMap.putAll(this.connFlagMap); + tr.connReceivedMap = new ConcurrentHashMap<>(); + for (Map.Entry> item : connReceivedMap.entrySet()) { + Map connMap = new ConcurrentHashMap<>(); + connMap.putAll(item.getValue()); + tr.connReceivedMap.put(item.getKey(), connMap); + } + tr.connFinishedMap = new ConcurrentHashMap<>(); + for (Map.Entry> item : connFinishedMap.entrySet()) { + Map connMap = new ConcurrentHashMap<>(); + connMap.putAll(item.getValue()); + tr.connFinishedMap.put(item.getKey(), connMap); + } + tr.recordStartMap = new ConcurrentHashMap<>(); + tr.recordStartMap.putAll(this.recordStartMap); + tr.recordEndMap = new ConcurrentHashMap<>(); + tr.recordEndMap.putAll(this.recordEndMap); + return tr; + } catch (Exception e) { + LOGGER.warn("clone TraceResult error", e); + throw new AssertionError(e.getMessage()); + } + } +} diff --git a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java index 2a0267f21a..be871b7a29 100644 --- a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java +++ b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java @@ -20,12 +20,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -public class TraceResult implements Cloneable { +public class TraceResult implements Cloneable, ITraceResult { - public enum SqlTraceType { - SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY; - } private static final Logger LOGGER = LoggerFactory.getLogger(TraceResult.class); private boolean prepareFinished = false; @@ -573,6 +570,7 @@ private String nanoToMilliSecond(long nano) { return String.valueOf(milliSecond); } + @Override public boolean isCompleted() { return veryStart != 0 && veryEnd != 0 && connFlagMap.size() != 0 && connReceivedMap.size() == connFinishedMap.size() && recordStartMap.size() == recordEndMap.size(); } diff --git a/src/main/java/com/actiontech/dble/services/manager/handler/DisableHandler.java b/src/main/java/com/actiontech/dble/services/manager/handler/DisableHandler.java index 3a69aab512..dd048e2499 100644 --- a/src/main/java/com/actiontech/dble/services/manager/handler/DisableHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/handler/DisableHandler.java @@ -6,12 +6,9 @@ package com.actiontech.dble.services.manager.handler; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.services.manager.ManagerService; -import com.actiontech.dble.services.manager.response.OnOffAlert; -import com.actiontech.dble.services.manager.response.OnOffCapClientFoundRows; -import com.actiontech.dble.services.manager.response.OnOffCustomMySQLHa; -import com.actiontech.dble.services.manager.response.OnOffSlowQueryLog; import com.actiontech.dble.route.parser.ManagerParseOnOff; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.services.manager.response.*; public final class DisableHandler { private DisableHandler() { @@ -32,6 +29,9 @@ public static void handle(String stmt, ManagerService service, int offset) { case ManagerParseOnOff.CAP_CLIENT_FOUND_ROWS: OnOffCapClientFoundRows.execute(service, false); break; + case ManagerParseOnOff.APPEND_TRACE_ID: + OnOffAppendTraceId.execute(service, false); + break; default: service.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement"); } diff --git a/src/main/java/com/actiontech/dble/services/manager/handler/EnableHandler.java b/src/main/java/com/actiontech/dble/services/manager/handler/EnableHandler.java index 9d59d700b0..73037cd140 100644 --- a/src/main/java/com/actiontech/dble/services/manager/handler/EnableHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/handler/EnableHandler.java @@ -7,10 +7,7 @@ import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.services.manager.ManagerService; -import com.actiontech.dble.services.manager.response.OnOffAlert; -import com.actiontech.dble.services.manager.response.OnOffCapClientFoundRows; -import com.actiontech.dble.services.manager.response.OnOffCustomMySQLHa; -import com.actiontech.dble.services.manager.response.OnOffSlowQueryLog; +import com.actiontech.dble.services.manager.response.*; import com.actiontech.dble.route.parser.ManagerParseOnOff; public final class EnableHandler { @@ -32,6 +29,9 @@ public static void handle(String stmt, ManagerService service, int offset) { case ManagerParseOnOff.CAP_CLIENT_FOUND_ROWS: OnOffCapClientFoundRows.execute(service, true); break; + case ManagerParseOnOff.APPEND_TRACE_ID: + OnOffAppendTraceId.execute(service, true); + break; default: service.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement"); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/OnOffAppendTraceId.java b/src/main/java/com/actiontech/dble/services/manager/response/OnOffAppendTraceId.java new file mode 100644 index 0000000000..13f35548fe --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/manager/response/OnOffAppendTraceId.java @@ -0,0 +1,43 @@ +package com.actiontech.dble.services.manager.response; + +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.net.mysql.OkPacket; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; +import com.actiontech.dble.singleton.AppendTraceId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public final class OnOffAppendTraceId { + + private static final Logger LOGGER = LoggerFactory.getLogger(OnOffAppendTraceId.class); + + private OnOffAppendTraceId() { + } + + public static void execute(ManagerService service, boolean isOn) { + String onOffStatus = isOn ? "enable" : "disable"; + try { + WriteDynamicBootstrap.getInstance().changeValue("appendTraceId", isOn ? "1" : "0"); + } catch (IOException e) { + String msg = onOffStatus + " appendTraceId failed"; + LOGGER.warn(String.valueOf(service) + " " + msg, e); + service.writeErrMessage(ErrorCode.ER_YES, msg); + return; + } + + AppendTraceId.getInstance().setValue(isOn ? 1 : 0); + LOGGER.info(String.valueOf(service) + " " + onOffStatus + " appendTraceId success by manager"); + + OkPacket ok = new OkPacket(); + ok.setPacketId(1); + ok.setAffectedRows(1); + ok.setServerStatus(2); + ok.setMessage((onOffStatus + " appendTraceId success").getBytes()); + ok.write(service.getConnection()); + + } + +} diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java index ae1c596dad..dbac7694a9 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java @@ -17,6 +17,7 @@ import com.actiontech.dble.net.service.ServiceTask; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.route.parser.util.Pair; +import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.server.parser.ServerParse; import com.actiontech.dble.services.BusinessService; @@ -61,6 +62,7 @@ public class MySQLResponseService extends VariablesService { private volatile Object attachment; private volatile NonBlockingSession session; + private volatile RWSplitNonBlockingSession session2; private volatile boolean metaDataSynced = true; @@ -209,6 +211,11 @@ protected boolean beforeHandlingTask() { return false; } session.setBackendResponseTime(this); + } else if (session2 != null) { + // if (session2.isKilled()) { + // return false; + // } + session2.setBackendResponseTime(this); } return true; } @@ -783,6 +790,17 @@ public NonBlockingSession getSession() { public void setSession(NonBlockingSession session) { this.session = session; + this.session2 = null; + } + + + public void setSession2(RWSplitNonBlockingSession session2) { + this.session = null; + this.session2 = session2; + } + + public RWSplitNonBlockingSession getSession2() { + return session2; } public ResponseHandler getResponseHandler() { diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java index 829be817d1..ea3d5324fa 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java @@ -42,6 +42,7 @@ public void execute(PhysicalDbGroup rwGroup) throws IOException { public void connectionAcquired(BackendConnection conn) { MySQLResponseService mysqlService = conn.getBackendService(); mysqlService.setResponseHandler(this); + mysqlService.setSession2(rwSplitService.getSession()); mysqlService.execute(rwSplitService, holder.getPrepareOrigin()); } diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java index e31cd41558..09f4012e03 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java @@ -50,6 +50,7 @@ public RWSplitHandler(RWSplitService service, byte[] originPacket, Callback call public void execute(final BackendConnection conn) { MySQLResponseService mysqlService = conn.getBackendService(); mysqlService.setResponseHandler(this); + mysqlService.setSession2(rwSplitService.getSession()); if (originPacket != null) { mysqlService.execute(rwSplitService, originPacket); } else if (isHint) { @@ -106,7 +107,7 @@ public void okResponse(byte[] data, AbstractService service) { MySQLResponseService mysqlService = (MySQLResponseService) service; boolean executeResponse = mysqlService.syncAndExecute(); if (executeResponse) { - + rwSplitService.getSession().setBackendResponseEndTime((MySQLResponseService) service); final OkPacket packet = new OkPacket(); packet.read(data); if ((packet.getServerStatus() & HAS_MORE_RESULTS) == 0) { @@ -118,6 +119,7 @@ public void okResponse(byte[] data, AbstractService service) { synchronized (this) { if (!write2Client) { + rwSplitService.getSession().setResponseTime(true); data[3] = (byte) rwSplitService.nextPacketId(); frontedConnection.write(data); if ((packet.getServerStatus() & HAS_MORE_RESULTS) == 0) { @@ -157,6 +159,7 @@ public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, @Override public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service) { synchronized (this) { + rwSplitService.getSession().setBackendResponseEndTime((MySQLResponseService) service); if (!write2Client) { eof[3] = (byte) rwSplitService.nextPacketId(); if ((eof[7] & HAS_MORE_RESULTS) == 0) { @@ -180,6 +183,7 @@ public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service) buffer = null; if ((eof[7] & HAS_MORE_RESULTS) == 0) { write2Client = true; + rwSplitService.getSession().setResponseTime(true); } } } @@ -217,6 +221,7 @@ public void connectionClose(AbstractService service, String reason) { @Override public void preparedOkResponse(byte[] ok, List fields, List params, MySQLResponseService service) { synchronized (this) { + rwSplitService.getSession().setBackendResponseEndTime((MySQLResponseService) service); if (buffer == null) { buffer = frontedConnection.allocate(); } @@ -239,6 +244,7 @@ public void preparedOkResponse(byte[] ok, List fields, List para callback.callback(true, ok, rwSplitService); } frontedConnection.write(buffer); + service.getSession2().setResponseTime(true); write2Client = true; buffer = null; } diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java index fc593fed41..cfcea2f3f9 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java @@ -3,17 +3,22 @@ import com.actiontech.dble.config.Capabilities; import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.net.handler.FrontendQueryHandler; +import com.actiontech.dble.net.mysql.CommandPacket; import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession; import com.actiontech.dble.server.ServerQueryHandler; import com.actiontech.dble.server.handler.SetHandler; import com.actiontech.dble.server.handler.UseHandler; import com.actiontech.dble.server.parser.RwSplitServerParse; +import com.actiontech.dble.singleton.AppendTraceId; import com.actiontech.dble.singleton.RouteService; import com.actiontech.dble.singleton.TraceManager; import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; + + public class RWSplitQueryHandler implements FrontendQueryHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryHandler.class); @@ -41,11 +46,28 @@ public void query(String sql) { return; } int rs = RwSplitServerParse.parse(sql); + if (AppendTraceId.getInstance().isEnable()) { + sql = String.format("/*+ trace_id=%d-%d */ %s", session.getService().getConnection().getId(), session.getService().getSqlUniqueId().incrementAndGet(), sql); + } + + session.getService().setExecuteSql(sql); + session.endParse(); int hintLength = RouteService.isHintSql(sql); int sqlType = rs & 0xff; if (hintLength >= 0) { session.executeHint(sqlType, sql, null); } else { + + if (AppendTraceId.getInstance().isEnable()) { + CommandPacket packet = new CommandPacket(); + final byte COM_QUERY = 0x3; + packet.setCommand(COM_QUERY); + packet.setArg(sql.getBytes()); + packet.setPacketId(session.getService().getExecuteSqlBytes()[3]); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + packet.write(out); + session.getService().setExecuteSqlBytes(out.toByteArray()); + } if (sqlType != RwSplitServerParse.START && sqlType != RwSplitServerParse.BEGIN && sqlType != RwSplitServerParse.COMMIT && sqlType != RwSplitServerParse.ROLLBACK && sqlType != RwSplitServerParse.SET) { session.getService().singleTransactionsCount(); diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java index 701c39d702..c3ece8ae7a 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java @@ -6,6 +6,7 @@ import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.config.model.user.RwSplitUserConfig; import com.actiontech.dble.net.connection.AbstractConnection; +import com.actiontech.dble.net.mysql.CommandPacket; import com.actiontech.dble.net.mysql.MySQLPacket; import com.actiontech.dble.net.service.AuthResultInfo; import com.actiontech.dble.net.service.ServiceTask; @@ -13,15 +14,20 @@ import com.actiontech.dble.server.parser.ServerParse; import com.actiontech.dble.server.response.Heartbeat; import com.actiontech.dble.server.response.Ping; +import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.server.trace.RwTraceResult; import com.actiontech.dble.server.variables.MysqlVariable; import com.actiontech.dble.services.BusinessService; +import com.actiontech.dble.singleton.AppendTraceId; import com.actiontech.dble.singleton.TsQueriesCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -44,6 +50,8 @@ public class RWSplitService extends BusinessService { public static final int LOCK_READ = 2; + AtomicInteger sqlUniqueId = new AtomicInteger(1); + // prepare statement private ConcurrentHashMap psHolder = new ConcurrentHashMap<>(); @@ -89,11 +97,13 @@ public void initFromAuthInfo(AuthResultInfo info) { @Override protected void taskToTotalQueue(ServiceTask task) { + session.setRequestTime(); DbleServer.getInstance().getFrontHandlerQueue().offer(task); } @Override protected void handleInnerData(byte[] data) { + session.startProcess(); // if the statement is load data, directly push down if (inLoadData) { session.execute(true, data, (isSuccess, resp, rwSplitService) -> { @@ -174,6 +184,9 @@ private void handleComInitDb(byte[] data) { try { switchSchema = mm.readString(getCharset().getClient()); session.execute(true, data, (isSuccess, resp, rwSplitService) -> { + if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) { + SlowQueryLog.getInstance().putSlowQueryLogForce(this.session.getService(), new RwTraceResult(), "use " + switchSchema); + } if (isSuccess) rwSplitService.setSchema(switchSchema); }); } catch (UnsupportedEncodingException e) { @@ -212,26 +225,45 @@ private void handleComStmtPrepare(byte[] data) { sql = sql.substring(0, sql.length() - 1).trim(); } sql = sql.trim(); - final String finalSql = sql; int rs = ServerParse.parse(sql); int sqlType = rs & 0xff; + + String tmpSql = sql; + byte[] tmpData = data; + if (AppendTraceId.getInstance().isEnable()) { + tmpSql = String.format("/*+ trace_id=%d-%d */ %s", session.getService().getConnection().getId(), getSqlUniqueId().incrementAndGet(), sql); + CommandPacket packet = new CommandPacket(); + final byte COM_STMT_PREPARE = 0x16; + packet.setCommand(COM_STMT_PREPARE); + packet.setArg(tmpSql.getBytes()); + packet.setPacketId(data[3]); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + packet.write(out); + tmpData = out.toByteArray(); + } + + final String finalSql = tmpSql; + setExecuteSql(finalSql); + final byte[] finalData = tmpData; + session.endParse(); + switch (sqlType) { case ServerParse.SELECT: int rs2 = ServerParse.parseSpecial(sqlType, sql); if (rs2 == LOCK_READ) { - session.execute(true, data, (isSuccess, resp, rwSplitService) -> { + session.execute(true, finalData, (isSuccess, resp, rwSplitService) -> { if (isSuccess) { long statementId = ByteUtil.readUB4(resp, 5); int paramCount = ByteUtil.readUB2(resp, 11); - psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql)); + psHolder.put(statementId, new PreparedStatementHolder(finalData, paramCount, true, finalSql)); } }, false); } else { - session.execute(null, data, (isSuccess, resp, rwSplitService) -> { + session.execute(null, finalData, (isSuccess, resp, rwSplitService) -> { if (isSuccess) { long statementId = ByteUtil.readUB4(resp, 5); int paramCount = ByteUtil.readUB2(resp, 11); - psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql)); + psHolder.put(statementId, new PreparedStatementHolder(finalData, paramCount, false, finalSql)); } }, false); } @@ -241,7 +273,7 @@ private void handleComStmtPrepare(byte[] data) { if (isSuccess) { long statementId = ByteUtil.readUB4(resp, 5); int paramCount = ByteUtil.readUB2(resp, 11); - psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql)); + psHolder.put(statementId, new PreparedStatementHolder(finalData, paramCount, true, finalSql)); } }); break; @@ -255,6 +287,10 @@ private void execute(byte[] data) { session.execute(true, data, null); } + public AtomicInteger getSqlUniqueId() { + return sqlUniqueId; + } + public RwSplitUserConfig getUserConfig() { return (RwSplitUserConfig) userConfig; } @@ -293,6 +329,10 @@ public byte[] getExecuteSqlBytes() { return executeSqlBytes; } + public void setExecuteSqlBytes(byte[] executeSqlBytes) { + this.executeSqlBytes = executeSqlBytes; + } + public boolean isInPrepare() { return inPrepare; } diff --git a/src/main/java/com/actiontech/dble/singleton/AppendTraceId.java b/src/main/java/com/actiontech/dble/singleton/AppendTraceId.java new file mode 100644 index 0000000000..8221d5c447 --- /dev/null +++ b/src/main/java/com/actiontech/dble/singleton/AppendTraceId.java @@ -0,0 +1,30 @@ +package com.actiontech.dble.singleton; + + +import com.actiontech.dble.config.model.SystemConfig; + +public final class AppendTraceId { + + private static final AppendTraceId INSTANCE = new AppendTraceId(); + private volatile int value; + + public static AppendTraceId getInstance() { + return INSTANCE; + } + + public AppendTraceId() { + this.value = SystemConfig.getInstance().getAppendTraceId(); + } + + public boolean isEnable() { + return value == 1; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } +} diff --git a/src/main/java/com/actiontech/dble/singleton/SystemParams.java b/src/main/java/com/actiontech/dble/singleton/SystemParams.java index c977f09152..6169c3c5dd 100644 --- a/src/main/java/com/actiontech/dble/singleton/SystemParams.java +++ b/src/main/java/com/actiontech/dble/singleton/SystemParams.java @@ -144,6 +144,7 @@ public List getVolatileParams() { params.add(new ParamInfo("flushSlowLogSize", SlowQueryLog.getInstance().getFlushSize() + "", "The max size for flushing log to disk, the default is 1000")); params.add(new ParamInfo("enableAlert", AlertUtil.isEnable() + "", "enable or disable alert")); params.add(new ParamInfo("capClientFoundRows", CapClientFoundRows.getInstance().isEnableCapClientFoundRows() + "", "Whether to turn on EOF_Packet to return found rows,The default value is false")); + params.add(new ParamInfo("appendTraceId", AppendTraceId.getInstance().getValue() + "", "append the trace id to the sql")); return params; } diff --git a/src/main/resources/bootstrap_template.cnf b/src/main/resources/bootstrap_template.cnf index bb205a963e..0b34a524c5 100644 --- a/src/main/resources/bootstrap_template.cnf +++ b/src/main/resources/bootstrap_template.cnf @@ -90,7 +90,7 @@ -DflushSlowLogSize=1000 # the threshold for judging if the query is slow , unit is millisecond -DsqlSlowTime=100 - +-DappendTraceId=0 #-DenableAsyncRelease=1 #-DreleaseTimeout=10