diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..a0ccf77 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Environment-dependent path to Maven home directory +/mavenHomeManager.xml diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..156f7ea --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/libraries/starrocks_fe.xml b/.idea/libraries/starrocks_fe.xml new file mode 100644 index 0000000..9e23ba1 --- /dev/null +++ b/.idea/libraries/starrocks_fe.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..41cb282 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,12 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index e752b51..32a7414 100644 --- a/README.md +++ b/README.md @@ -25,47 +25,43 @@ create database starrocks_audit_db__; ```SQL CREATE TABLE starrocks_audit_db__.starrocks_audit_tbl__ ( - `queryId` VARCHAR(64) COMMENT "查询的唯一ID", - `timestamp` DATETIME NOT NULL COMMENT "查询开始时间", - `queryType` VARCHAR(12) COMMENT "查询类型(query, slow_query, connection)", - `clientIp` VARCHAR(32) COMMENT "客户端IP", - `user` VARCHAR(64) COMMENT "查询用户名", - `authorizedUser` VARCHAR(64) COMMENT "用户唯一标识,既user_identity", - `resourceGroup` VARCHAR(64) COMMENT "资源组名", - `catalog` VARCHAR(32) COMMENT "数据目录名", - `db` VARCHAR(96) COMMENT "查询所在数据库", - `state` VARCHAR(8) COMMENT "查询状态(EOF,ERR,OK)", - `errorCode` VARCHAR(512) COMMENT "错误码", - `queryTime` BIGINT COMMENT "查询执行时间(毫秒)", - `scanBytes` BIGINT COMMENT "查询扫描的字节数", - `scanRows` BIGINT COMMENT "查询扫描的记录行数", - `returnRows` BIGINT COMMENT "查询返回的结果行数", - `cpuCostNs` BIGINT COMMENT "查询CPU耗时(纳秒)", - `memCostBytes` BIGINT COMMENT "查询消耗内存(字节)", - `stmtId` INT COMMENT "SQL语句增量ID", - `isQuery` TINYINT COMMENT "SQL是否为查询(1或0)", - `feIp` VARCHAR(128) COMMENT "执行该语句的FE IP", - `stmt` VARCHAR(1048576) COMMENT "SQL原始语句", - `digest` VARCHAR(32) COMMENT "慢SQL指纹", - `planCpuCosts` DOUBLE COMMENT "查询规划阶段CPU占用(纳秒)", - `planMemCosts` DOUBLE COMMENT "查询规划阶段内存占用(字节)", - `pendingTimeMs` BIGINT COMMENT "查询在队列中等待的时间(毫秒)", - `candidateMVs` varchar(65533) NULL COMMENT "候选MV列表", - `hitMvs` varchar(65533) NULL COMMENT "命中MV列表", - `warehouse` VARCHAR(128) NULL COMMENT "仓库名称" + `queryId` VARCHAR(64) COMMENT "查询的唯一ID", + `timestamp` DATETIME NOT NULL COMMENT "查询开始时间", + `queryType` VARCHAR(12) COMMENT "查询类型(query, slow_query, connection)", + `clientIp` VARCHAR(32) COMMENT "客户端IP", + `user` VARCHAR(64) COMMENT "查询用户名", + `authorizedUser` VARCHAR(64) COMMENT "用户唯一标识,既user_identity", + `resourceGroup` VARCHAR(64) COMMENT "资源组名", + `catalog` VARCHAR(32) COMMENT "Catalog名", + `db` VARCHAR(96) COMMENT "查询所在数据库", + `state` VARCHAR(8) COMMENT "查询状态(EOF,ERR,OK)", + `errorCode` VARCHAR(512) COMMENT "错误码", + `queryTime` BIGINT COMMENT "查询执行时间(毫秒)", + `scanBytes` BIGINT COMMENT "查询扫描的字节数", + `scanRows` BIGINT COMMENT "查询扫描的记录行数", + `returnRows` BIGINT COMMENT "查询返回的结果行数", + `cpuCostNs` BIGINT COMMENT "查询CPU耗时(纳秒)", + `memCostBytes` BIGINT COMMENT "查询消耗内存(字节)", + `stmtId` INT COMMENT "SQL语句增量ID", + `isQuery` TINYINT COMMENT "SQL是否为查询(1或0)", + `feIp` VARCHAR(128) COMMENT "执行该语句的FE IP", + `stmt` VARCHAR(1048576) COMMENT "原始SQL语句", + `digest` VARCHAR(32) COMMENT "慢SQL指纹", + `planCpuCosts` DOUBLE COMMENT "查询规划阶段CPU占用(纳秒)", + `planMemCosts` DOUBLE COMMENT "查询规划阶段内存占用(字节)", + `pendingTimeMs` BIGINT COMMENT "查询在队列中等待的时间(毫秒)", + `candidateMVs` VARCHAR(65533) COMMENT "候选MV列表", + `hitMvs` VARCHAR(65533) COMMENT "命中MV列表", + `warehouse` VARCHAR(128) COMMENT "仓库名称", + `isForwardToLeader` BOOLEAN COMMENT "是否转发leader (1或0)", + `queryFeMemory` BIGINT COMMENT "查询在 FE 的内存消耗" ) ENGINE = OLAP DUPLICATE KEY (`queryId`, `timestamp`, `queryType`) COMMENT "审计日志表" -PARTITION BY RANGE (`timestamp`) () -DISTRIBUTED BY HASH (`queryId`) BUCKETS 3 +PARTITION BY date_trunc('day', `timestamp`) PROPERTIES ( - "dynamic_partition.time_unit" = "DAY", - "dynamic_partition.start" = "-30", --表示只保留最近30天的审计信息,可视需求调整 - "dynamic_partition.end" = "3", - "dynamic_partition.prefix" = "p", - "dynamic_partition.buckets" = "3", - "dynamic_partition.enable" = "true", - "replication_num" = "3" --若集群中BE个数不大于3,可调整副本数为1,生产集群不推荐调整 + "replication_num" = "1", + "partition_live_number"="30" ); ``` diff --git a/lib/starrocks-fe.jar b/lib/starrocks-fe.jar index c1c05d6..2896ae7 100644 Binary files a/lib/starrocks-fe.jar and b/lib/starrocks-fe.jar differ diff --git a/pom.xml b/pom.xml index 4345210..1b3de52 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.starrocks starrocks-fe - 3.0.6 + 3.3.15 system ${basedir}/lib/starrocks-fe.jar diff --git a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java index 805f946..53348c5 100644 --- a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java +++ b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java @@ -26,8 +26,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import javax.crypto.Cipher; -import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; @@ -78,6 +76,14 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { * 是否包含新字段 hitMVsExists,如果旧版本没有该字段则值为空 */ private boolean hitMVsExists; + /** + * 是否包含新字段isForwardToLeader,如果旧版本没有该字段则值为空 + */ + private boolean isForwardToLeaderExists; + /** + * 是否包含新字段queryFeMemory,如果旧版本没有该字段则值为空 + */ + private boolean queryFeMemoryExists; @Override public void init(PluginInfo info, PluginContext ctx) throws PluginException { @@ -97,6 +103,8 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException { candidateMvsExists = hasField(AuditEvent.class, "candidateMvs"); hitMVsExists = hasField(AuditEvent.class, "hitMVs"); + isForwardToLeaderExists = hasField(AuditEvent.class, "isForwardToLeader"); + queryFeMemoryExists = hasField(AuditEvent.class,"queryFeMemory"); isInit = true; } @@ -170,6 +178,8 @@ private void assembleAudit(AuditEvent event) { } String candidateMvsVal = candidateMvsExists ? event.candidateMvs : ""; String hitMVsVal = hitMVsExists ? event.hitMVs : ""; + Boolean isForwardToLeaderVal = isForwardToLeaderExists ? event.isForwardToLeader : null; + Long queryFeMemoryVal = queryFeMemoryExists ? event.queryFeMemory : null; String content = "{\"queryId\":\"" + getQueryId(queryType, event) + "\"," + "\"timestamp\":\"" + longToTimeString(event.timestamp) + "\"," + "\"queryType\":\"" + queryType + "\"," + @@ -197,7 +207,9 @@ private void assembleAudit(AuditEvent event) { "\"pendingTimeMs\":" + event.pendingTimeMs + "," + "\"candidateMVs\":\"" + candidateMvsVal + "\"," + "\"hitMvs\":\"" + hitMVsVal + "\"," + - "\"warehouse\":\"" + event.warehouse + "\"}"; + "\"warehouse\":\"" + event.warehouse + "\"," + + "\"isForwardToLeader\":" + isForwardToLeaderVal + "," + + "\"queryFeMemory\":" + queryFeMemoryVal + "}"; if (auditBuffer.length() > 0) { auditBuffer.append(","); } diff --git a/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java b/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java index 8f2a62c..3bc6ba7 100644 --- a/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java +++ b/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java @@ -89,7 +89,7 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx conn.addRequestProperty("label", label); conn.addRequestProperty("max_filter_ratio", "1.0"); - conn.addRequestProperty("columns", "queryId,timestamp,queryType,clientIp,user,authorizedUser,resourceGroup,catalog,db,state,errorCode,queryTime,scanBytes,scanRows,returnRows,cpuCostNs,memCostBytes,stmtId,isQuery,feIp,stmt,digest,planCpuCosts,planMemCosts,pendingTimeMs,candidateMVs,hitMvs,warehouse"); + conn.addRequestProperty("columns", "queryId,timestamp,queryType,clientIp,user,authorizedUser,resourceGroup,catalog,db,state,errorCode,queryTime,scanBytes,scanRows,returnRows,cpuCostNs,memCostBytes,stmtId,isQuery,feIp,stmt,digest,planCpuCosts,planMemCosts,pendingTimeMs,candidateMVs,hitMvs,warehouse,isForwardToLeader,queryFeMemory"); if(!StringUtils.isBlank(this.streamLoadFilter)) { conn.addRequestProperty("where", streamLoadFilter); } @@ -115,7 +115,7 @@ private String toCurl(HttpURLConnection conn) { sb.append("-H \"").append("where\":").append(streamLoadFilter).append(" \\\n "); } sb.append("-H \"").append("columns\":").append("\"queryId, timestamp, queryType, clientIp, user, authorizedUser, resourceGroup, catalog, db, state, errorCode," + - "queryTime, scanBytes, scanRows, returnRows, cpuCostNs, memCostBytes, stmtId, isQuery, feIp, stmt, digest, planCpuCosts, planMemCosts, pendingTimeMs, candidateMVs, hitMvs, warehouse\" \\\n "); + "queryTime, scanBytes, scanRows, returnRows, cpuCostNs, memCostBytes, stmtId, isQuery, feIp, stmt, digest, planCpuCosts, planMemCosts, pendingTimeMs, candidateMVs, hitMvs, warehouse, isForwardToLeader, queryFeMemory\" \\\n "); sb.append("\"").append(conn.getURL()).append("\""); return sb.toString(); }