From f59016b28eedd75a735e07f9c7df3897c8bc2766 Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Fri, 29 Nov 2024 11:00:35 -0500 Subject: [PATCH 1/8] Create QueryEventData.java Signed-off-by: Marjori Martinez --- .../plugin/audit/model/QueryEventData.java | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java diff --git a/src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java b/src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java new file mode 100644 index 0000000..b820ed4 --- /dev/null +++ b/src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java @@ -0,0 +1,147 @@ +package com.starrocks.plugin.audit.model; + +public class QueryEventData { + + public String id; + public String instanceName; + public String timestamp; + public String queryType; + public String clientIp; + public String user; + public String authorizedUser; + public String resourceGroup; + public String catalog; + public String db; + public String state; + public String errorCode; + public long queryTime; + public long scanBytes; + public long scanRows; + public long returnRows; + public long cpuCostNs; + public long memCostBytes; + public long stmtId; + public boolean isQuery; + public String feIp; + public String stmt; + public String digest; + public double planCpuCosts; + public double planMemCosts; + public String candidateMvs; + public String hitMVs; + + public QueryEventData() { + + } + + public void setId(String id) { + this.id = id; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public void setQueryType(String queryType) { + this.queryType = queryType; + } + + public void setClientIp(String clientIp) { + this.clientIp = clientIp; + } + + public void setUser(String user) { + this.user = user; + } + + public void setAuthorizedUser(String authorizedUser) { + this.authorizedUser = authorizedUser; + } + + public void setResourceGroup(String resourceGroup) { + this.resourceGroup = resourceGroup; + } + + public void setCatalog(String catalog) { + this.catalog = catalog; + } + + public void setDb(String db) { + this.db = db; + } + + public void setState(String state) { + this.state = state; + } + + public void setErrorCode(String errorCode) { + this.errorCode = errorCode; + } + + public void setQueryTime(long queryTime) { + this.queryTime = queryTime; + } + + public void setScanBytes(long scanBytes) { + this.scanBytes = scanBytes; + } + + public void setScanRows(long scanRows) { + this.scanRows = scanRows; + } + + public void setReturnRows(long returnRows) { + this.returnRows = returnRows; + } + + public void setCpuCostNs(long cpuCostNs) { + this.cpuCostNs = cpuCostNs; + } + + public void setMemCostBytes(long memCostBytes) { + this.memCostBytes = memCostBytes; + } + + public void setStmtId(long stmtId) { + this.stmtId = stmtId; + } + + public void setIsQuery(boolean isQuery) { + this.isQuery = isQuery; + } + + public void setFeIp(String feIp) { + this.feIp = feIp; + } + + public void setStmt(String stmt) { + this.stmt = stmt; + } + + public void setDigest(String digest) { + this.digest = digest; + } + + public void setPlanCpuCosts(double planCpuCosts) { + this.planCpuCosts = planCpuCosts; + } + + public void setPlanMemCosts(double planMemCosts) { + this.planMemCosts = planMemCosts; + } + + public void setCandidateMvs(String candidateMvs) { + this.candidateMvs = 
candidateMvs; + } + + public void setHitMVs(String hitMVs) { + this.hitMVs = hitMVs; + } + + + +} From a4eece0dd13b414e5f5e2044a9b1ffa759df4426 Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Fri, 29 Nov 2024 11:07:40 -0500 Subject: [PATCH 2/8] Update AuditLoaderPlugin.java Signed-off-by: Marjori Martinez --- .../plugin/audit/AuditLoaderPlugin.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java index 3007828..e0d1748 100644 --- a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java +++ b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java @@ -52,6 +52,16 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +//ADDED PUMA +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.starrocks.plugin.audit.model.QueryEventData; + /* * This plugin will load audit log to specified starrocks table at specified interval */ @@ -329,6 +339,11 @@ public static class AuditLoaderConf { public static final String PROP_SECRET_KEY = "secret_key"; public String secretKey = ""; + //ADDED PUMA + public static String PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG = "fe.plugins.auditloader.bootstrapServer"; + public static String PUMA_KAFKA_TOPIC = "fe.plugins.auditloader.kafkaTopic"; + public static String PUMA_KAFKA_INSTANCE_NAME = "fe.plugins.auditloader.instanceName"; + public void init(Map properties) throws PluginException { try { if (properties.containsKey(PROP_MAX_BATCH_SIZE)) { @@ -373,6 +388,16 @@ public void init(Map properties) throws PluginException { if (properties.containsKey(STREAM_LOAD_FILTER)) { streamLoadFilter = properties.get(STREAM_LOAD_FILTER); } + //ADDED PUMA + if (properties.containsKey(PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG)) { + PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG = properties.get(PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG); + } + if (properties.containsKey(PUMA_KAFKA_TOPIC)) { + PUMA_KAFKA_TOPIC = properties.get(PUMA_KAFKA_TOPIC); + } + if (properties.containsKey(PUMA_KAFKA_INSTANCE_NAME)) { + PUMA_KAFKA_INSTANCE_NAME = properties.get(PUMA_KAFKA_INSTANCE_NAME); + } } catch (Exception e) { throw new PluginException(e.getMessage()); } @@ -392,6 +417,8 @@ public void run() { AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); if (event != null) { assembleAudit(event); + //ADDED PUMA + sendToKafka(event); } loadIfNecessary(loader); } catch (InterruptedException ie) { @@ -410,4 +437,87 @@ public static synchronized String longToTimeString(long timeStamp) { return DATETIME_FORMAT.format(new Date(timeStamp)); } + //ADDED PUMA + + public void sendToKafka(AuditEvent event){ + + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", AuditLoaderConf.PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG); + properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.setProperty("max.request.size", "52428800"); + properties.setProperty("buffer.memory", "36700160"); + properties.setProperty("max.block.ms", "180000"); + 
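+        // Editorial note: the producer settings below trade latency for
+        // throughput (100 KB batches, snappy compression, 20 ms linger) and
+        // authenticate against AWS MSK over SASL_SSL with IAM. They are
+        // hard-coded here; a later patch in this series makes them
+        // configurable through plugin.conf.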
properties.setProperty("batch.size", "102400"); + properties.setProperty("compression.type", "snappy"); + properties.setProperty("linger.ms", "20"); + properties.setProperty("security.protocol","SASL_SSL"); + properties.setProperty("sasl.mechanism","AWS_MSK_IAM"); + properties.setProperty("sasl.jaas.config","software.amazon.msk.auth.iam.IAMLoginModule required;"); + properties.setProperty("sasl.client.callback.handler.class","software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + + String queryType = getQueryType(event); + String eventAuditId = getQueryId(queryType,event); + + QueryEventData eventAuditEG = new QueryEventData(); + eventAuditEG.setId(eventAuditId); + eventAuditEG.setInstanceName(AuditLoaderConf.PUMA_KAFKA_INSTANCE_NAME); + eventAuditEG.setTimestamp(longToTimeString(event.timestamp)); + eventAuditEG.setQueryType(queryType); + eventAuditEG.setClientIp(event.clientIp); + eventAuditEG.setUser(event.user); + eventAuditEG.setAuthorizedUser(event.authorizedUser); + eventAuditEG.setResourceGroup(event.resourceGroup); + eventAuditEG.setCatalog(event.catalog); + eventAuditEG.setDb(event.db); + eventAuditEG.setState(event.state); + eventAuditEG.setErrorCode(event.errorCode); + eventAuditEG.setQueryTime(event.queryTime); + eventAuditEG.setScanBytes(event.scanBytes); + eventAuditEG.setScanRows(event.scanRows); + eventAuditEG.setReturnRows(event.returnRows); + eventAuditEG.setCpuCostNs(event.cpuCostNs); + eventAuditEG.setMemCostBytes(event.memCostBytes); + eventAuditEG.setStmtId(event.stmtId); + eventAuditEG.setIsQuery(event.isQuery ? true : false); + eventAuditEG.setFeIp(event.feIp); + eventAuditEG.setStmt(truncateByBytes(event.stmt)); + // Compute digest for all queries + if (conf.enableComputeAllQueryDigest && (event.digest == null || StringUtils.isBlank(event.digest))) { + event.digest = computeStatementDigest(event.stmt); + LOG.debug("compute stmt digest, queryId: {} digest: {}", event.queryId, event.digest); + } + eventAuditEG.setDigest(event.digest); + eventAuditEG.setPlanCpuCosts(event.planCpuCosts); + eventAuditEG.setPlanMemCosts(event.planMemCosts); + eventAuditEG.setCandidateMvs(event.candidateMvs); + eventAuditEG.setHitMVs(event.hitMVs); + + ObjectMapper mapperEventAuditEG = new ObjectMapper(); + mapperEventAuditEG.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + + try { + Producer producer = new KafkaProducer<>(properties); + Future res = producer.send( + new ProducerRecord<>( + AuditLoaderConf.PUMA_KAFKA_TOPIC, + eventAuditId, + mapperEventAuditEG.writeValueAsString(eventAuditEG))); + try { + RecordMetadata metadata = res.get(); + if (metadata.hasOffset()){ + LOG.info("Query created event with id: " + eventAuditId + " in partition: "+ String.valueOf(metadata.partition()) + " with offset: " + metadata.offset()); + } else { + LOG.error("Query created event with id: " + eventAuditId + " doesn't have offset. It wasn't sent to the topic. 
"); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error(String.format("Query id: "+ eventAuditId + " Not written to kafka topic - Error of interrupted execution on sendToKafka method: %s", e.getMessage())); + } + producer.close(); + } catch (Exception e) { + LOG.error(String.format("Error on sending to kafka: %s", e.getMessage())); + } + + } + } From 38da64fc290a6402fe1d811d97c7d354c5f7ee72 Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Fri, 29 Nov 2024 11:09:09 -0500 Subject: [PATCH 3/8] Update plugin.conf Signed-off-by: Marjori Martinez --- src/main/assembly/plugin.conf | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/assembly/plugin.conf b/src/main/assembly/plugin.conf index fbb54de..2235533 100644 --- a/src/main/assembly/plugin.conf +++ b/src/main/assembly/plugin.conf @@ -56,3 +56,11 @@ enable_compute_all_query_digest=false # Filter conditions when importing audit information filter= + + +### ADD PUMA +plugin.name={{ event_listener_name }} +fe.plugins.auditloader.instanceName={{ instance_name }} +fe.plugins.auditloader.kafkaTopic={{ kafka_query_event_topic }} +fe.plugins.auditloader.enableMSK={{ enable_msk_cluster }} +fe.plugins.auditloader.bootstrapServer={{ kafka_bootstrap_server }} From a6fb61e120b50e19d36790721061e5deb964b4dc Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Fri, 29 Nov 2024 11:12:40 -0500 Subject: [PATCH 4/8] Update pom.xml Signed-off-by: Marjori Martinez --- pom.xml | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4345210..3fd4bbf 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,13 @@ 2.24.1 github + + 3.3.2 + 214 + 1.1.5 + UTF-8 + 11 + 2.18.0 @@ -48,6 +55,27 @@ commons-codec 1.17.1 + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + io.airlift + log + ${airlift.version} + + + software.amazon.msk + aws-msk-iam-auth + ${aws-msk-iam-auth.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + @@ -95,4 +123,4 @@ - \ No newline at end of file + From becb0333d4b332b4a9a3344921b8e43bb1767ed6 Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Mon, 2 Dec 2024 14:01:01 -0500 Subject: [PATCH 5/8] Update plugin.conf Signed-off-by: Marjori Martinez --- src/main/assembly/plugin.conf | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/main/assembly/plugin.conf b/src/main/assembly/plugin.conf index 2235533..a76a0d9 100644 --- a/src/main/assembly/plugin.conf +++ b/src/main/assembly/plugin.conf @@ -58,9 +58,16 @@ enable_compute_all_query_digest=false filter= -### ADD PUMA -plugin.name={{ event_listener_name }} -fe.plugins.auditloader.instanceName={{ instance_name }} -fe.plugins.auditloader.kafkaTopic={{ kafka_query_event_topic }} -fe.plugins.auditloader.enableMSK={{ enable_msk_cluster }} -fe.plugins.auditloader.bootstrapServer={{ kafka_bootstrap_server }} +### kafka configuration + +# Set to True if you want to enable Kafka process +fe.plugins.auditloader.enableMSK= + +# Name of the current Starrocks instance. +fe.plugins.auditloader.instanceName= + +# The name of the kafka topic that data will be written. +fe.plugins.auditloader.kafkaTopic= + +# String with all bootstrap servers related to the Kafka cluster. 
+fe.plugins.auditloader.bootstrapServer= From 71106e662c0d85c0ddce42a3acf301a6308df6f6 Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Mon, 2 Dec 2024 15:40:12 -0500 Subject: [PATCH 6/8] Update plugin.conf Signed-off-by: Marjori Martinez --- src/main/assembly/plugin.conf | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/main/assembly/plugin.conf b/src/main/assembly/plugin.conf index a76a0d9..50d9cc7 100644 --- a/src/main/assembly/plugin.conf +++ b/src/main/assembly/plugin.conf @@ -61,13 +61,27 @@ filter= ### kafka configuration # Set to True if you want to enable Kafka process -fe.plugins.auditloader.enableMSK= +kafka_enableMSK=false -# Name of the current Starrocks instance. -fe.plugins.auditloader.instanceName= +# Name of current Starrocks instance/cluster +kafka_instanceName= -# The name of the kafka topic that data will be written. -fe.plugins.auditloader.kafkaTopic= +# Name of the kafka topic that data will be written. +kafka_topic= # String with all bootstrap servers related to the Kafka cluster. -fe.plugins.auditloader.bootstrapServer= +kafka_bootstrapServer= + +# Properties for kafka producer +kafka_conf_keySerializer=org.apache.kafka.common.serialization.StringSerializer +kafka_conf_valueSerializer=org.apache.kafka.common.serialization.StringSerializer +kafka_conf_maxRequestSize=52428800 +kafka_conf_bufferMemory=36700160 +kafka_conf_maxBlockMs=180000 +kafka_conf_batchSize=102400 +kafka_conf_compressionType=snappy +kafka_conf_lingerMs=20 +kafka_conf_securityProtocol=SASL_SSL +kafka_conf_saslMechanism=AWS_MSK_IAM +kafka_conf_saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required; +kafka_conf_saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler From 90a6a2628772108314eb93b782d767c29a1b3669 Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Mon, 2 Dec 2024 15:41:07 -0500 Subject: [PATCH 7/8] Update AuditLoaderPlugin.java Signed-off-by: Marjori Martinez --- .../plugin/audit/AuditLoaderPlugin.java | 114 +++++++++++++----- 1 file changed, 82 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java index e0d1748..03ffe50 100644 --- a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java +++ b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java @@ -52,7 +52,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -//ADDED PUMA import java.util.concurrent.Future; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -61,6 +60,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.starrocks.plugin.audit.model.QueryEventData; +import java.util.concurrent.ExecutionException; /* * This plugin will load audit log to specified starrocks table at specified interval @@ -80,6 +80,8 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private volatile boolean isClosed = false; private volatile boolean isInit = false; + public static boolean kafkaEnable = false; + /** * 是否包含新字段 candidateMvs,如果旧版本没有该字段则值为空 */ @@ -339,10 +341,22 @@ public static class AuditLoaderConf { public static final String PROP_SECRET_KEY = "secret_key"; public String secretKey = ""; - //ADDED PUMA - public static String PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG = "fe.plugins.auditloader.bootstrapServer"; - public static String PUMA_KAFKA_TOPIC 
= "fe.plugins.auditloader.kafkaTopic"; - public static String PUMA_KAFKA_INSTANCE_NAME = "fe.plugins.auditloader.instanceName"; + public static String PROP_KAFKA_ENABLE = "kafka_enableMSK"; + public static String PROP_KAFKA_INSTANCE_NAME = "kafka_instanceName"; + public static String PROP_KAFKA_TOPIC = "kafka_topic"; + public static String PROP_KAFKA_BOOTSTRAP_SERVERS = "kafka_bootstrapServer"; + public static String PROP_KAFKA_CONF_KEYSERIALIZER = "kafka_conf_keySerializer"; + public static String PROP_KAFKA_CONF_VALUESERIALIZER = "kafka_conf_valueSerializer"; + public static String PROP_KAFKA_CONF_MAXREQUESTSIZE = "kafka_conf_maxRequestSize"; + public static String PROP_KAFKA_CONF_BUFFERMEMORY = "kafka_conf_bufferMemory"; + public static String PROP_KAFKA_CONF_MAXBLOCKMS = "kafka_conf_maxBlockMs"; + public static String PROP_KAFKA_CONF_BATCHSIZE = "kafka_conf_batchSize"; + public static String PROP_KAFKA_CONF_COMPRESSIONTYPE = "kafka_conf_compressionType"; + public static String PROP_KAFKA_CONF_LINGERMS = "kafka_conf_lingerMs"; + public static String PROP_KAFKA_CONF_SECURITYPROTOCOL = "kafka_conf_securityProtocol"; + public static String PROP_KAFKA_CONF_SASLMECHANISM = "kafka_conf_saslMechanism"; + public static String PROP_KAFKA_CONF_SASLJAASCONFIG = "kafka_conf_saslJaasConfig"; + public static String PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS = "kafka_conf_saslClientCallbackHandlerClass"; public void init(Map properties) throws PluginException { try { @@ -388,15 +402,53 @@ public void init(Map properties) throws PluginException { if (properties.containsKey(STREAM_LOAD_FILTER)) { streamLoadFilter = properties.get(STREAM_LOAD_FILTER); } - //ADDED PUMA - if (properties.containsKey(PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG)) { - PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG = properties.get(PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG); + if (properties.containsKey(PROP_KAFKA_ENABLE)) { + kafkaEnable = Boolean.parseBoolean(properties.get(PROP_KAFKA_ENABLE)); + } + if (properties.containsKey(PROP_KAFKA_INSTANCE_NAME)) { + PROP_KAFKA_INSTANCE_NAME = properties.get(PROP_KAFKA_INSTANCE_NAME); + } + if (properties.containsKey(PROP_KAFKA_TOPIC)) { + PROP_KAFKA_TOPIC = properties.get(PROP_KAFKA_TOPIC); + } + if (properties.containsKey(PROP_KAFKA_BOOTSTRAP_SERVERS)) { + PROP_KAFKA_BOOTSTRAP_SERVERS = properties.get(PROP_KAFKA_BOOTSTRAP_SERVERS); + } + if (properties.containsKey(PROP_KAFKA_CONF_KEYSERIALIZER)) { + PROP_KAFKA_CONF_KEYSERIALIZER = properties.get(PROP_KAFKA_CONF_KEYSERIALIZER); + } + if (properties.containsKey(PROP_KAFKA_CONF_VALUESERIALIZER)) { + PROP_KAFKA_CONF_VALUESERIALIZER = properties.get(PROP_KAFKA_CONF_VALUESERIALIZER); + } + if (properties.containsKey(PROP_KAFKA_CONF_MAXREQUESTSIZE)) { + PROP_KAFKA_CONF_MAXREQUESTSIZE = properties.get(PROP_KAFKA_CONF_MAXREQUESTSIZE); + } + if (properties.containsKey(PROP_KAFKA_CONF_BUFFERMEMORY)) { + PROP_KAFKA_CONF_BUFFERMEMORY = properties.get(PROP_KAFKA_CONF_BUFFERMEMORY); } - if (properties.containsKey(PUMA_KAFKA_TOPIC)) { - PUMA_KAFKA_TOPIC = properties.get(PUMA_KAFKA_TOPIC); + if (properties.containsKey(PROP_KAFKA_CONF_MAXBLOCKMS)) { + PROP_KAFKA_CONF_MAXBLOCKMS = properties.get(PROP_KAFKA_CONF_MAXBLOCKMS); } - if (properties.containsKey(PUMA_KAFKA_INSTANCE_NAME)) { - PUMA_KAFKA_INSTANCE_NAME = properties.get(PUMA_KAFKA_INSTANCE_NAME); + if (properties.containsKey(PROP_KAFKA_CONF_BATCHSIZE)) { + PROP_KAFKA_CONF_BATCHSIZE = properties.get(PROP_KAFKA_CONF_BATCHSIZE); + } + if (properties.containsKey(PROP_KAFKA_CONF_COMPRESSIONTYPE)) { + 
PROP_KAFKA_CONF_COMPRESSIONTYPE = properties.get(PROP_KAFKA_CONF_COMPRESSIONTYPE); + } + if (properties.containsKey(PROP_KAFKA_CONF_LINGERMS)) { + PROP_KAFKA_CONF_LINGERMS = properties.get(PROP_KAFKA_CONF_LINGERMS); + } + if (properties.containsKey(PROP_KAFKA_CONF_SECURITYPROTOCOL)) { + PROP_KAFKA_CONF_SECURITYPROTOCOL = properties.get(PROP_KAFKA_CONF_SECURITYPROTOCOL); + } + if (properties.containsKey(PROP_KAFKA_CONF_SASLMECHANISM)) { + PROP_KAFKA_CONF_SASLMECHANISM = properties.get(PROP_KAFKA_CONF_SASLMECHANISM); + } + if (properties.containsKey(PROP_KAFKA_CONF_SASLJAASCONFIG)) { + PROP_KAFKA_CONF_SASLJAASCONFIG = properties.get(PROP_KAFKA_CONF_SASLJAASCONFIG); + } + if (properties.containsKey(PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS)) { + PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS = properties.get(PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS); } } catch (Exception e) { throw new PluginException(e.getMessage()); @@ -417,8 +469,9 @@ public void run() { AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); if (event != null) { assembleAudit(event); - //ADDED PUMA - sendToKafka(event); + if (kafkaEnable) { + sendToKafka(event); + } } loadIfNecessary(loader); } catch (InterruptedException ie) { @@ -437,31 +490,29 @@ public static synchronized String longToTimeString(long timeStamp) { return DATETIME_FORMAT.format(new Date(timeStamp)); } - //ADDED PUMA - public void sendToKafka(AuditEvent event){ Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", AuditLoaderConf.PUMA_KAFKA_BOOTSTRAP_SERVERS_CONFIG); - properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - properties.setProperty("max.request.size", "52428800"); - properties.setProperty("buffer.memory", "36700160"); - properties.setProperty("max.block.ms", "180000"); - properties.setProperty("batch.size", "102400"); - properties.setProperty("compression.type", "snappy"); - properties.setProperty("linger.ms", "20"); - properties.setProperty("security.protocol","SASL_SSL"); - properties.setProperty("sasl.mechanism","AWS_MSK_IAM"); - properties.setProperty("sasl.jaas.config","software.amazon.msk.auth.iam.IAMLoginModule required;"); - properties.setProperty("sasl.client.callback.handler.class","software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + properties.setProperty("bootstrap.servers", AuditLoaderConf.PROP_KAFKA_BOOTSTRAP_SERVERS); + properties.setProperty("key.serializer", AuditLoaderConf.PROP_KAFKA_CONF_KEYSERIALIZER); + properties.setProperty("value.serializer", AuditLoaderConf.PROP_KAFKA_CONF_VALUESERIALIZER); + properties.setProperty("max.request.size", AuditLoaderConf.PROP_KAFKA_CONF_MAXREQUESTSIZE); + properties.setProperty("buffer.memory", AuditLoaderConf.PROP_KAFKA_CONF_BUFFERMEMORY); + properties.setProperty("max.block.ms", AuditLoaderConf.PROP_KAFKA_CONF_MAXBLOCKMS); + properties.setProperty("batch.size", AuditLoaderConf.PROP_KAFKA_CONF_BATCHSIZE); + properties.setProperty("compression.type", AuditLoaderConf.PROP_KAFKA_CONF_COMPRESSIONTYPE); + properties.setProperty("linger.ms", AuditLoaderConf.PROP_KAFKA_CONF_LINGERMS); + properties.setProperty("security.protocol",AuditLoaderConf.PROP_KAFKA_CONF_SECURITYPROTOCOL); + properties.setProperty("sasl.mechanism",AuditLoaderConf.PROP_KAFKA_CONF_SASLMECHANISM); + properties.setProperty("sasl.jaas.config",AuditLoaderConf.PROP_KAFKA_CONF_SASLJAASCONFIG); + 
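+        // Note: AuditLoaderConf.init() overwrites each PROP_KAFKA_* constant
+        // with the value configured in plugin.conf. If a key is absent from
+        // plugin.conf, the constant still holds the property name itself, so
+        // the producer would receive the literal key string as its setting.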
properties.setProperty("sasl.client.callback.handler.class",AuditLoaderConf.PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS); String queryType = getQueryType(event); String eventAuditId = getQueryId(queryType,event); QueryEventData eventAuditEG = new QueryEventData(); eventAuditEG.setId(eventAuditId); - eventAuditEG.setInstanceName(AuditLoaderConf.PUMA_KAFKA_INSTANCE_NAME); + eventAuditEG.setInstanceName(AuditLoaderConf.PROP_KAFKA_INSTANCE_NAME); eventAuditEG.setTimestamp(longToTimeString(event.timestamp)); eventAuditEG.setQueryType(queryType); eventAuditEG.setClientIp(event.clientIp); @@ -482,7 +533,6 @@ public void sendToKafka(AuditEvent event){ eventAuditEG.setIsQuery(event.isQuery ? true : false); eventAuditEG.setFeIp(event.feIp); eventAuditEG.setStmt(truncateByBytes(event.stmt)); - // Compute digest for all queries if (conf.enableComputeAllQueryDigest && (event.digest == null || StringUtils.isBlank(event.digest))) { event.digest = computeStatementDigest(event.stmt); LOG.debug("compute stmt digest, queryId: {} digest: {}", event.queryId, event.digest); @@ -500,7 +550,7 @@ public void sendToKafka(AuditEvent event){ Producer producer = new KafkaProducer<>(properties); Future res = producer.send( new ProducerRecord<>( - AuditLoaderConf.PUMA_KAFKA_TOPIC, + AuditLoaderConf.PROP_KAFKA_TOPIC, eventAuditId, mapperEventAuditEG.writeValueAsString(eventAuditEG))); try { From 8fee8e5b023f6f7fa259caeb6b739529316a0cfc Mon Sep 17 00:00:00 2001 From: Marjori24 Date: Mon, 2 Dec 2024 15:42:09 -0500 Subject: [PATCH 8/8] Update pom.xml Signed-off-by: Marjori Martinez --- pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3fd4bbf..a734d88 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,6 @@ 2.24.1 github - 3.3.2 214 1.1.5 @@ -55,7 +54,6 @@ commons-codec 1.17.1 - org.apache.kafka kafka-clients