@@ -2,34 +2,43 @@ package main
 
 import (
 	"flag"
-	kafka "github.com/bwNetFlow/kafkaconnector"
-	flow "github.com/bwNetFlow/protobuf/go"
 	"io"
 	"log"
 	"net"
 	"os"
 	"os/signal"
 	"reflect"
 	"strings"
+	"time"
+
+	kafka "github.com/bwNetFlow/kafkaconnector"
+	flow "github.com/bwNetFlow/protobuf/go"
 )
 
 var (
-	LogFile = flag.String("log", "./processor_reducer.log", "Location of the log file.")
+	logFile = flag.String("log", "./processor_reducer.log", "Location of the log file.")
+
+	kafkaBroker        = flag.String("kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
+	kafkaConsumerGroup = flag.String("kafka.consumer_group", "reducer_debug", "Kafka Consumer Group")
+	kafkaInTopic       = flag.String("kafka.in.topic", "flow-messages", "Kafka topic to consume from")
+	kafkaOutTopic      = flag.String("kafka.out.topic", "flows-messages-anon", "Kafka topic to produce to")
 
-	KafkaConsumerGroup = flag.String("kafka.consumer_group", "reducer_debug", "Kafka Consumer Group")
-	KafkaInTopic       = flag.String("kafka.in.topic", "flow-messages", "Kafka topic to consume from")
-	KafkaOutTopic      = flag.String("kafka.out.topic", "flows-messages-anon", "Kafka topic to produce to")
-	KafkaBroker        = flag.String("kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
-	LimitFields        = flag.String("fields.limit", "Bytes,Packets,Etype,Proto", "Fields which will be kept")
-	AnonFields         = flag.String("fields.anon", "", "Fields which will be anonymized, if kept by fields.limit")
+	kafkaUser        = flag.String("kafka.user", "", "Kafka username to authenticate with")
+	kafkaPass        = flag.String("kafka.pass", "", "Kafka password to authenticate with")
+	kafkaAuthAnon    = flag.Bool("kafka.auth_anon", false, "Set Kafka Auth Anon")
+	kafkaDisableTLS  = flag.Bool("kafka.disable_tls", false, "Whether to use tls or not")
+	kafkaDisableAuth = flag.Bool("kafka.disable_auth", false, "Whether to use auth or not")
+
+	limitFields = flag.String("fields.limit", "Bytes,Packets,Etype,Proto", "Fields which will be kept")
+	anonFields  = flag.String("fields.anon", "", "Fields which will be anonymized, if kept by fields.limit")
 )
 
 func main() {
 	flag.Parse()
 	var err error
 
 	// initialize logger
-	logfile, err := os.OpenFile(*LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
+	logfile, err := os.OpenFile(*logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
 	if err != nil {
 		println("Error opening file for logging: %v", err)
 		return
@@ -45,20 +54,41 @@ func main() {
 
 	// connect to the Kafka cluster using bwNetFlow/kafkaconnector
 	var kafkaConn = kafka.Connector{}
-	err = kafkaConn.SetAuthFromEnv()
-	if err != nil {
-		log.Println(err)
-		log.Println("Resuming as user anon.")
-		kafkaConn.SetAuthAnon()
+
+	// disable TLS if requested
+	if *kafkaDisableTLS {
+		log.Println("kafkaDisableTLS ...")
+		kafkaConn.DisableTLS()
+	}
+	if *kafkaDisableAuth {
+		log.Println("kafkaDisableAuth ...")
+		kafkaConn.DisableAuth()
+	} else { // set Kafka auth
+		if *kafkaAuthAnon {
+			kafkaConn.SetAuthAnon()
+		} else if *kafkaUser != "" {
+			kafkaConn.SetAuth(*kafkaUser, *kafkaPass)
+		} else {
+			log.Println("No explicit credentials available, trying env.")
+			err = kafkaConn.SetAuthFromEnv()
+			if err != nil {
+				log.Println("No credentials available, using 'anon:anon'.")
+				kafkaConn.SetAuthAnon()
+			}
+		}
 	}
-	err = kafkaConn.StartConsumer(*KafkaBroker, []string{*KafkaInTopic}, *KafkaConsumerGroup, -1) // offset -1 is the most recent flow
+	err = kafkaConn.StartConsumer(*kafkaBroker, []string{*kafkaInTopic}, *kafkaConsumerGroup, -1) // offset -1 is the most recent flow
 	if err != nil {
 		log.Println("StartConsumer:", err)
+		// sleep to make auto restart not too fast and spamming connection retries
+		time.Sleep(5 * time.Second)
 		return
 	}
-	err = kafkaConn.StartProducer(*KafkaBroker)
+	err = kafkaConn.StartProducer(*kafkaBroker)
 	if err != nil {
 		log.Println("StartProducer:", err)
+		// sleep to make auto restart not too fast and spamming connection retries
+		time.Sleep(5 * time.Second)
 		return
 	}
 	defer kafkaConn.Close()
@@ -76,10 +106,10 @@ func main() {
 			reduced := flow.FlowMessage{}
 			reflected_reduced := reflect.ValueOf(&reduced) // mutable
 			// limit stuff
-			for _, fieldname := range strings.Split(*LimitFields, ",") {
+			for _, fieldname := range strings.Split(*limitFields, ",") {
 				if fieldname == "" {
 					log.Println("fields.limit unset, terminating...")
-					return // case LimitFields == ''
+					return // case limitFields == ''
 				}
 				original_field := reflect.Indirect(reflected_original).FieldByName(fieldname)
 				reduced_field := reflected_reduced.Elem().FieldByName(fieldname)
@@ -90,9 +120,9 @@ func main() {
 				}
 			}
 			// anon stuff
-			for _, fieldname := range strings.Split(*AnonFields, ",") {
+			for _, fieldname := range strings.Split(*anonFields, ",") {
 				if fieldname == "" {
-					continue // case AnonFields == ''
+					continue // case anonFields == ''
 				}
 				reduced_field := reflected_reduced.Elem().FieldByName(fieldname)
 				if reduced_field.IsValid() {
@@ -113,7 +143,7 @@ func main() {
 					log.Printf("The reduced flow message did not have a field named '%s' to anonymize.", fieldname)
 				}
 			}
-			kafkaConn.ProducerChannel(*KafkaOutTopic) <- &reduced
+			kafkaConn.ProducerChannel(*kafkaOutTopic) <- &reduced
 		case <-signals:
 			return
 		}
0 commit comments