@@ -140,12 +140,39 @@ func mainfunc(cmd *cobra.Command, args []string) {
140140 }()
141141 }
142142
143- // Configure forwarding
144- outputSocket := viper .GetString ("output.socket" )
145- forward = (outputSocket != "" )
146- eventTypes := viper .GetStringSlice ("forward.types" )
147- allTypes := viper .GetBool ("forward.all" )
148- util .PrepareEventFilter (eventTypes , allTypes )
143+ // Get config from viper
144+ var multiForwardConf processing.MultiForwardConfiguration
145+ err = viper .Unmarshal (& multiForwardConf )
146+ if err != nil {
147+ log .Fatal (err )
148+ }
149+ // Keep supporting legacy config ("output" and "forward" sections)
150+ if len (multiForwardConf .Outputs ) == 0 {
151+ log .Info ("no multi-forwarder configuration, found, using legacy config" )
152+ outSocketPath := viper .GetString ("output.socket" )
153+ if len (outSocketPath ) > 0 {
154+ multiForwardConf .Outputs = make (map [string ]processing.MultiForwardOutput )
155+ multiForwardConf .Outputs ["default" ] =
156+ processing.MultiForwardOutput {
157+ Socket : viper .GetString ("output.socket" ),
158+ All : viper .GetBool ("forward.all" ),
159+ Types : viper .GetStringSlice ("forward.types" ),
160+ }
161+ }
162+ } else {
163+ log .Info ("found multi-forwarder configuration, ignoring legacy config" )
164+ }
165+
166+ if len (multiForwardConf .Outputs ) > 0 {
167+ forward = true
168+ }
169+
170+ if pse != nil {
171+ multiForwardConf .SubmitStats (pse )
172+ }
173+ multiFwdChan := make (chan types.Entry )
174+ reconnectTimes := viper .GetInt ("reconnect-retries" )
175+ multiForwardConf .Run (multiFwdChan , reconnectTimes )
149176
150177 // Optional profiling
151178 profileFile := viper .GetString ("profile" )
@@ -185,14 +212,10 @@ func mainfunc(cmd *cobra.Command, args []string) {
185212 s .Run (eventChan )
186213
187214 var forwardHandler processing.Handler
188- reconnectTimes := viper .GetInt ("reconnect-retries" )
189215 // start forwarding
190216 if forward {
191- forwardHandler = processing .MakeForwardHandler (int ( reconnectTimes ), outputSocket )
217+ forwardHandler = processing .MakeForwardHandler (multiFwdChan )
192218 fh := forwardHandler .(* processing.ForwardHandler )
193- if pse != nil {
194- fh .SubmitStats (pse )
195- }
196219 rdns := viper .GetBool ("active.rdns" )
197220 if rdns {
198221 expiryPeriod := viper .GetDuration ("active.rdns-cache-expiry" )
@@ -202,7 +225,6 @@ func mainfunc(cmd *cobra.Command, args []string) {
202225 fh .RDNSHandler .EnableOnlyPrivateIPRanges ()
203226 }
204227 }
205- fh .Run ()
206228
207229 stenosis := viper .GetBool ("stenosis.enable" )
208230 if stenosis {
@@ -236,12 +258,6 @@ func mainfunc(cmd *cobra.Command, args []string) {
236258
237259 addFields := viper .GetStringMapString ("add-fields" )
238260 fh .AddFields (addFields )
239-
240- defer func () {
241- c := make (chan bool )
242- fh .Stop (c )
243- <- c
244- }()
245261 } else {
246262 // in this case we use a void handler that does nothing
247263 forwardHandler = processing .MakeVoidHandler ()
0 commit comments