diff --git a/README.md b/README.md index e752b51..672e161 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,9 @@ CREATE TABLE starrocks_audit_db__.starrocks_audit_tbl__ ( `planCpuCosts` DOUBLE COMMENT "查询规划阶段CPU占用(纳秒)", `planMemCosts` DOUBLE COMMENT "查询规划阶段内存占用(字节)", `pendingTimeMs` BIGINT COMMENT "查询在队列中等待的时间(毫秒)", - `candidateMVs` varchar(65533) NULL COMMENT "候选MV列表", - `hitMvs` varchar(65533) NULL COMMENT "命中MV列表", - `warehouse` VARCHAR(128) NULL COMMENT "仓库名称" + `candidateMVs` VARCHAR(65533) NULL COMMENT "候选MV列表", + `hitMvs` VARCHAR(65533) NULL COMMENT "命中MV列表", + `warehouse` VARCHAR(32) NULL COMMENT "warehouse name" ) ENGINE = OLAP DUPLICATE KEY (`queryId`, `timestamp`, `queryType`) COMMENT "审计日志表" @@ -136,18 +136,17 @@ password= # StarRocks password encryption key, with a length not exceeding 16 bytes secret_key= -# Whether to generate sql digest for all queries -enable_compute_all_query_digest=false - # Filter conditions when importing audit information filter= + +# Timeout for uninstalling the plugin, default is 5000 ms +uninstall_timeout=5000 ``` **说明**: 1. 推荐使用参数 `frontend_host_port` 的默认配置,即 `127.0.0.1:8030` 。StarRocks 中各个 FE 是独立管理各自的审计信息的,在安装审计插件后,各个 FE 分别会启动各自的后台线程进行审计信息的获取攒批和 Stream Load 写入。 `frontend_host_port` 配置项用于为插件后台 Stream Load 任务提供 http 协议的 IP 和端口,该参数不支持配置为多个值。其中,参数 IP 部分可以使用集群内任意某个 FE 的 IP,但并不推荐这样配置,因为若对应的 FE 出现异常,其他 FE 后台的审计信息写入任务也会因无法通信导致写入失败。推荐配置为默认的 `127.0.0.1:8030`,让各个 FE 均使用自身的 http 端口进行通信,以此规避其他 FE 异常时对通信的影响(当然,所有的写入任务最终都会被自动转发到 FE Leader 节点执行)。 2. `secret_key` 参数用于配置"加密密码的 key 字符串",在审计插件中其长度不得超过 16 个字节。如果该参数留空,表示不对 `plugin.conf` 中的密码进行加解密,在 password 处直接配置明文密码即可。如果该参数不为空,表示需要对密码进行加解密,password 处需配置为加密后的字符串,加密后的密码可在 StarRocks 中通过 `AES_ENCRYPT` 函数生成:`SELECT TO_BASE64(AES_ENCRYPT('password','secret_key'));`。 -3. `enable_compute_all_query_digest` 参数表示是否对所有查询都生成 Hash SQL 指纹(StarRocks 默认只为慢查询开启 SQL 指纹,注意插件中的指纹计算方法与 FE 内部的方法不一致,FE 会对 SQL 语句[规范化处理](https://docs.mirrorship.cn/zh/docs/administration/Query_planning/#%E6%9F%A5%E7%9C%8B-sql-%E6%8C%87%E7%BA%B9),而插件不会,且如果开启该参数,指纹计算会额外占用集群内的计算资源)。 -4. `filter` 参数可以配置审计信息入库的过滤条件,该处使用 Stream Load 中 [where 参数](https://docs.mirrorship.cn/zh/docs/sql-reference/sql-statements/data-manipulation/STREAM_LOAD/#opt_properties)实现,即`-H "where: "`,默认为空,配置示例:`filter=isQuery=1 and clientIp like '127.0.0.1%' and user='root'`。 +3. `filter` 参数可以配置审计信息入库的过滤条件,该处使用 Stream Load 中 [where 参数](https://docs.mirrorship.cn/zh/docs/sql-reference/sql-statements/data-manipulation/STREAM_LOAD/#opt_properties)实现,即`-H "where: "`,默认为空,配置示例:`filter=isQuery=1 and clientIp like '127.0.0.1%' and user='root'`。 修改完成后,再将上面的三个文件重新打包为 zip 包: @@ -183,7 +182,7 @@ INSTALL PLUGIN FROM "/location/plugindemo.zip"; 若通过网络路径安装,还需要在安装命令的属性中提供插件压缩包的 md5 信息,语法示例: ```sql -INSTALL PLUGIN FROM "http://192.168.141.203/extra/auditloader.zip" PROPERTIES("md5sum" = "3975F7B880C9490FE95F42E2B2A28E2D"); +INSTALL PLUGIN FROM "http://127.0.0.1/extra/auditloader.zip" PROPERTIES("md5sum" = "3975F7B880C9490FE95F42E2B2A28E2D"); ``` 以安装本地插件包为例,根据上文分发文件的路径修改命令后执行: @@ -210,9 +209,9 @@ JavaVersion: 1.8.31 *************************** 2. row *************************** Name: AuditLoader Type: AUDIT -Description: Available for versions 2.5+. Load audit log to starrocks, and user can view the statistic of queries - Version: 4.2.1 -JavaVersion: 1.8.0 +Description: Available for versions 3.3.11+. 
Load audit log to starrocks, and user can view the statistic of queries + Version: 5.0.0 +JavaVersion: 11 ClassName: com.starrocks.plugin.audit.AuditLoaderPlugin SoName: NULL Sources: /opt/module/starrocks/auditloader.zip @@ -267,19 +266,3 @@ StarRocks审计表中支持的 `queryType` 类型包括:query、slow_query 和 对于 connection,StarRocks 3.0.6+ 版本支持在 fe.audit.log 中打印客户端连接时成功/失败的 connection 信息,您可以在 `fe.conf` 里配置 `audit_log_modules=slow_query,query,connection`,然后重启 FE 来进行启用。在启用 connection 信息后,AuditLoader 插件同样能采集到这类客户端连接信息并入库到表 `starrocks_audit_tbl__` 中,入库后该类信息对应的审计表的 `queryType` 字段即为 connection,您可以以此进行用户登录信息的审计。 - - -### 更新说明: - -##### AuditLoader v4.2.1 - -1)新增在 plugin.conf 中配置密文密码的功能 - -2)在审计日志表中预留增加了 candidateMVs 和 hitMvs 两个重要监测字段 - -3)新增在 plugin.conf 中通过 filter 参数进行审计信息的入库条件筛选功能 - -4)调整插件攒批逻辑为 Json,规避 StarRocks 3.2.12+ 等版本 FE netty 依赖升级导致的原 CSV 攒批逻辑在写入时报错 `Validation failed for header 'column_separator'` 的问题 - -5)其他细节优化 - diff --git a/lib/starrocks-fe.jar b/lib/starrocks-fe.jar index c1c05d6..8f15b7e 100644 Binary files a/lib/starrocks-fe.jar and b/lib/starrocks-fe.jar differ diff --git a/plugin.conf.example b/plugin.conf.example new file mode 100644 index 0000000..67bd2f1 --- /dev/null +++ b/plugin.conf.example @@ -0,0 +1,204 @@ +### ============================================ +### StarRocks Audit Loader Plugin Configuration +### with Kafka Integration Support +### ============================================ + +### -------------------------------------------- +### Basic Stream Load Settings (Legacy) +### -------------------------------------------- + +# The max size of a batch, default is 50MB +max_batch_size=52428800 + +# The max interval of batch loaded, default is 60 seconds +max_batch_interval_sec=60 + +# StarRocks FE host for loading the audit, default is 127.0.0.1:8030 +# This should be the host port for stream load +frontend_host_port=127.0.0.1:8030 + +# If the response time of a query exceed this threshold, it will be recorded in audit table as slow_query +qe_slow_log_ms=5000 + +# The capacity of audit queue, default is 1000 +max_queue_size=1000 + +# Database of the audit table +database=starrocks_audit_db__ + +# Audit table name, to save the audit data +table=starrocks_audit_tbl__ + +# StarRocks user. 
This user must have import permissions for the audit table +user=root + +# StarRocks user's password +password= + +# StarRocks password encryption key, with a length not exceeding 16 bytes +secret_key= + +# The max stmt length to be loaded in audit table, default is 1048576 +max_stmt_length=1048576 + +# Whether to generate sql digest for all queries +enable_compute_all_query_digest=false + +# Filter conditions when importing audit information +filter= + +# Connection timeout (milliseconds) +connect_timeout=1000 + +# Read timeout (milliseconds) +read_timeout=1000 + + +### ============================================ +### NEW: Output Routing Configuration +### ============================================ + +# Output mode: streamload, kafka, dual, fallback +# - streamload: Only use StarRocks Stream Load (default, backward compatible) +# - kafka: Only use Kafka output +# - dual: Use both Stream Load and Kafka simultaneously (parallel) +# - fallback: Use primary output, fallback to secondary on failure +output_mode=streamload + +# Primary output handler (for fallback mode) +# Options: kafka, streamload +primary_output=kafka + +# Secondary output handler (for fallback mode) +# Options: kafka, streamload +secondary_output=streamload + + +### ============================================ +### Kafka Producer Configuration +### ============================================ + +# Enable Kafka output +kafka.enabled=false + +# Kafka broker addresses (comma-separated) +kafka.bootstrap.servers=localhost:9092 + +# Kafka topic name +kafka.topic=starrocks_audit_logs + +# Async mode (true: asynchronous, false: synchronous) +# Async mode provides better performance but may lose messages on crash +# Sync mode ensures delivery but has higher latency +kafka.async_mode=true + +### -------------------------------------------- +### Kafka Performance Settings +### -------------------------------------------- + +# Batch size in bytes (default: 16384 = 16KB) +# Larger batches improve throughput but increase latency +kafka.batch.size=16384 + +# Linger time in milliseconds (default: 10ms) +# How long to wait for additional messages before sending a batch +kafka.linger.ms=10 + +# Compression type: none, gzip, snappy, lz4, zstd +# snappy: Good balance of compression and CPU usage (recommended) +# lz4: Fast compression with moderate ratio +# gzip: Best compression but high CPU usage +kafka.compression.type=snappy + +# Producer buffer memory in bytes (default: 33554432 = 32MB) +kafka.buffer.memory=33554432 + +### -------------------------------------------- +### Kafka Reliability Settings +### -------------------------------------------- + +# Acknowledgment level: +# 0: No acknowledgment (fastest, may lose data) +# 1: Leader acknowledgment only (balanced, recommended) +# all/-1: All in-sync replicas acknowledgment (safest, slowest) +kafka.acks=1 + +# Number of retries on failure +kafka.retries=3 + +# Maximum number of unacknowledged requests per connection +kafka.max.in.flight.requests.per.connection=5 + +### -------------------------------------------- +### Kafka Timeout Settings +### -------------------------------------------- + +# Request timeout in milliseconds +kafka.request.timeout.ms=30000 + +# Delivery timeout in milliseconds +kafka.delivery.timeout.ms=120000 + +### -------------------------------------------- +### Kafka Security Settings (Optional) +### -------------------------------------------- + +# SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 +# kafka.sasl.mechanism=PLAIN + +# SASL JAAS configuration +# 
Example for PLAIN: +# kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password"; +# Example for SCRAM-SHA-256: +# kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password"; + +# Security protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL +# kafka.security.protocol=SASL_SSL + +# SSL truststore location +# kafka.ssl.truststore.location=/path/to/truststore.jks + +# SSL truststore password +# kafka.ssl.truststore.password=password + + +### ============================================ +### Configuration Examples +### ============================================ + +### Example 1: Kafka Only (High Performance) +# output_mode=kafka +# kafka.enabled=true +# kafka.bootstrap.servers=kafka-broker1:9092,kafka-broker2:9092 +# kafka.topic=starrocks_audit_logs +# kafka.async_mode=true +# kafka.acks=1 +# kafka.compression.type=snappy + +### Example 2: Dual Mode (Both Outputs) +# output_mode=dual +# kafka.enabled=true +# kafka.bootstrap.servers=kafka-broker:9092 +# kafka.topic=starrocks_audit_logs +# frontend_host_port=127.0.0.1:8030 +# database=starrocks_audit_db__ +# table=starrocks_audit_tbl__ + +### Example 3: Fallback Mode (Fault Tolerance) +# output_mode=fallback +# primary_output=kafka +# secondary_output=streamload +# kafka.enabled=true +# kafka.bootstrap.servers=kafka-broker:9092 +# kafka.topic=starrocks_audit_logs +# frontend_host_port=127.0.0.1:8030 +# database=starrocks_audit_db__ +# table=starrocks_audit_tbl__ + +### Example 4: Stream Load Only (Default/Legacy) +# output_mode=streamload +# frontend_host_port=127.0.0.1:8030 +# database=starrocks_audit_db__ +# table=starrocks_audit_tbl__ +# user=root +# password= diff --git a/pom.xml b/pom.xml index 4345210..64f9655 100644 --- a/pom.xml +++ b/pom.xml @@ -6,17 +6,19 @@ com.starrocks fe-plugins-auditloader - 4.2.1 + 4.2.2 2.24.1 github + 11 + 11 com.starrocks starrocks-fe - 3.0.6 + 3.3.11 system ${basedir}/lib/starrocks-fe.jar @@ -48,6 +50,12 @@ commons-codec 1.17.1 + + + org.apache.kafka + kafka-clients + 3.6.0 + @@ -69,8 +77,8 @@ maven-compiler-plugin 3.13.0 - 1.8 - 1.8 + 11 + 11 diff --git a/src/main/assembly/plugin.conf b/src/main/assembly/plugin.conf index b003bf7..2944529 100644 --- a/src/main/assembly/plugin.conf +++ b/src/main/assembly/plugin.conf @@ -51,14 +51,42 @@ password= # StarRocks password encryption key secret_key= -# Whether to generate sql digest for all queries -enable_compute_all_query_digest=false +# Filter conditions when importing audit information +filter= -# Http connectTimeout ms -connect_timeout=1000 +### ============================================ +### Output Routing Configuration +### ============================================ -# Http readTimeout ms -read_timeout=1000 +# Output mode: streamload, kafka, dual, fallback +# Default: streamload (backward compatible) +output_mode=streamload -# Filter conditions when importing audit information -filter= +# Primary output handler (for fallback mode) +# primary_output=kafka + +# Secondary output handler (for fallback mode) +# secondary_output=streamload + +### ============================================ +### Kafka Configuration (Optional) +### See plugin.conf.example for detailed configuration +### ============================================ + +# Enable Kafka output +kafka.enabled=true + +# Kafka broker addresses (comma-separated) +kafka.bootstrap.servers=localhost:9092, + +# Kafka topic name +kafka.topic=starrocks_audit_logs + +# Async mode 
(recommended for performance) +kafka.async_mode=true + +# For more Kafka settings (batch size, compression, security, etc.), +# see plugin.conf.example file + +# Timeout for uninstalling the plugin, default is 5000 ms +uninstall_timeout=5000 \ No newline at end of file diff --git a/src/main/assembly/plugin.properties b/src/main/assembly/plugin.properties index d9685ba..0e74984 100644 --- a/src/main/assembly/plugin.properties +++ b/src/main/assembly/plugin.properties @@ -17,7 +17,7 @@ name=AuditLoader type=AUDIT -description=Available for versions 2.5+. Load audit log to starrocks, and user can view the statistic of queries -version=4.2.1 -java.version=1.8.0 +description=Available for versions 3.3.11+. Load audit log to starrocks, and user can view the statistic of queries +version=6.0.0 +java.version=11.0.23 classname=com.starrocks.plugin.audit.AuditLoaderPlugin diff --git a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java index 805f946..93b706e 100644 --- a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java +++ b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java @@ -18,16 +18,13 @@ package com.starrocks.plugin.audit; import com.starrocks.plugin.*; -import com.starrocks.sql.ast.StatementBase; -import com.starrocks.sql.common.SqlDigestBuilder; -import com.starrocks.sql.parser.SqlParser; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.lang3.StringUtils; +import com.starrocks.plugin.audit.output.KafkaOutputHandler; +import com.starrocks.plugin.audit.output.OutputHandler; +import com.starrocks.plugin.audit.output.StreamLoadOutputHandler; +import com.starrocks.plugin.audit.routing.OutputRouter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import javax.crypto.Cipher; -import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; @@ -40,16 +37,12 @@ import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; /* @@ -60,10 +53,14 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private static final SimpleDateFormat DATETIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - private StringBuilder auditBuffer = new StringBuilder(); + // CSV format constants for Stream Load + public static final char COLUMN_SEPARATOR = '\t'; + public static final char ROW_DELIMITER = '\n'; + private long lastLoadTime = 0; private BlockingQueue auditEventQueue; - private StarrocksStreamLoader streamLoader; + private List eventBatch; + private OutputRouter outputRouter; private Thread loadThread; private AuditLoaderConf conf; @@ -91,8 +88,13 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException { loadConfig(ctx, info.getProperties()); this.auditEventQueue = new LinkedBlockingQueue<>(conf.maxQueueSize); - this.streamLoader = new StarrocksStreamLoader(conf); - this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); + this.eventBatch = new ArrayList<>(); + + // Initialize 
output router + initializeOutputRouter(); + + this.loadThread = new Thread(new LoadWorker(), "audit loader thread"); + this.loadThread.setDaemon(true); this.loadThread.start(); candidateMvsExists = hasField(AuditEvent.class, "candidateMvs"); @@ -102,6 +104,101 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException { } } + /** + * Initialize output router based on configuration + */ + private void initializeOutputRouter() throws PluginException { + String outputMode = conf.properties.getOrDefault("output_mode", "streamload"); + + OutputRouter.RoutingMode routingMode; + switch (outputMode.toLowerCase()) { + case "kafka": + routingMode = OutputRouter.RoutingMode.SINGLE; + break; + case "dual": + routingMode = OutputRouter.RoutingMode.DUAL; + break; + case "fallback": + routingMode = OutputRouter.RoutingMode.FALLBACK; + break; + case "streamload": + default: + routingMode = OutputRouter.RoutingMode.SINGLE; + break; + } + + this.outputRouter = new OutputRouter(routingMode); + + // Initialize handlers based on mode + try { + if ("kafka".equalsIgnoreCase(outputMode)) { + // Kafka only + boolean kafkaEnabled = Boolean.parseBoolean( + conf.properties.getOrDefault("kafka.enabled", "false")); + if (kafkaEnabled) { + KafkaOutputHandler kafkaHandler = new KafkaOutputHandler(); + kafkaHandler.init(conf.properties); + outputRouter.addHandler(kafkaHandler); + LOG.info("Initialized Kafka output handler"); + } else { + throw new PluginException("Kafka output mode selected but kafka.enabled=false"); + } + } else if ("dual".equalsIgnoreCase(outputMode)) { + // Both Stream Load and Kafka + boolean kafkaEnabled = Boolean.parseBoolean( + conf.properties.getOrDefault("kafka.enabled", "false")); + + // Add Stream Load handler + StreamLoadOutputHandler streamHandler = new StreamLoadOutputHandler(); + streamHandler.init(conf.properties); + outputRouter.addHandler(streamHandler); + LOG.info("Initialized Stream Load output handler"); + + // Add Kafka handler if enabled + if (kafkaEnabled) { + KafkaOutputHandler kafkaHandler = new KafkaOutputHandler(); + kafkaHandler.init(conf.properties); + outputRouter.addHandler(kafkaHandler); + LOG.info("Initialized Kafka output handler"); + } + } else if ("fallback".equals(outputMode.toLowerCase())) { + // Kafka primary, Stream Load fallback + String primaryOutput = conf.properties.getOrDefault("primary_output", "kafka"); + String secondaryOutput = conf.properties.getOrDefault("secondary_output", "streamload"); + + // Add primary handler + if ("kafka".equals(primaryOutput)) { + boolean kafkaEnabled = Boolean.parseBoolean( + conf.properties.getOrDefault("kafka.enabled", "false")); + if (kafkaEnabled) { + KafkaOutputHandler kafkaHandler = new KafkaOutputHandler(); + kafkaHandler.init(conf.properties); + outputRouter.addHandler(kafkaHandler); + LOG.info("Initialized Kafka as primary output handler"); + } + } + + // Add secondary handler + if ("streamload".equals(secondaryOutput)) { + StreamLoadOutputHandler streamHandler = new StreamLoadOutputHandler(); + streamHandler.init(conf.properties); + outputRouter.addHandler(streamHandler); + LOG.info("Initialized Stream Load as fallback output handler"); + } + } else { + // Default: Stream Load only + StreamLoadOutputHandler streamHandler = new StreamLoadOutputHandler(); + streamHandler.init(conf.properties); + outputRouter.addHandler(streamHandler); + LOG.info("Initialized Stream Load output handler (default)"); + } + } catch (Exception e) { + throw new PluginException("Failed to initialize output handlers: " + 
e.getMessage(), e); + } + + LOG.info("Output router initialized with mode: {}", outputMode); + } + private void loadConfig(PluginContext ctx, Map pluginInfoProperties) throws PluginException { Path pluginPath = FileSystems.getDefault().getPath(ctx.getPluginPath()); if (!Files.exists(pluginPath)) { @@ -137,11 +234,16 @@ public void close() throws IOException { isClosed = true; if (loadThread != null) { try { - loadThread.join(60000); + LOG.info("waiting for the audit loader thread to complete execution"); + loadThread.join(conf.uninstallTimeout); } catch (InterruptedException e) { LOG.debug("encounter exception when closing the audit loader", e); } } + if (outputRouter != null) { + outputRouter.close(); + } + LOG.info("AuditLoader plugin is closed"); } public boolean eventFilter(AuditEvent.EventType type) { @@ -161,47 +263,8 @@ public void exec(AuditEvent event) { } private void assembleAudit(AuditEvent event) { - String queryType = getQueryType(event); - int isQuery = event.isQuery ? 1 : 0; - // Compute digest for all queries - if (conf.enableComputeAllQueryDigest && (event.digest == null || StringUtils.isBlank(event.digest))) { - event.digest = computeStatementDigest(event.stmt); - LOG.debug("compute stmt digest, queryId: {} digest: {}", event.queryId, event.digest); - } - String candidateMvsVal = candidateMvsExists ? event.candidateMvs : ""; - String hitMVsVal = hitMVsExists ? event.hitMVs : ""; - String content = "{\"queryId\":\"" + getQueryId(queryType, event) + "\"," + - "\"timestamp\":\"" + longToTimeString(event.timestamp) + "\"," + - "\"queryType\":\"" + queryType + "\"," + - "\"clientIp\":\"" + event.clientIp + "\"," + - "\"user\":\"" + event.user + "\"," + - "\"authorizedUser\":\"" + event.authorizedUser + "\"," + - "\"resourceGroup\":\"" + event.resourceGroup + "\"," + - "\"catalog\":\"" + event.catalog + "\"," + - "\"db\":\"" + event.db + "\"," + - "\"state\":\"" + event.state + "\"," + - "\"errorCode\":\"" + event.errorCode + "\"," + - "\"queryTime\":" + event.queryTime + "," + - "\"scanBytes\":" + event.scanBytes + "," + - "\"scanRows\":" + event.scanRows + "," + - "\"returnRows\":" + event.returnRows + "," + - "\"cpuCostNs\":" + event.cpuCostNs + "," + - "\"memCostBytes\":" + event.memCostBytes + "," + - "\"stmtId\":" + event.stmtId + "," + - "\"isQuery\":" + isQuery + "," + - "\"feIp\":\"" + event.feIp + "\"," + - "\"stmt\":\"" + truncateByBytes(event.stmt) + "\"," + - "\"digest\":\"" + event.digest + "\"," + - "\"planCpuCosts\":" + event.planCpuCosts + "," + - "\"planMemCosts\":" + event.planMemCosts + "," + - "\"pendingTimeMs\":" + event.pendingTimeMs + "," + - "\"candidateMVs\":\"" + candidateMvsVal + "\"," + - "\"hitMvs\":\"" + hitMVsVal + "\"," + - "\"warehouse\":\"" + event.warehouse + "\"}"; - if (auditBuffer.length() > 0) { - auditBuffer.append(","); - } - auditBuffer.append(content); + // Add event to batch for output router processing + eventBatch.add(event); } private String getQueryId(String prefix, AuditEvent event) { @@ -223,24 +286,6 @@ private String getQueryType(AuditEvent event) { } } - private String computeStatementDigest(String stmt) { - List stmts = SqlParser.parse(stmt, 32); - StatementBase queryStmt = stmts.get(stmts.size() - 1); - - if (queryStmt == null) { - return ""; - } - String digest = SqlDigestBuilder.build(queryStmt); - try { - MessageDigest md = MessageDigest.getInstance("MD5"); - md.reset(); - md.update(digest.getBytes()); - return Hex.encodeHexString(md.digest()); - } catch (NoSuchAlgorithmException | NullPointerException e) { - return 
""; - } - } - private String truncateByBytes(String str) { int maxLen = Math.min(conf.maxStmtLength, str.getBytes().length); if (maxLen >= str.getBytes().length) { @@ -257,24 +302,24 @@ private String truncateByBytes(String str) { return new String(charBuffer.array(), 0, charBuffer.position()); } - private void loadIfNecessary(StarrocksStreamLoader loader) { - if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) { + private void loadIfNecessary() { + if (eventBatch.size() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) { return; } - if (auditBuffer.length() == 0) { + if (eventBatch.isEmpty()) { return; } lastLoadTime = System.currentTimeMillis(); // begin to load try { - StarrocksStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer); - LOG.debug("audit loader response: {}", response); + outputRouter.route(new ArrayList<>(eventBatch)); + LOG.debug("audit loader batch sent successfully"); } catch (Exception e) { - LOG.error("encounter exception when putting current audit batch, discard current batch", e); + LOG.error("encounter exception when routing current audit batch, discard current batch", e); } finally { - // make a new string builder to receive following events. - this.auditBuffer = new StringBuilder(); + // clear the batch for next round + eventBatch.clear(); } } @@ -307,11 +352,9 @@ public static class AuditLoaderConf { public static final String MAX_STMT_LENGTH = "max_stmt_length"; public static final String QE_SLOW_LOG_MS = "qe_slow_log_ms"; public static final String MAX_QUEUE_SIZE = "max_queue_size"; - public static final String ENABLE_COMPUTE_ALL_QUERY_DIGEST = "enable_compute_all_query_digest"; - public static final String CONNECT_TIMEOUT = "connect_timeout"; - public static final String READ_TIMEOUT = "read_timeout"; - public static final String STREAM_LOAD_FILTER = "filter"; + public static final String PROP_SECRET_KEY = "secret_key"; + public static final String PROP_UNINSTALL_TIMEOUT = "uninstall_timeout"; public long maxBatchSize = 50 * 1024 * 1024; public long maxBatchIntervalSec = 60; @@ -325,17 +368,13 @@ public static class AuditLoaderConf { public int maxStmtLength = 1048576; public int qeSlowLogMs = 5000; public int maxQueueSize = 1000; - - public boolean enableComputeAllQueryDigest = false; - - public int connectTimeout = 1000; - public int readTimeout = 1000; public String streamLoadFilter = ""; - - public static final String PROP_SECRET_KEY = "secret_key"; public String secretKey = ""; + public long uninstallTimeout = 5; + public Map properties; public void init(Map properties) throws PluginException { + this.properties = properties; try { if (properties.containsKey(PROP_MAX_BATCH_SIZE)) { maxBatchSize = Long.parseLong(properties.get(PROP_MAX_BATCH_SIZE)); @@ -373,17 +412,11 @@ public void init(Map properties) throws PluginException { if (properties.containsKey(MAX_QUEUE_SIZE)) { maxQueueSize = Integer.parseInt(properties.get(MAX_QUEUE_SIZE)); } - if (properties.containsKey(ENABLE_COMPUTE_ALL_QUERY_DIGEST)) { - enableComputeAllQueryDigest = Boolean.parseBoolean(properties.get(ENABLE_COMPUTE_ALL_QUERY_DIGEST)); - } if (properties.containsKey(STREAM_LOAD_FILTER)) { streamLoadFilter = properties.get(STREAM_LOAD_FILTER); } - if (properties.containsKey(CONNECT_TIMEOUT)) { - connectTimeout = Integer.parseInt(properties.get(CONNECT_TIMEOUT)); - } - if (properties.containsKey(READ_TIMEOUT)) { - readTimeout = 
Integer.parseInt(properties.get(READ_TIMEOUT)); + if (properties.containsKey(PROP_UNINSTALL_TIMEOUT)) { + uninstallTimeout = Long.parseLong(properties.get(PROP_UNINSTALL_TIMEOUT)); } } catch (Exception e) { throw new PluginException(e.getMessage()); @@ -392,10 +425,7 @@ public void init(Map properties) throws PluginException { } private class LoadWorker implements Runnable { - private StarrocksStreamLoader loader; - - public LoadWorker(StarrocksStreamLoader loader) { - this.loader = loader; + public LoadWorker() { } public void run() { @@ -405,13 +435,14 @@ public void run() { if (event != null) { assembleAudit(event); } - loadIfNecessary(loader); + loadIfNecessary(); } catch (InterruptedException ie) { LOG.debug("encounter exception when loading current audit batch", ie); } catch (Exception e) { LOG.error("run audit logger error:", e); } } + LOG.info("audit loader thread run ends"); } } diff --git a/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java b/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java index 8f2a62c..ff4263a 100644 --- a/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java +++ b/src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java @@ -44,10 +44,6 @@ public class StarrocksStreamLoader { private String loadUrlStr; private String authEncoding; private String feIdentity; - - private int connectTimeout; - private int readTimeout; - private String streamLoadFilter; public StarrocksStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) { @@ -69,10 +65,6 @@ public StarrocksStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) { this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); // currently, FE identity is FE's IP, so we replace the "." 
in IP to make it suitable for label this.feIdentity = conf.feIdentity.replaceAll("\\.", "_"); - - this.connectTimeout = conf.connectTimeout; - this.readTimeout = conf.readTimeout; - this.streamLoadFilter = conf.streamLoadFilter; } @@ -84,11 +76,12 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx conn.setRequestProperty("Authorization", "Basic " + authEncoding); conn.addRequestProperty("Expect", "100-continue"); conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); - conn.addRequestProperty("format", "json"); - conn.addRequestProperty("strip_outer_array", "true"); conn.addRequestProperty("label", label); conn.addRequestProperty("max_filter_ratio", "1.0"); + conn.addRequestProperty("column_separator", String.valueOf(AuditLoaderPlugin.COLUMN_SEPARATOR)); + conn.addRequestProperty("row_delimiter", String.valueOf(AuditLoaderPlugin.ROW_DELIMITER)); + conn.addRequestProperty("columns", "queryId,timestamp,queryType,clientIp,user,authorizedUser,resourceGroup,catalog,db,state,errorCode,queryTime,scanBytes,scanRows,returnRows,cpuCostNs,memCostBytes,stmtId,isQuery,feIp,stmt,digest,planCpuCosts,planMemCosts,pendingTimeMs,candidateMVs,hitMvs,warehouse"); if(!StringUtils.isBlank(this.streamLoadFilter)) { conn.addRequestProperty("where", streamLoadFilter); @@ -97,9 +90,6 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx conn.setDoOutput(true); conn.setDoInput(true); - conn.setConnectTimeout(connectTimeout); - conn.setReadTimeout(readTimeout); - return conn; } @@ -109,8 +99,9 @@ private String toCurl(HttpURLConnection conn) { sb.append("-H \"").append("Authorization\":").append("\"Basic " + authEncoding).append("\" \\\n "); sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n "); sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n "); - sb.append("-H \"").append("format\":").append("\"json \\\n "); - sb.append("-H \"").append("strip_outer_array\":").append("\"true \\\n "); + sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n "); + sb.append("-H \"").append("column_separator\":").append(AuditLoaderPlugin.COLUMN_SEPARATOR).append(" \\\n "); + sb.append("-H \"").append("row_delimiter\":").append(AuditLoaderPlugin.ROW_DELIMITER).append(" \\\n "); if(!StringUtils.isBlank(this.streamLoadFilter)) { sb.append("-H \"").append("where\":").append(streamLoadFilter).append(" \\\n "); } @@ -170,8 +161,7 @@ public LoadResponse loadBatch(StringBuilder sb) { beConn = status == TEMPORARY_REDIRECT_CODE ? getConnection(location, label) : getConnection(loadUrlStr, label); // send data to be BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream()); - String content = "[" + sb.toString() + "]"; - bos.write(content.getBytes()); + bos.write(sb.toString().getBytes()); bos.close(); // get respond diff --git a/src/main/java/com/starrocks/plugin/audit/kafka/AuditEventSerializer.java b/src/main/java/com/starrocks/plugin/audit/kafka/AuditEventSerializer.java new file mode 100644 index 0000000..b050b29 --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/kafka/AuditEventSerializer.java @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.plugin.audit.kafka; + +import com.starrocks.plugin.AuditEvent; + +import java.lang.reflect.Field; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.UUID; + +/** + * Serializer for AuditEvent to JSON string + */ +public class AuditEventSerializer { + private static final SimpleDateFormat DATETIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + // Check if new fields exist (for backward compatibility) + private final boolean candidateMvsExists; + private final boolean hitMvsExists; + + public AuditEventSerializer() { + this.candidateMvsExists = hasField(AuditEvent.class, "candidateMvs"); + this.hitMvsExists = hasField(AuditEvent.class, "hitMVs"); + } + + /** + * Serialize AuditEvent to JSON string + */ + public String serialize(AuditEvent event) { + String queryType = determineQueryType(event); + String queryId = getQueryId(queryType, event); + int isQuery = event.isQuery ? 1 : 0; + + String candidateMvsVal = candidateMvsExists ? getFieldValue(event, "candidateMvs", "") : ""; + String hitMvsVal = hitMvsExists ? getFieldValue(event, "hitMVs", "") : ""; + + return "{\"queryId\":\"" + escapeJson(queryId) + "\"," + + "\"timestamp\":\"" + formatTimestamp(event.timestamp) + "\"," + + "\"queryType\":\"" + queryType + "\"," + + "\"clientIp\":\"" + escapeJson(event.clientIp) + "\"," + + "\"user\":\"" + escapeJson(event.user) + "\"," + + "\"authorizedUser\":\"" + escapeJson(event.authorizedUser) + "\"," + + "\"resourceGroup\":\"" + escapeJson(event.resourceGroup) + "\"," + + "\"catalog\":\"" + escapeJson(event.catalog) + "\"," + + "\"db\":\"" + escapeJson(event.db) + "\"," + + "\"state\":\"" + escapeJson(event.state) + "\"," + + "\"errorCode\":\"" + escapeJson(event.errorCode) + "\"," + + "\"queryTime\":" + event.queryTime + "," + + "\"scanBytes\":" + event.scanBytes + "," + + "\"scanRows\":" + event.scanRows + "," + + "\"returnRows\":" + event.returnRows + "," + + "\"cpuCostNs\":" + event.cpuCostNs + "," + + "\"memCostBytes\":" + event.memCostBytes + "," + + "\"stmtId\":" + event.stmtId + "," + + "\"isQuery\":" + isQuery + "," + + "\"feIp\":\"" + escapeJson(event.feIp) + "\"," + + "\"stmt\":\"" + escapeJson(event.stmt) + "\"," + + "\"digest\":\"" + escapeJson(event.digest) + "\"," + + "\"planCpuCosts\":" + event.planCpuCosts + "," + + "\"planMemCosts\":" + event.planMemCosts + "," + + "\"pendingTimeMs\":" + event.pendingTimeMs + "," + + "\"candidateMVs\":\"" + escapeJson(candidateMvsVal) + "\"," + + "\"hitMvs\":\"" + escapeJson(hitMvsVal) + "\"," + + "\"warehouse\":\"" + escapeJson(event.warehouse) + "\"}"; + } + + private String getQueryId(String prefix, AuditEvent event) { + if (event.queryId == null || event.queryId.isEmpty()) { + return prefix + "-" + UUID.randomUUID(); + } + return event.queryId; + } + + private String determineQueryType(AuditEvent event) { + try { + switch (event.type) { + case CONNECTION: + return "connection"; + case DISCONNECTION: + 
return "disconnection"; + default: + return "query"; + } + } catch (Exception e) { + return "query"; + } + } + + private String formatTimestamp(long timestamp) { + if (timestamp <= 0L) { + return DATETIME_FORMAT.format(new Date()); + } + return DATETIME_FORMAT.format(new Date(timestamp)); + } + + /** + * Escape special characters for JSON + */ + private String escapeJson(String str) { + if (str == null) { + return ""; + } + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < str.length(); i++) { + char ch = str.charAt(i); + switch (ch) { + case '"': + sb.append("\\\""); + break; + case '\\': + sb.append("\\\\"); + break; + case '\b': + sb.append("\\b"); + break; + case '\f': + sb.append("\\f"); + break; + case '\n': + sb.append("\\n"); + break; + case '\r': + sb.append("\\r"); + break; + case '\t': + sb.append("\\t"); + break; + default: + if (ch < ' ') { + String ss = Integer.toHexString(ch); + sb.append("\\u"); + for (int k = 0; k < 4 - ss.length(); k++) { + sb.append('0'); + } + sb.append(ss.toUpperCase()); + } else { + sb.append(ch); + } + } + } + return sb.toString(); + } + + /** + * Check if a field exists in the class + */ + private boolean hasField(Class clazz, String fieldName) { + try { + Field[] fields = clazz.getDeclaredFields(); + for (Field field : fields) { + if (field.getName().equals(fieldName)) { + return true; + } + } + return false; + } catch (Exception e) { + return false; + } + } + + /** + * Get field value using reflection + */ + private String getFieldValue(AuditEvent event, String fieldName, String defaultValue) { + try { + Field field = AuditEvent.class.getDeclaredField(fieldName); + field.setAccessible(true); + Object value = field.get(event); + return value != null ? value.toString() : defaultValue; + } catch (Exception e) { + return defaultValue; + } + } +} diff --git a/src/main/java/com/starrocks/plugin/audit/kafka/KafkaConfig.java b/src/main/java/com/starrocks/plugin/audit/kafka/KafkaConfig.java new file mode 100644 index 0000000..4d3403c --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/kafka/KafkaConfig.java @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.starrocks.plugin.audit.kafka; + +import java.util.Map; + +/** + * Configuration for Kafka producer + */ +public class KafkaConfig { + // Connection + private String bootstrapServers; + private String topic; + + // Performance + private int batchSize = 16384; // 16KB + private int lingerMs = 10; + private String compressionType = "snappy"; + private long bufferMemory = 33554432; // 32MB + + // Reliability + private String acks = "1"; + private int retries = 3; + private int maxInFlightRequests = 5; + + // Timeouts + private int requestTimeout = 30000; + private int deliveryTimeout = 120000; + + // Security (optional) + private String saslMechanism; + private String saslJaasConfig; + private String securityProtocol; + private String sslTruststoreLocation; + private String sslTruststorePassword; + + public KafkaConfig(Map configMap) { + loadFromMap(configMap); + } + + private void loadFromMap(Map config) { + // Connection + this.bootstrapServers = config.getOrDefault("kafka.bootstrap.servers", "localhost:9092"); + this.topic = config.getOrDefault("kafka.topic", "starrocks_audit_logs"); + + // Performance + if (config.containsKey("kafka.batch.size")) { + this.batchSize = Integer.parseInt(config.get("kafka.batch.size")); + } + if (config.containsKey("kafka.linger.ms")) { + this.lingerMs = Integer.parseInt(config.get("kafka.linger.ms")); + } + this.compressionType = config.getOrDefault("kafka.compression.type", "snappy"); + if (config.containsKey("kafka.buffer.memory")) { + this.bufferMemory = Long.parseLong(config.get("kafka.buffer.memory")); + } + + // Reliability + this.acks = config.getOrDefault("kafka.acks", "1"); + if (config.containsKey("kafka.retries")) { + this.retries = Integer.parseInt(config.get("kafka.retries")); + } + if (config.containsKey("kafka.max.in.flight.requests.per.connection")) { + this.maxInFlightRequests = Integer.parseInt(config.get("kafka.max.in.flight.requests.per.connection")); + } + + // Timeouts + if (config.containsKey("kafka.request.timeout.ms")) { + this.requestTimeout = Integer.parseInt(config.get("kafka.request.timeout.ms")); + } + if (config.containsKey("kafka.delivery.timeout.ms")) { + this.deliveryTimeout = Integer.parseInt(config.get("kafka.delivery.timeout.ms")); + } + + // Security + this.saslMechanism = config.get("kafka.sasl.mechanism"); + this.saslJaasConfig = config.get("kafka.sasl.jaas.config"); + this.securityProtocol = config.get("kafka.security.protocol"); + this.sslTruststoreLocation = config.get("kafka.ssl.truststore.location"); + this.sslTruststorePassword = config.get("kafka.ssl.truststore.password"); + } + + // Getters + public String getBootstrapServers() { + return bootstrapServers; + } + + public String getTopic() { + return topic; + } + + public int getBatchSize() { + return batchSize; + } + + public int getLingerMs() { + return lingerMs; + } + + public String getCompressionType() { + return compressionType; + } + + public long getBufferMemory() { + return bufferMemory; + } + + public String getAcks() { + return acks; + } + + public int getRetries() { + return retries; + } + + public int getMaxInFlightRequests() { + return maxInFlightRequests; + } + + public int getRequestTimeout() { + return requestTimeout; + } + + public int getDeliveryTimeout() { + return deliveryTimeout; + } + + public String getSaslMechanism() { + return saslMechanism; + } + + public String getSaslJaasConfig() { + return saslJaasConfig; + } + + public String getSecurityProtocol() { + return securityProtocol; + } + + public String 
getSslTruststoreLocation() { + return sslTruststoreLocation; + } + + public String getSslTruststorePassword() { + return sslTruststorePassword; + } + + @Override + public String toString() { + return "KafkaConfig{" + + "bootstrapServers='" + bootstrapServers + '\'' + + ", topic='" + topic + '\'' + + ", batchSize=" + batchSize + + ", lingerMs=" + lingerMs + + ", compressionType='" + compressionType + '\'' + + ", bufferMemory=" + bufferMemory + + ", acks='" + acks + '\'' + + ", retries=" + retries + + ", maxInFlightRequests=" + maxInFlightRequests + + ", requestTimeout=" + requestTimeout + + ", deliveryTimeout=" + deliveryTimeout + + '}'; + } +} diff --git a/src/main/java/com/starrocks/plugin/audit/kafka/KafkaMetrics.java b/src/main/java/com/starrocks/plugin/audit/kafka/KafkaMetrics.java new file mode 100644 index 0000000..158c907 --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/kafka/KafkaMetrics.java @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.plugin.audit.kafka; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Metrics collector for Kafka producer + */ +public class KafkaMetrics { + // Counters + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failureCount = new AtomicLong(0); + private final AtomicLong totalMessages = new AtomicLong(0); + + // Latency statistics + private final AtomicLong totalLatency = new AtomicLong(0); + private final AtomicLong maxLatency = new AtomicLong(0); + private final AtomicLong minLatency = new AtomicLong(Long.MAX_VALUE); + + // Batch statistics + private final AtomicLong totalBatches = new AtomicLong(0); + private final AtomicLong totalBytes = new AtomicLong(0); + + public void recordSuccess() { + successCount.incrementAndGet(); + totalMessages.incrementAndGet(); + } + + public void recordFailure() { + failureCount.incrementAndGet(); + totalMessages.incrementAndGet(); + } + + public void recordLatency(long latencyMs) { + totalLatency.addAndGet(latencyMs); + + // Update max + long current = maxLatency.get(); + while (latencyMs > current && !maxLatency.compareAndSet(current, latencyMs)) { + current = maxLatency.get(); + } + + // Update min + current = minLatency.get(); + while (latencyMs < current && !minLatency.compareAndSet(current, latencyMs)) { + current = minLatency.get(); + } + } + + public void recordBatch(int messageCount, int bytes) { + totalBatches.incrementAndGet(); + totalBytes.addAndGet(bytes); + } + + public MetricsSnapshot getSnapshot() { + long success = successCount.get(); + long failure = failureCount.get(); + long total = totalMessages.get(); + + double successRate = total > 0 ? (double) success / total * 100 : 0; + double avgLatency = success > 0 ? 
(double) totalLatency.get() / success : 0; + + long min = minLatency.get(); + if (min == Long.MAX_VALUE) { + min = 0; + } + + return new MetricsSnapshot( + success, failure, total, successRate, + avgLatency, min, maxLatency.get(), + totalBatches.get(), totalBytes.get() + ); + } + + public void reset() { + successCount.set(0); + failureCount.set(0); + totalMessages.set(0); + totalLatency.set(0); + maxLatency.set(0); + minLatency.set(Long.MAX_VALUE); + totalBatches.set(0); + totalBytes.set(0); + } + + /** + * Metrics snapshot + */ + public static class MetricsSnapshot { + public final long successCount; + public final long failureCount; + public final long totalCount; + public final double successRate; + public final double avgLatencyMs; + public final long minLatencyMs; + public final long maxLatencyMs; + public final long totalBatches; + public final long totalBytes; + + public MetricsSnapshot(long success, long failure, long total, + double successRate, double avgLatency, + long minLatency, long maxLatency, + long batches, long bytes) { + this.successCount = success; + this.failureCount = failure; + this.totalCount = total; + this.successRate = successRate; + this.avgLatencyMs = avgLatency; + this.minLatencyMs = minLatency; + this.maxLatencyMs = maxLatency; + this.totalBatches = batches; + this.totalBytes = bytes; + } + + @Override + public String toString() { + return String.format( + "Metrics[success=%d, failure=%d, rate=%.2f%%, " + + "latency(avg/min/max)=%.2f/%d/%d ms, " + + "batches=%d, bytes=%d]", + successCount, failureCount, successRate, + avgLatencyMs, minLatencyMs, maxLatencyMs, + totalBatches, totalBytes + ); + } + } +} diff --git a/src/main/java/com/starrocks/plugin/audit/kafka/KafkaProducerManager.java b/src/main/java/com/starrocks/plugin/audit/kafka/KafkaProducerManager.java new file mode 100644 index 0000000..8b60efa --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/kafka/KafkaProducerManager.java @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.starrocks.plugin.audit.kafka; + +import com.starrocks.plugin.AuditEvent; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manager for Kafka producer lifecycle and message sending + */ +public class KafkaProducerManager { + private static final Logger LOG = LogManager.getLogger(KafkaProducerManager.class); + + private KafkaProducer producer; + private AuditEventSerializer serializer; + private KafkaConfig config; + private KafkaMetrics metrics; + + private final AtomicLong successCount = new AtomicLong(0); + private final AtomicLong failureCount = new AtomicLong(0); + + public KafkaProducerManager(Map configMap) { + this.config = new KafkaConfig(configMap); + this.serializer = new AuditEventSerializer(); + this.metrics = new KafkaMetrics(); + } + + /** + * Initialize Kafka producer + */ + public void init() { + Properties props = new Properties(); + + // Required settings + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + // Performance optimization + props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize()); + props.put(ProducerConfig.LINGER_MS_CONFIG, config.getLingerMs()); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getCompressionType()); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory()); + + // Reliability settings + props.put(ProducerConfig.ACKS_CONFIG, config.getAcks()); + props.put(ProducerConfig.RETRIES_CONFIG, config.getRetries()); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, config.getMaxInFlightRequests()); + + // Timeout settings + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.getRequestTimeout()); + props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.getDeliveryTimeout()); + + // Security settings (if configured) + if (config.getSaslMechanism() != null) { + props.put("sasl.mechanism", config.getSaslMechanism()); + } + if (config.getSaslJaasConfig() != null) { + props.put("sasl.jaas.config", config.getSaslJaasConfig()); + } + if (config.getSecurityProtocol() != null) { + props.put("security.protocol", config.getSecurityProtocol()); + } + if (config.getSslTruststoreLocation() != null) { + props.put("ssl.truststore.location", config.getSslTruststoreLocation()); + } + if (config.getSslTruststorePassword() != null) { + props.put("ssl.truststore.password", config.getSslTruststorePassword()); + } + + this.producer = new KafkaProducer<>(props); + + LOG.info("Kafka Producer initialized with config: {}", config); + } + + /** + * Send event asynchronously (recommended for performance) + */ + public void sendAsync(String topic, AuditEvent event) { + try { + String key = event.queryId; // Partitioning key + String value = serializer.serialize(event); + + ProducerRecord record = new ProducerRecord<>(topic, key, value); + + long startTime = System.currentTimeMillis(); 
+ + producer.send(record, (metadata, exception) -> { + long latency = System.currentTimeMillis() - startTime; + metrics.recordLatency(latency); + + if (exception != null) { + failureCount.incrementAndGet(); + metrics.recordFailure(); + LOG.error("Failed to send audit event: queryId={}", event.queryId, exception); + } else { + successCount.incrementAndGet(); + metrics.recordSuccess(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sent event to partition {} offset {}", + metadata.partition(), metadata.offset()); + } + } + }); + + } catch (Exception e) { + failureCount.incrementAndGet(); + metrics.recordFailure(); + LOG.error("Error sending audit event", e); + } + } + + /** + * Send event synchronously (for reliability priority) + */ + public RecordMetadata sendSync(String topic, AuditEvent event) throws Exception { + String key = event.queryId; + String value = serializer.serialize(event); + + ProducerRecord record = new ProducerRecord<>(topic, key, value); + + long startTime = System.currentTimeMillis(); + + try { + RecordMetadata metadata = producer.send(record).get( + config.getRequestTimeout(), TimeUnit.MILLISECONDS + ); + + long latency = System.currentTimeMillis() - startTime; + metrics.recordLatency(latency); + successCount.incrementAndGet(); + metrics.recordSuccess(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Sync sent event to partition {} offset {}", + metadata.partition(), metadata.offset()); + } + + return metadata; + + } catch (Exception e) { + failureCount.incrementAndGet(); + metrics.recordFailure(); + throw e; + } + } + + /** + * Flush pending messages + */ + public void flush() { + if (producer != null) { + producer.flush(); + } + } + + /** + * Close producer and cleanup + */ + public void close() { + if (producer != null) { + producer.flush(); + producer.close(Duration.ofMinutes(10)); + } + + LOG.info("Kafka Producer closed. Success: {}, Failures: {}", + successCount.get(), failureCount.get()); + } + + /** + * Check if producer is healthy + */ + public boolean isHealthy() { + if (producer == null) { + return false; + } + + // Check failure rate (unhealthy if > 10%) + long total = successCount.get() + failureCount.get(); + if (total > 100) { + double failureRate = (double) failureCount.get() / total; + return failureRate < 0.1; + } + + return true; + } + + /** + * Get metrics + */ + public KafkaMetrics getMetrics() { + return metrics; + } + + /** + * Get config + */ + public KafkaConfig getConfig() { + return config; + } +} diff --git a/src/main/java/com/starrocks/plugin/audit/output/KafkaOutputHandler.java b/src/main/java/com/starrocks/plugin/audit/output/KafkaOutputHandler.java new file mode 100644 index 0000000..fba87a9 --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/output/KafkaOutputHandler.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.plugin.audit.output; + +import com.starrocks.plugin.AuditEvent; +import com.starrocks.plugin.audit.kafka.KafkaProducerManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +/** + * Kafka output handler + * Sends audit logs to Apache Kafka + */ +public class KafkaOutputHandler implements OutputHandler { + private static final Logger LOG = LogManager.getLogger(KafkaOutputHandler.class); + + private KafkaProducerManager producerManager; + private String topic; + private boolean asyncMode; + + @Override + public void init(Map config) throws Exception { + this.topic = config.getOrDefault("kafka.topic", "starrocks_audit_logs"); + this.asyncMode = Boolean.parseBoolean( + config.getOrDefault("kafka.async_mode", "true") + ); + + this.producerManager = new KafkaProducerManager(config); + this.producerManager.init(); + + LOG.info("KafkaOutputHandler initialized: topic={}, async={}", topic, asyncMode); + } + + @Override + public void send(List events) throws Exception { + if (events == null || events.isEmpty()) { + return; + } + + for (AuditEvent event : events) { + if (asyncMode) { + producerManager.sendAsync(topic, event); + } else { + producerManager.sendSync(topic, event); + } + } + + // Flush if not async to ensure delivery + if (!asyncMode) { + producerManager.flush(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Sent {} events to Kafka topic: {}", events.size(), topic); + } + } + + @Override + public void close() throws Exception { + if (producerManager != null) { + producerManager.close(); + } + LOG.info("KafkaOutputHandler closed"); + } + + @Override + public String getName() { + return "KafkaOutputHandler"; + } + + @Override + public boolean isHealthy() { + return producerManager != null && producerManager.isHealthy(); + } + + /** + * Get producer manager for metrics access + */ + public KafkaProducerManager getProducerManager() { + return producerManager; + } +} diff --git a/src/main/java/com/starrocks/plugin/audit/output/OutputHandler.java b/src/main/java/com/starrocks/plugin/audit/output/OutputHandler.java new file mode 100644 index 0000000..1c89c72 --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/output/OutputHandler.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.plugin.audit.output; + +import com.starrocks.plugin.AuditEvent; + +import java.util.List; +import java.util.Map; + +/** + * Interface for audit log output handlers. + * Abstracts different output destinations (StarRocks, Kafka, etc.) 
diff --git a/src/main/java/com/starrocks/plugin/audit/output/OutputHandler.java b/src/main/java/com/starrocks/plugin/audit/output/OutputHandler.java
new file mode 100644
index 0000000..1c89c72
--- /dev/null
+++ b/src/main/java/com/starrocks/plugin/audit/output/OutputHandler.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.starrocks.plugin.audit.output;
+
+import com.starrocks.plugin.AuditEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for audit log output handlers.
+ * Abstracts different output destinations (StarRocks, Kafka, etc.)
+ */
+public interface OutputHandler {
+
+    /**
+     * Initialize the handler with configuration
+     * @param config Configuration map
+     * @throws Exception if initialization fails
+     */
+    void init(Map<String, String> config) throws Exception;
+
+    /**
+     * Send a batch of events
+     * @param events List of events to send
+     * @throws Exception if sending fails
+     */
+    void send(List<AuditEvent> events) throws Exception;
+
+    /**
+     * Close resources and cleanup
+     * @throws Exception if closing fails
+     */
+    void close() throws Exception;
+
+    /**
+     * Get handler name
+     * @return Handler name
+     */
+    String getName();
+
+    /**
+     * Check if handler is healthy
+     * @return true if healthy, false otherwise
+     */
+    boolean isHealthy();
+}
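To illustrate the contract, a trivial handler that only logs batch sizes could be implemented as follows. This class is purely an example and is not part of the patch:

```java
package com.starrocks.plugin.audit.output;

import com.starrocks.plugin.AuditEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

// Hypothetical example handler: counts events and logs them, nothing more.
public class LoggingOutputHandler implements OutputHandler {
    private static final Logger LOG = LogManager.getLogger(LoggingOutputHandler.class);

    @Override
    public void init(Map<String, String> config) {
        // No configuration needed for this example.
    }

    @Override
    public void send(List<AuditEvent> events) {
        LOG.info("Would deliver {} audit events", events == null ? 0 : events.size());
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    @Override
    public String getName() {
        return "LoggingOutputHandler";
    }

    @Override
    public boolean isHealthy() {
        return true;
    }
}
```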
diff --git a/src/main/java/com/starrocks/plugin/audit/output/StreamLoadOutputHandler.java b/src/main/java/com/starrocks/plugin/audit/output/StreamLoadOutputHandler.java
new file mode 100644
index 0000000..e1e13f4
--- /dev/null
+++ b/src/main/java/com/starrocks/plugin/audit/output/StreamLoadOutputHandler.java
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.starrocks.plugin.audit.output;
+
+import com.starrocks.plugin.AuditEvent;
+import com.starrocks.plugin.audit.AuditLoaderPlugin;
+import com.starrocks.plugin.audit.StarrocksStreamLoader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Stream Load output handler.
+ * Wraps the existing StarRocks Stream Load functionality.
+ */
+public class StreamLoadOutputHandler implements OutputHandler {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadOutputHandler.class);
+    // SimpleDateFormat is not thread-safe; this handler assumes a single loader thread.
+    private static final SimpleDateFormat DATETIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    private StarrocksStreamLoader streamLoader;
+    private StringBuilder auditBuffer;
+    private boolean candidateMvsExists;
+    private boolean hitMVsExists;
+    private int maxStmtLength;
+    private int qeSlowLogMs;
+
+    @Override
+    public void init(Map<String, String> config) throws Exception {
+        // Create AuditLoaderConf from config
+        AuditLoaderPlugin.AuditLoaderConf conf = new AuditLoaderPlugin.AuditLoaderConf();
+        conf.init(config);
+
+        this.streamLoader = new StarrocksStreamLoader(conf);
+        this.auditBuffer = new StringBuilder();
+        this.maxStmtLength = conf.maxStmtLength;
+        this.qeSlowLogMs = conf.qeSlowLogMs;
+
+        // Check if new fields exist
+        this.candidateMvsExists = hasField(AuditEvent.class, "candidateMvs");
+        this.hitMVsExists = hasField(AuditEvent.class, "hitMVs");
+
+        LOG.info("StreamLoadOutputHandler initialized");
+    }
+
+    @Override
+    public void send(List<AuditEvent> events) throws Exception {
+        if (events == null || events.isEmpty()) {
+            return;
+        }
+
+        // Assemble events into the JSON batch buffer
+        for (AuditEvent event : events) {
+            assembleAudit(event);
+        }
+
+        // Send batch
+        if (auditBuffer.length() > 0) {
+            try {
+                StarrocksStreamLoader.LoadResponse response = streamLoader.loadBatch(auditBuffer);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Stream load response: {}", response);
+                }
+            } finally {
+                // Clear buffer for next batch
+                auditBuffer = new StringBuilder();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("StreamLoadOutputHandler closed");
+    }
+
+    @Override
+    public String getName() {
+        return "StreamLoadOutputHandler";
+    }
+
+    @Override
+    public boolean isHealthy() {
+        return streamLoader != null;
+    }
+
+    /**
+     * Assemble audit event to JSON format (same logic as original plugin)
+     */
+    private void assembleAudit(AuditEvent event) {
+        String queryType = getQueryType(event);
+        int isQuery = event.isQuery ? 1 : 0;
+
+        String candidateMvsVal = candidateMvsExists ? getFieldValue(event, "candidateMvs", "") : "";
+        String hitMVsVal = hitMVsExists ? getFieldValue(event, "hitMVs", "") : "";
+
+        String content = "{\"queryId\":\"" + getQueryId(queryType, event) + "\","
+                + "\"timestamp\":\"" + longToTimeString(event.timestamp) + "\","
+                + "\"queryType\":\"" + queryType + "\","
+                + "\"clientIp\":\"" + escapeJson(event.clientIp) + "\","
+                + "\"user\":\"" + escapeJson(event.user) + "\","
+                + "\"authorizedUser\":\"" + escapeJson(event.authorizedUser) + "\","
+                + "\"resourceGroup\":\"" + escapeJson(event.resourceGroup) + "\","
+                + "\"catalog\":\"" + escapeJson(event.catalog) + "\","
+                + "\"db\":\"" + escapeJson(event.db) + "\","
+                + "\"state\":\"" + escapeJson(event.state) + "\","
+                + "\"errorCode\":\"" + escapeJson(event.errorCode) + "\","
+                + "\"queryTime\":" + event.queryTime + ","
+                + "\"scanBytes\":" + event.scanBytes + ","
+                + "\"scanRows\":" + event.scanRows + ","
+                + "\"returnRows\":" + event.returnRows + ","
+                + "\"cpuCostNs\":" + event.cpuCostNs + ","
+                + "\"memCostBytes\":" + event.memCostBytes + ","
+                + "\"stmtId\":" + event.stmtId + ","
+                + "\"isQuery\":" + isQuery + ","
+                + "\"feIp\":\"" + escapeJson(event.feIp) + "\","
+                + "\"stmt\":\"" + escapeJson(truncateByBytes(event.stmt)) + "\","
+                + "\"digest\":\"" + escapeJson(event.digest) + "\","
+                + "\"planCpuCosts\":" + event.planCpuCosts + ","
+                + "\"planMemCosts\":" + event.planMemCosts + ","
+                + "\"pendingTimeMs\":" + event.pendingTimeMs + ","
+                + "\"candidateMVs\":\"" + escapeJson(candidateMvsVal) + "\","
+                + "\"hitMvs\":\"" + escapeJson(hitMVsVal) + "\","
+                + "\"warehouse\":\"" + escapeJson(event.warehouse) + "\"}";
+
+        if (auditBuffer.length() > 0) {
+            auditBuffer.append(",");
+        }
+        auditBuffer.append(content);
+    }
+
+    private String getQueryId(String prefix, AuditEvent event) {
+        return (Objects.isNull(event.queryId) || event.queryId.isEmpty())
+                ? prefix + "-" + UUID.randomUUID() : event.queryId;
+    }
+
+    private String getQueryType(AuditEvent event) {
+        try {
+            switch (event.type) {
+                case CONNECTION:
+                    return "connection";
+                case DISCONNECTION:
+                    return "disconnection";
+                default:
+                    return (event.queryTime > qeSlowLogMs) ? "slow_query" : "query";
+            }
+        } catch (Exception e) {
+            return (event.queryTime > qeSlowLogMs) ? "slow_query" : "query";
+        }
+    }
+
+    private String truncateByBytes(String str) {
+        if (str == null) {
+            return "";
+        }
+        // Compare length in UTF-8 bytes, but truncate by characters;
+        // the original plugin applies a more precise byte-level cut.
+        if (str.getBytes(StandardCharsets.UTF_8).length <= maxStmtLength) {
+            return str;
+        }
+        return str.substring(0, Math.min(str.length(), maxStmtLength));
+    }
+
+    private String longToTimeString(long timeStamp) {
+        if (timeStamp <= 0L) {
+            return DATETIME_FORMAT.format(new Date());
+        }
+        return DATETIME_FORMAT.format(new Date(timeStamp));
+    }
+
+    private String escapeJson(String str) {
+        if (str == null) {
+            return "";
+        }
+        return str.replace("\\", "\\\\")
+                .replace("\"", "\\\"")
+                .replace("\n", "\\n")
+                .replace("\r", "\\r")
+                .replace("\t", "\\t");
+    }
+
+    private boolean hasField(Class<?> clazz, String fieldName) {
+        Field[] fields = clazz.getDeclaredFields();
+        for (Field field : fields) {
+            if (field.getName().equals(fieldName)) {
+                return true;
+            }
+        }
+        return false;
+    }
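+
+    // Note: candidateMvs / hitMVs are resolved reflectively (hasField above,
+    // getFieldValue below) so this handler also runs against starrocks-fe.jar
+    // builds whose AuditEvent does not declare these fields; on such builds
+    // the JSON simply carries empty strings for candidateMVs / hitMvs.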
+
+    private String getFieldValue(AuditEvent event, String fieldName, String defaultValue) {
+        try {
+            Field field = AuditEvent.class.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            Object value = field.get(event);
+            return value != null ? value.toString() : defaultValue;
+        } catch (Exception e) {
+            return defaultValue;
+        }
+    }
+}
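For reference, `assembleAudit()` emits one flat JSON object per event, and `send()` comma-joins them in the batch buffer (the surrounding array framing is presumably supplied by `StarrocksStreamLoader.loadBatch()`, which is defined elsewhere in the plugin). A single assembled record looks roughly like this; every value below is invented for illustration:

```json
{"queryId":"f9e3a2b1-0c4d-11ef-8000-000000000000","timestamp":"2025-01-01 12:00:00","queryType":"slow_query","clientIp":"127.0.0.1:50514","user":"root","authorizedUser":"'root'@'%'","resourceGroup":"default_wg","catalog":"default_catalog","db":"tpcds","state":"EOF","errorCode":"","queryTime":6532,"scanBytes":1048576,"scanRows":100000,"returnRows":10,"cpuCostNs":1200000,"memCostBytes":2097152,"stmtId":33,"isQuery":1,"feIp":"127.0.0.1","stmt":"SELECT 1","digest":"","planCpuCosts":0.0,"planMemCosts":0.0,"pendingTimeMs":0,"candidateMVs":"","hitMvs":"","warehouse":"default_warehouse"}
```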
diff --git a/src/main/java/com/starrocks/plugin/audit/routing/OutputRouter.java b/src/main/java/com/starrocks/plugin/audit/routing/OutputRouter.java
new file mode 100644
index 0000000..d83d5ca
--- /dev/null
+++ b/src/main/java/com/starrocks/plugin/audit/routing/OutputRouter.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.starrocks.plugin.audit.routing;
+
+import com.starrocks.plugin.AuditEvent;
+import com.starrocks.plugin.audit.output.OutputHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Router for distributing events to multiple output handlers
+ */
+public class OutputRouter {
+    private static final Logger LOG = LogManager.getLogger(OutputRouter.class);
+
+    private List<OutputHandler> handlers;
+    private RoutingMode mode;
+
+    /**
+     * Routing modes
+     */
+    public enum RoutingMode {
+        SINGLE,   // Use only the first registered handler
+        DUAL,     // Send each batch to every handler, one after another
+        FALLBACK  // Use the primary, fall back to the next handler on failure
+    }
+
+    public OutputRouter(RoutingMode mode) {
+        this.mode = mode;
+        this.handlers = new ArrayList<>();
+    }
+
+    /**
+     * Add an output handler
+     */
+    public void addHandler(OutputHandler handler) {
+        handlers.add(handler);
+        LOG.info("Added output handler: {}", handler.getName());
+    }
+
+    /**
+     * Route events to handlers based on mode
+     */
+    public void route(List<AuditEvent> events) {
+        if (events == null || events.isEmpty()) {
+            return;
+        }
+
+        switch (mode) {
+            case SINGLE:
+                routeSingle(events);
+                break;
+            case DUAL:
+                routeDual(events);
+                break;
+            case FALLBACK:
+                routeFallback(events);
+                break;
+        }
+    }
+
+    /**
+     * Route to the first handler only
+     */
+    private void routeSingle(List<AuditEvent> events) {
+        if (handlers.isEmpty()) {
+            LOG.warn("No handlers configured");
+            return;
+        }
+
+        OutputHandler handler = handlers.get(0);
+        try {
+            handler.send(events);
+        } catch (Exception e) {
+            LOG.error("Failed to send events via {}", handler.getName(), e);
+        }
+    }
+
+    /**
+     * Route to all handlers in sequence;
+     * a failure in one handler does not stop delivery to the rest
+     */
+    private void routeDual(List<AuditEvent> events) {
+        int successCount = 0;
+        int failureCount = 0;
+
+        for (OutputHandler handler : handlers) {
+            try {
+                handler.send(events);
+                successCount++;
+            } catch (Exception e) {
+                failureCount++;
+                LOG.error("Failed to send events via {}", handler.getName(), e);
+                // Continue to next handler
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dual routing completed: {} succeeded, {} failed",
+                    successCount, failureCount);
+        }
+    }
+
+    /**
+     * Route to handlers with fallback:
+     * try the primary first, fall back to the next handler on failure
+     */
+    private void routeFallback(List<AuditEvent> events) {
+        for (int i = 0; i < handlers.size(); i++) {
+            OutputHandler handler = handlers.get(i);
+            try {
+                handler.send(events);
+                if (i > 0) {
+                    LOG.info("Successfully sent via fallback handler: {}", handler.getName());
+                }
+                return; // Success, stop trying
+            } catch (Exception e) {
+                LOG.warn("Failed to send via {}, trying next handler",
+                        handler.getName(), e);
+            }
+        }
+
+        LOG.error("All handlers failed to send events");
+    }
+
+    /**
+     * Close all handlers
+     */
+    public void close() {
+        for (OutputHandler handler : handlers) {
+            try {
+                handler.close();
+            } catch (Exception e) {
+                LOG.error("Error closing handler: {}", handler.getName(), e);
+            }
+        }
+    }
+
+    /**
+     * Check if any handler is healthy
+     */
+    public boolean isHealthy() {
+        for (OutputHandler handler : handlers) {
+            if (handler.isHealthy()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Get all handlers
+     */
+    public List<OutputHandler> getHandlers() {
+        return handlers;
+    }
+
+    /**
+     * Get routing mode
+     */
+    public RoutingMode getMode() {
+        return mode;
+    }
+}
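Putting the pieces together, a caller could wire both handlers into the router as sketched below. The class and method names come from this patch; the choice of Stream Load as primary and Kafka as fallback, and the shared config map, are illustrative assumptions:

```java
import com.starrocks.plugin.AuditEvent;
import com.starrocks.plugin.audit.output.KafkaOutputHandler;
import com.starrocks.plugin.audit.output.StreamLoadOutputHandler;
import com.starrocks.plugin.audit.routing.OutputRouter;

import java.util.List;
import java.util.Map;

// Hypothetical wiring: Stream Load is tried first, Kafka only on failure.
public class RouterWiringSketch {
    public static OutputRouter build(Map<String, String> config) throws Exception {
        StreamLoadOutputHandler primary = new StreamLoadOutputHandler();
        primary.init(config);

        KafkaOutputHandler fallback = new KafkaOutputHandler();
        fallback.init(config);

        OutputRouter router = new OutputRouter(OutputRouter.RoutingMode.FALLBACK);
        router.addHandler(primary);  // tried first
        router.addHandler(fallback); // tried only if the primary throws
        return router;
    }

    public static void deliver(OutputRouter router, List<AuditEvent> batch) {
        router.route(batch); // per-handler failures are logged by the router
    }
}
```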