39 changes: 11 additions & 28 deletions README.md
@@ -50,9 +50,9 @@ CREATE TABLE starrocks_audit_db__.starrocks_audit_tbl__ (
`planCpuCosts` DOUBLE COMMENT "CPU cost of the query planning phase (ns)",
`planMemCosts` DOUBLE COMMENT "Memory cost of the query planning phase (bytes)",
`pendingTimeMs` BIGINT COMMENT "Time the query spent waiting in the queue (ms)",
`candidateMVs` varchar(65533) NULL COMMENT "Candidate MV list",
`hitMvs` varchar(65533) NULL COMMENT "Hit MV list",
`warehouse` VARCHAR(128) NULL COMMENT "Warehouse name"
`candidateMVs` VARCHAR(65533) NULL COMMENT "Candidate MV list",
`hitMvs` VARCHAR(65533) NULL COMMENT "Hit MV list",
`warehouse` VARCHAR(32) NULL COMMENT "warehouse name"
) ENGINE = OLAP
DUPLICATE KEY (`queryId`, `timestamp`, `queryType`)
COMMENT "审计日志表"
@@ -136,18 +136,17 @@ password=
# StarRocks password encryption key, with a length not exceeding 16 bytes
secret_key=

# Whether to generate sql digest for all queries
enable_compute_all_query_digest=false

# Filter conditions when importing audit information
filter=

# Timeout for uninstalling the plugin, default is 5000 ms
uninstall_timeout=5000
```

**Notes**:
1. The default value of `frontend_host_port`, i.e. `127.0.0.1:8030`, is recommended. Each FE in StarRocks manages its own audit information independently: once the plugin is installed, every FE starts its own background thread to collect audit records, batch them, and write them via Stream Load. The `frontend_host_port` option provides the HTTP IP and port for these background Stream Load jobs and cannot be set to multiple values. The IP may point to any FE in the cluster, but that is not recommended: if that FE fails, the audit write jobs on every other FE will also fail because they can no longer reach it. With the default `127.0.0.1:8030`, each FE communicates through its own HTTP port, so one FE's failure does not affect the others (all write jobs are ultimately forwarded to the FE Leader for execution anyway).
2. `secret_key` sets the key string used to encrypt the password; in the audit plugin it must not exceed 16 bytes. If it is left empty, the password in `plugin.conf` is not encrypted or decrypted, and `password` can be set to the plaintext password directly. If it is not empty, `password` must be set to the encrypted string, which can be generated in StarRocks with the `AES_ENCRYPT` function: `SELECT TO_BASE64(AES_ENCRYPT('password','secret_key'));` (a runnable sketch follows after this list).
3. `enable_compute_all_query_digest` controls whether a hash SQL digest is generated for every query (by default StarRocks only generates SQL digests for slow queries). Note that the plugin's digest computation differs from the FE's internal method: the FE [normalizes](https://docs.mirrorship.cn/zh/docs/administration/Query_planning/#%E6%9F%A5%E7%9C%8B-sql-%E6%8C%87%E7%BA%B9) the SQL statement first, while the plugin does not, and enabling this option consumes additional compute resources in the cluster.
4. `filter` sets the filter condition for loading audit records into the table. It is implemented with the Stream Load [where parameter](https://docs.mirrorship.cn/zh/docs/sql-reference/sql-statements/data-manipulation/STREAM_LOAD/#opt_properties), i.e. `-H "where: <condition>"`, and is empty by default. Example: `filter=isQuery=1 and clientIp like '127.0.0.1%' and user='root'`.
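
A concrete sketch of the password-encryption flow from note 2; `my_password` and `my_secret_key` are placeholder values, not real credentials:

```sql
-- Generate the encrypted string to put in the password field of plugin.conf
-- ('my_password' / 'my_secret_key' are placeholders).
SELECT TO_BASE64(AES_ENCRYPT('my_password', 'my_secret_key'));

-- Optional sanity check: decrypting the generated string returns the plaintext.
-- Replace <encrypted_string> with the output of the previous query.
SELECT AES_DECRYPT(FROM_BASE64('<encrypted_string>'), 'my_secret_key');
```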


After making these changes, repackage the three files above into a zip archive:
@@ -183,7 +182,7 @@ INSTALL PLUGIN FROM "/location/plugindemo.zip";
If you install from a network path, you must also provide the MD5 of the plugin package in the properties of the install command. Syntax example:

```sql
INSTALL PLUGIN FROM "http://192.168.141.203/extra/auditloader.zip" PROPERTIES("md5sum" = "3975F7B880C9490FE95F42E2B2A28E2D");
INSTALL PLUGIN FROM "http://127.0.0.1/extra/auditloader.zip" PROPERTIES("md5sum" = "3975F7B880C9490FE95F42E2B2A28E2D");
```

Taking a local plugin package as an example, adjust the command to the path the files were distributed to above, then execute it:
@@ -210,9 +209,9 @@ JavaVersion: 1.8.31
*************************** 2. row ***************************
Name: AuditLoader
Type: AUDIT
Description: Available for versions 2.5+. Load audit log to starrocks, and user can view the statistic of queries
Version: 4.2.1
JavaVersion: 1.8.0
Description: Available for versions 3.3.11+. Load audit log to starrocks, and user can view the statistic of queries
Version: 5.0.0
JavaVersion: 11
ClassName: com.starrocks.plugin.audit.AuditLoaderPlugin
SoName: NULL
Sources: /opt/module/starrocks/auditloader.zip
@@ -267,19 +266,3 @@ The `queryType` values supported in the StarRocks audit table include query, slow_query, and connection

For connection, StarRocks 3.0.6+ can print successful/failed connection records to fe.audit.log when clients connect; enable it by setting `audit_log_modules=slow_query,query,connection` in `fe.conf` and restarting the FE. Once enabled, the AuditLoader plugin also collects these client connection records and loads them into `starrocks_audit_tbl__`, where their `queryType` value is connection, which you can use to audit user logins (see the query sketch below).
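
For example, a sketch for reviewing recent login activity once connection auditing is enabled (column names follow the audit table DDL and the filter example above):

```sql
-- Sketch: the 20 most recent client connection records collected by the plugin.
SELECT `timestamp`, `user`, `clientIp`
FROM starrocks_audit_db__.starrocks_audit_tbl__
WHERE queryType = 'connection'
ORDER BY `timestamp` DESC
LIMIT 20;
```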



### Release Notes:

##### AuditLoader v4.2.1

1) Added support for configuring an encrypted password in plugin.conf

2) Reserved two important monitoring fields, candidateMVs and hitMvs, in the audit log table

3) Added the filter parameter in plugin.conf for filtering which audit records are loaded

4) Switched the plugin's batching format to JSON, avoiding the `Validation failed for header 'column_separator'` write error that the original CSV batching hit on StarRocks 3.2.12+ after the FE's netty dependency upgrade

5) Other minor improvements

Binary file modified lib/starrocks-fe.jar
204 changes: 204 additions & 0 deletions plugin.conf.example
@@ -0,0 +1,204 @@
### ============================================
### StarRocks Audit Loader Plugin Configuration
### with Kafka Integration Support
### ============================================

### --------------------------------------------
### Basic Stream Load Settings (Legacy)
### --------------------------------------------

# The max size of a batch, default is 50MB
max_batch_size=52428800

# The max interval of batch loaded, default is 60 seconds
max_batch_interval_sec=60

# StarRocks FE host and HTTP port for loading the audit data, default is 127.0.0.1:8030
# This is the host:port the plugin uses for Stream Load
frontend_host_port=127.0.0.1:8030

# If the response time of a query exceeds this threshold, it will be recorded in the audit table as a slow_query
qe_slow_log_ms=5000

# The capacity of the audit queue, default is 1000
max_queue_size=1000

# Database of the audit table
database=starrocks_audit_db__

# Audit table name, to save the audit data
table=starrocks_audit_tbl__

# StarRocks user. This user must have load privileges on the audit table
user=root

# StarRocks user's password
password=

# StarRocks password encryption key, with a length not exceeding 16 bytes
secret_key=

# The max statement length to be loaded into the audit table, default is 1048576
max_stmt_length=1048576

# Whether to generate sql digest for all queries
enable_compute_all_query_digest=false

# Filter conditions when importing audit information
filter=

# Connection timeout (milliseconds)
connect_timeout=1000

# Read timeout (milliseconds)
read_timeout=1000


### ============================================
### NEW: Output Routing Configuration
### ============================================

# Output mode: streamload, kafka, dual, fallback
# - streamload: Only use StarRocks Stream Load (default, backward compatible)
# - kafka: Only use Kafka output
# - dual: Use both Stream Load and Kafka simultaneously (parallel)
# - fallback: Use primary output, fallback to secondary on failure
output_mode=streamload

# Primary output handler (for fallback mode)
# Options: kafka, streamload
primary_output=kafka

# Secondary output handler (for fallback mode)
# Options: kafka, streamload
secondary_output=streamload


### ============================================
### Kafka Producer Configuration
### ============================================

# Enable Kafka output
kafka.enabled=false

# Kafka broker addresses (comma-separated)
kafka.bootstrap.servers=localhost:9092

# Kafka topic name
kafka.topic=starrocks_audit_logs

# Async mode (true: asynchronous, false: synchronous)
# Async mode provides better performance but may lose messages on crash
# Sync mode ensures delivery but has higher latency
kafka.async_mode=true

### --------------------------------------------
### Kafka Performance Settings
### --------------------------------------------

# Batch size in bytes (default: 16384 = 16KB)
# Larger batches improve throughput but increase latency
kafka.batch.size=16384

# Linger time in milliseconds (default: 10ms)
# How long to wait for additional messages before sending a batch
kafka.linger.ms=10

# Compression type: none, gzip, snappy, lz4, zstd
# snappy: Good balance of compression and CPU usage (recommended)
# lz4: Fast compression with moderate ratio
# gzip: Best compression but high CPU usage
kafka.compression.type=snappy

# Producer buffer memory in bytes (default: 33554432 = 32MB)
kafka.buffer.memory=33554432

### --------------------------------------------
### Kafka Reliability Settings
### --------------------------------------------

# Acknowledgment level:
# 0: No acknowledgment (fastest, may lose data)
# 1: Leader acknowledgment only (balanced, recommended)
# all/-1: All in-sync replicas acknowledgment (safest, slowest)
kafka.acks=1

# Number of retries on failure
kafka.retries=3

# Maximum number of unacknowledged requests per connection
kafka.max.in.flight.requests.per.connection=5

### --------------------------------------------
### Kafka Timeout Settings
### --------------------------------------------

# Request timeout in milliseconds
kafka.request.timeout.ms=30000

# Delivery timeout in milliseconds
kafka.delivery.timeout.ms=120000

### --------------------------------------------
### Kafka Security Settings (Optional)
### --------------------------------------------

# SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
# kafka.sasl.mechanism=PLAIN

# SASL JAAS configuration
# Example for PLAIN:
# kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
# Example for SCRAM-SHA-256:
# kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";

# Security protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
# kafka.security.protocol=SASL_SSL

# SSL truststore location
# kafka.ssl.truststore.location=/path/to/truststore.jks

# SSL truststore password
# kafka.ssl.truststore.password=password


### ============================================
### Configuration Examples
### ============================================

### Example 1: Kafka Only (High Performance)
# output_mode=kafka
# kafka.enabled=true
# kafka.bootstrap.servers=kafka-broker1:9092,kafka-broker2:9092
# kafka.topic=starrocks_audit_logs
# kafka.async_mode=true
# kafka.acks=1
# kafka.compression.type=snappy

### Example 2: Dual Mode (Both Outputs)
# output_mode=dual
# kafka.enabled=true
# kafka.bootstrap.servers=kafka-broker:9092
# kafka.topic=starrocks_audit_logs
# frontend_host_port=127.0.0.1:8030
# database=starrocks_audit_db__
# table=starrocks_audit_tbl__

### Example 3: Fallback Mode (Fault Tolerance)
# output_mode=fallback
# primary_output=kafka
# secondary_output=streamload
# kafka.enabled=true
# kafka.bootstrap.servers=kafka-broker:9092
# kafka.topic=starrocks_audit_logs
# frontend_host_port=127.0.0.1:8030
# database=starrocks_audit_db__
# table=starrocks_audit_tbl__

### Example 4: Stream Load Only (Default/Legacy)
# output_mode=streamload
# frontend_host_port=127.0.0.1:8030
# database=starrocks_audit_db__
# table=starrocks_audit_tbl__
# user=root
# password=
16 changes: 12 additions & 4 deletions pom.xml
@@ -6,17 +6,19 @@

<groupId>com.starrocks</groupId>
<artifactId>fe-plugins-auditloader</artifactId>
<version>4.2.1</version>
<version>4.2.2</version>

<properties>
<log4j2.version>2.24.1</log4j2.version>
<project.scm.id>github</project.scm.id>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-fe</artifactId>
<version>3.0.6</version>
<version>3.3.11</version>
<scope>system</scope>
<systemPath>${basedir}/lib/starrocks-fe.jar</systemPath>
</dependency>
@@ -48,6 +50,12 @@
<artifactId>commons-codec</artifactId>
<version>1.17.1</version>
</dependency>
<!-- Kafka Client for audit log streaming -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>

<distributionManagement>
@@ -69,8 +77,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
44 changes: 36 additions & 8 deletions src/main/assembly/plugin.conf
@@ -51,14 +51,42 @@ password=
# StarRocks password encryption key
secret_key=

# Whether to generate sql digest for all queries
enable_compute_all_query_digest=false
# Filter conditions when importing audit information
filter=

# Http connectTimeout ms
connect_timeout=1000
### ============================================
### Output Routing Configuration
### ============================================

# Http readTimeout ms
read_timeout=1000
# Output mode: streamload, kafka, dual, fallback
# Default: streamload (backward compatible)
output_mode=streamload

# Filter conditions when importing audit information
filter=
# Primary output handler (for fallback mode)
# primary_output=kafka

# Secondary output handler (for fallback mode)
# secondary_output=streamload

### ============================================
### Kafka Configuration (Optional)
### See plugin.conf.example for detailed configuration
### ============================================

# Enable Kafka output
kafka.enabled=false

# Kafka broker addresses (comma-separated)
kafka.bootstrap.servers=localhost:9092

# Kafka topic name
kafka.topic=starrocks_audit_logs

# Async mode (recommended for performance)
kafka.async_mode=true

# For more Kafka settings (batch size, compression, security, etc.),
# see plugin.conf.example file

# Timeout for uninstalling the plugin, default is 5000 ms
uninstall_timeout=5000
6 changes: 3 additions & 3 deletions src/main/assembly/plugin.properties
@@ -17,7 +17,7 @@

name=AuditLoader
type=AUDIT
description=Available for versions 2.5+. Load audit log to starrocks, and user can view the statistic of queries
version=4.2.1
java.version=1.8.0
description=Available for versions 3.3.11+. Load audit log to starrocks, and user can view the statistic of queries
version=6.0.0
java.version=11.0.23
classname=com.starrocks.plugin.audit.AuditLoaderPlugin