Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions .idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/libraries/starrocks_fe.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 33 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,47 +25,43 @@ create database starrocks_audit_db__;

```SQL
CREATE TABLE starrocks_audit_db__.starrocks_audit_tbl__ (
`queryId` VARCHAR(64) COMMENT "查询的唯一ID",
`timestamp` DATETIME NOT NULL COMMENT "查询开始时间",
`queryType` VARCHAR(12) COMMENT "查询类型(query, slow_query, connection)",
`clientIp` VARCHAR(32) COMMENT "客户端IP",
`user` VARCHAR(64) COMMENT "查询用户名",
`authorizedUser` VARCHAR(64) COMMENT "用户唯一标识,既user_identity",
`resourceGroup` VARCHAR(64) COMMENT "资源组名",
`catalog` VARCHAR(32) COMMENT "数据目录名",
`db` VARCHAR(96) COMMENT "查询所在数据库",
`state` VARCHAR(8) COMMENT "查询状态(EOF,ERR,OK)",
`errorCode` VARCHAR(512) COMMENT "错误码",
`queryTime` BIGINT COMMENT "查询执行时间(毫秒)",
`scanBytes` BIGINT COMMENT "查询扫描的字节数",
`scanRows` BIGINT COMMENT "查询扫描的记录行数",
`returnRows` BIGINT COMMENT "查询返回的结果行数",
`cpuCostNs` BIGINT COMMENT "查询CPU耗时(纳秒)",
`memCostBytes` BIGINT COMMENT "查询消耗内存(字节)",
`stmtId` INT COMMENT "SQL语句增量ID",
`isQuery` TINYINT COMMENT "SQL是否为查询(1或0)",
`feIp` VARCHAR(128) COMMENT "执行该语句的FE IP",
`stmt` VARCHAR(1048576) COMMENT "SQL原始语句",
`digest` VARCHAR(32) COMMENT "慢SQL指纹",
`planCpuCosts` DOUBLE COMMENT "查询规划阶段CPU占用(纳秒)",
`planMemCosts` DOUBLE COMMENT "查询规划阶段内存占用(字节)",
`pendingTimeMs` BIGINT COMMENT "查询在队列中等待的时间(毫秒)",
`candidateMVs` varchar(65533) NULL COMMENT "候选MV列表",
`hitMvs` varchar(65533) NULL COMMENT "命中MV列表",
`warehouse` VARCHAR(128) NULL COMMENT "仓库名称"
`queryId` VARCHAR(64) COMMENT "查询的唯一ID",
`timestamp` DATETIME NOT NULL COMMENT "查询开始时间",
`queryType` VARCHAR(12) COMMENT "查询类型(query, slow_query, connection)",
`clientIp` VARCHAR(32) COMMENT "客户端IP",
`user` VARCHAR(64) COMMENT "查询用户名",
`authorizedUser` VARCHAR(64) COMMENT "用户唯一标识,既user_identity",
`resourceGroup` VARCHAR(64) COMMENT "资源组名",
`catalog` VARCHAR(32) COMMENT "Catalog名",
`db` VARCHAR(96) COMMENT "查询所在数据库",
`state` VARCHAR(8) COMMENT "查询状态(EOF,ERR,OK)",
`errorCode` VARCHAR(512) COMMENT "错误码",
`queryTime` BIGINT COMMENT "查询执行时间(毫秒)",
`scanBytes` BIGINT COMMENT "查询扫描的字节数",
`scanRows` BIGINT COMMENT "查询扫描的记录行数",
`returnRows` BIGINT COMMENT "查询返回的结果行数",
`cpuCostNs` BIGINT COMMENT "查询CPU耗时(纳秒)",
`memCostBytes` BIGINT COMMENT "查询消耗内存(字节)",
`stmtId` INT COMMENT "SQL语句增量ID",
`isQuery` TINYINT COMMENT "SQL是否为查询(1或0)",
`feIp` VARCHAR(128) COMMENT "执行该语句的FE IP",
`stmt` VARCHAR(1048576) COMMENT "原始SQL语句",
`digest` VARCHAR(32) COMMENT "慢SQL指纹",
`planCpuCosts` DOUBLE COMMENT "查询规划阶段CPU占用(纳秒)",
`planMemCosts` DOUBLE COMMENT "查询规划阶段内存占用(字节)",
`pendingTimeMs` BIGINT COMMENT "查询在队列中等待的时间(毫秒)",
`candidateMVs` VARCHAR(65533) COMMENT "候选MV列表",
`hitMvs` VARCHAR(65533) COMMENT "命中MV列表",
`warehouse` VARCHAR(128) COMMENT "仓库名称",
`isForwardToLeader` BOOLEAN COMMENT "是否转发leader (1或0)",
`queryFeMemory` BIGINT COMMENT "查询在 FE 的内存消耗"
) ENGINE = OLAP
DUPLICATE KEY (`queryId`, `timestamp`, `queryType`)
COMMENT "审计日志表"
PARTITION BY RANGE (`timestamp`) ()
DISTRIBUTED BY HASH (`queryId`) BUCKETS 3
PARTITION BY date_trunc('day', `timestamp`)
PROPERTIES (
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30", --表示只保留最近30天的审计信息,可视需求调整
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "3",
"dynamic_partition.enable" = "true",
"replication_num" = "3" --若集群中BE个数不大于3,可调整副本数为1,生产集群不推荐调整
"replication_num" = "1",
"partition_live_number"="30"
);
```

Expand Down
Binary file modified lib/starrocks-fe.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-fe</artifactId>
<version>3.0.6</version>
<version>3.3.15</version>
<scope>system</scope>
<systemPath>${basedir}/lib/starrocks-fe.jar</systemPath>
</dependency>
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -78,6 +76,14 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
* 是否包含新字段 hitMVsExists,如果旧版本没有该字段则值为空
*/
private boolean hitMVsExists;
/**
* 是否包含新字段isForwardToLeader,如果旧版本没有该字段则值为空
*/
private boolean isForwardToLeaderExists;
/**
* 是否包含新字段queryFeMemory,如果旧版本没有该字段则值为空
*/
private boolean queryFeMemoryExists;

@Override
public void init(PluginInfo info, PluginContext ctx) throws PluginException {
Expand All @@ -97,6 +103,8 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {

candidateMvsExists = hasField(AuditEvent.class, "candidateMvs");
hitMVsExists = hasField(AuditEvent.class, "hitMVs");
isForwardToLeaderExists = hasField(AuditEvent.class, "isForwardToLeader");
queryFeMemoryExists = hasField(AuditEvent.class,"queryFeMemory");

isInit = true;
}
Expand Down Expand Up @@ -170,6 +178,8 @@ private void assembleAudit(AuditEvent event) {
}
String candidateMvsVal = candidateMvsExists ? event.candidateMvs : "";
String hitMVsVal = hitMVsExists ? event.hitMVs : "";
Boolean isForwardToLeaderVal = isForwardToLeaderExists ? event.isForwardToLeader : null;
Long queryFeMemoryVal = queryFeMemoryExists ? event.queryFeMemory : null;
String content = "{\"queryId\":\"" + getQueryId(queryType, event) + "\"," +
"\"timestamp\":\"" + longToTimeString(event.timestamp) + "\"," +
"\"queryType\":\"" + queryType + "\"," +
Expand Down Expand Up @@ -197,7 +207,9 @@ private void assembleAudit(AuditEvent event) {
"\"pendingTimeMs\":" + event.pendingTimeMs + "," +
"\"candidateMVs\":\"" + candidateMvsVal + "\"," +
"\"hitMvs\":\"" + hitMVsVal + "\"," +
"\"warehouse\":\"" + event.warehouse + "\"}";
"\"warehouse\":\"" + event.warehouse + "\"," +
"\"isForwardToLeader\":" + isForwardToLeaderVal + "," +
"\"queryFeMemory\":" + queryFeMemoryVal + "}";
if (auditBuffer.length() > 0) {
auditBuffer.append(",");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx

conn.addRequestProperty("label", label);
conn.addRequestProperty("max_filter_ratio", "1.0");
conn.addRequestProperty("columns", "queryId,timestamp,queryType,clientIp,user,authorizedUser,resourceGroup,catalog,db,state,errorCode,queryTime,scanBytes,scanRows,returnRows,cpuCostNs,memCostBytes,stmtId,isQuery,feIp,stmt,digest,planCpuCosts,planMemCosts,pendingTimeMs,candidateMVs,hitMvs,warehouse");
conn.addRequestProperty("columns", "queryId,timestamp,queryType,clientIp,user,authorizedUser,resourceGroup,catalog,db,state,errorCode,queryTime,scanBytes,scanRows,returnRows,cpuCostNs,memCostBytes,stmtId,isQuery,feIp,stmt,digest,planCpuCosts,planMemCosts,pendingTimeMs,candidateMVs,hitMvs,warehouse,isForwardToLeader,queryFeMemory");
if(!StringUtils.isBlank(this.streamLoadFilter)) {
conn.addRequestProperty("where", streamLoadFilter);
}
Expand All @@ -115,7 +115,7 @@ private String toCurl(HttpURLConnection conn) {
sb.append("-H \"").append("where\":").append(streamLoadFilter).append(" \\\n ");
}
sb.append("-H \"").append("columns\":").append("\"queryId, timestamp, queryType, clientIp, user, authorizedUser, resourceGroup, catalog, db, state, errorCode," +
"queryTime, scanBytes, scanRows, returnRows, cpuCostNs, memCostBytes, stmtId, isQuery, feIp, stmt, digest, planCpuCosts, planMemCosts, pendingTimeMs, candidateMVs, hitMvs, warehouse\" \\\n ");
"queryTime, scanBytes, scanRows, returnRows, cpuCostNs, memCostBytes, stmtId, isQuery, feIp, stmt, digest, planCpuCosts, planMemCosts, pendingTimeMs, candidateMVs, hitMvs, warehouse, isForwardToLeader, queryFeMemory\" \\\n ");
sb.append("\"").append(conn.getURL()).append("\"");
return sb.toString();
}
Expand Down