@@ -70,11 +70,7 @@ func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub, testMode bool) (*par
70
70
return csParsers , datasources , nil
71
71
}
72
72
73
- // runCrowdsec starts the log processor service
74
- func runCrowdsec (cConfig * csconfig.Config , parsers * parser.Parsers , hub * cwhub.Hub , datasources []acquisition.DataSource ) error {
75
- inputEventChan = make (chan types.Event )
76
- inputLineChan = make (chan types.Event )
77
-
73
+ func startParserRoutines (cConfig * csconfig.Config , parsers * parser.Parsers ) {
78
74
// start go-routines for parsing, buckets pour and outputs.
79
75
parserWg := & sync.WaitGroup {}
80
76
@@ -99,7 +95,9 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
99
95
return nil
100
96
})
101
97
parserWg .Wait ()
98
+ }
102
99
100
+ func startBucketRoutines (cConfig * csconfig.Config ) {
103
101
bucketWg := & sync.WaitGroup {}
104
102
105
103
bucketsTomb .Go (func () error {
@@ -126,15 +124,14 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
126
124
return nil
127
125
})
128
126
bucketWg .Wait ()
127
+ }
129
128
130
- apiClient , err := apiclient .GetLAPIClient ()
131
- if err != nil {
132
- return err
133
- }
134
-
129
+ func startHeartBeat (cConfig * csconfig.Config , apiClient * apiclient.ApiClient ) {
135
130
log .Debugf ("Starting HeartBeat service" )
136
131
apiClient .HeartBeat .StartHeartBeat (context .Background (), & outputsTomb )
132
+ }
137
133
134
+ func startOutputRoutines (cConfig * csconfig.Config , parsers * parser.Parsers , apiClient * apiclient.ApiClient ) {
138
135
outputWg := & sync.WaitGroup {}
139
136
140
137
outputsTomb .Go (func () error {
@@ -153,7 +150,9 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
153
150
return nil
154
151
})
155
152
outputWg .Wait ()
153
+ }
156
154
155
+ func startLPMetrics (cConfig * csconfig.Config , apiClient * apiclient.ApiClient , hub * cwhub.Hub , datasources []acquisition.DataSource ) error {
157
156
mp := NewMetricsProvider (
158
157
apiClient ,
159
158
lpMetricsDefaultInterval ,
@@ -178,6 +177,31 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
178
177
}
179
178
}
180
179
180
+ return nil
181
+ }
182
+
183
+ // runCrowdsec starts the log processor service
184
+ func runCrowdsec (cConfig * csconfig.Config , parsers * parser.Parsers , hub * cwhub.Hub , datasources []acquisition.DataSource ) error {
185
+ inputEventChan = make (chan types.Event )
186
+ inputLineChan = make (chan types.Event )
187
+
188
+ startParserRoutines (cConfig , parsers )
189
+
190
+ startBucketRoutines (cConfig )
191
+
192
+ apiClient , err := apiclient .GetLAPIClient ()
193
+ if err != nil {
194
+ return err
195
+ }
196
+
197
+ startHeartBeat (cConfig , apiClient )
198
+
199
+ startOutputRoutines (cConfig , parsers , apiClient )
200
+
201
+ if err := startLPMetrics (cConfig , apiClient , hub , datasources ); err != nil {
202
+ return err
203
+ }
204
+
181
205
log .Info ("Starting processing data" )
182
206
183
207
if err := acquisition .StartAcquisition (context .TODO (), dataSources , inputLineChan , & acquisTomb ); err != nil {
0 commit comments