1
1
package acquisition
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
5
6
"errors"
6
7
"fmt"
7
8
"io"
8
9
"maps"
9
10
"os"
11
+ "slices"
10
12
"strings"
11
13
12
14
"github.com/expr-lang/expr"
13
15
"github.com/expr-lang/expr/vm"
16
+ "github.com/goccy/go-yaml"
14
17
"github.com/google/uuid"
15
18
"github.com/prometheus/client_golang/prometheus"
16
19
log "github.com/sirupsen/logrus"
17
20
tomb "gopkg.in/tomb.v2"
18
- "gopkg.in/yaml.v2"
19
21
20
22
"github.com/crowdsecurity/go-cs-lib/csstring"
23
+ "github.com/crowdsecurity/go-cs-lib/csyaml"
21
24
"github.com/crowdsecurity/go-cs-lib/trace"
22
25
23
26
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
@@ -70,6 +73,10 @@ func GetDataSourceIface(dataSourceType string) (DataSource, error) {
70
73
71
74
built , known := component .Built ["datasource_" + dataSourceType ]
72
75
76
+ if dataSourceType == "" {
77
+ return nil , errors .New ("data source type is empty" )
78
+ }
79
+
73
80
if ! known {
74
81
return nil , fmt .Errorf ("unknown data source %s" , dataSourceType )
75
82
}
@@ -114,14 +121,7 @@ func setupLogger(source, name string, level *log.Level) (*log.Entry, error) {
114
121
// if the configuration is not valid it returns an error.
115
122
// If the datasource can't be run (eg. journalctl not available), it still returns an error which
116
123
// can be checked for the appropriate action.
117
- func DataSourceConfigure (commonConfig configuration.DataSourceCommonCfg , metricsLevel int ) (DataSource , error ) {
118
- // we dump it back to []byte, because we want to decode the yaml blob twice:
119
- // once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
120
- yamlConfig , err := yaml .Marshal (commonConfig )
121
- if err != nil {
122
- return nil , fmt .Errorf ("unable to serialize back interface: %w" , err )
123
- }
124
-
124
+ func DataSourceConfigure (commonConfig configuration.DataSourceCommonCfg , yamlConfig []byte , metricsLevel int ) (DataSource , error ) {
125
125
dataSrc , err := GetDataSourceIface (commonConfig .Source )
126
126
if err != nil {
127
127
return nil , err
@@ -144,23 +144,6 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg, metrics
144
144
return dataSrc , nil
145
145
}
146
146
147
- // detectBackwardCompatAcquis: try to magically detect the type for backward compat (type was not mandatory then)
148
- func detectBackwardCompatAcquis (sub configuration.DataSourceCommonCfg ) string {
149
- if _ , ok := sub .Config ["filename" ]; ok {
150
- return "file"
151
- }
152
-
153
- if _ , ok := sub .Config ["filenames" ]; ok {
154
- return "file"
155
- }
156
-
157
- if _ , ok := sub .Config ["journalctl_filter" ]; ok {
158
- return "journalctl"
159
- }
160
-
161
- return ""
162
- }
163
-
164
147
func LoadAcquisitionFromDSN (dsn string , labels map [string ]string , transformExpr string ) ([]DataSource , error ) {
165
148
frags := strings .Split (dsn , ":" )
166
149
if len (frags ) == 1 {
@@ -216,6 +199,36 @@ func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int {
216
199
return configuration .METRICS_FULL
217
200
}
218
201
202
+ func detectTypes (r io.Reader ) ([]string , error ) {
203
+ collectedKeys , err := csyaml .GetDocumentKeys (r )
204
+ if err != nil {
205
+ return nil , err
206
+ }
207
+
208
+ ret := make ([]string , len (collectedKeys ))
209
+
210
+ for idx , keys := range collectedKeys {
211
+ var detected string
212
+
213
+ switch {
214
+ case slices .Contains (keys , "source" ):
215
+ detected = ""
216
+ case slices .Contains (keys , "filename" ):
217
+ detected = "file"
218
+ case slices .Contains (keys , "filenames" ):
219
+ detected = "file"
220
+ case slices .Contains (keys , "journalctl_filter" ):
221
+ detected = "journalctl"
222
+ default :
223
+ detected = ""
224
+ }
225
+
226
+ ret [idx ] = detected
227
+ }
228
+
229
+ return ret , nil
230
+ }
231
+
219
232
// sourcesFromFile reads and parses one acquisition file into DataSources.
220
233
func sourcesFromFile (acquisFile string , metrics_level int ) ([]DataSource , error ) {
221
234
var sources []DataSource
@@ -236,29 +249,31 @@ func sourcesFromFile(acquisFile string, metrics_level int) ([]DataSource, error)
236
249
237
250
expandedAcquis := csstring .StrictExpand (string (acquisContent ), os .LookupEnv )
238
251
239
- dec := yaml .NewDecoder (strings .NewReader (expandedAcquis ))
240
- dec .SetStrict (true )
252
+ detectedTypes , err := detectTypes (strings .NewReader (expandedAcquis ))
253
+ if err != nil {
254
+ return nil , fmt .Errorf ("failed to parse %s: %w" , yamlFile .Name (), err )
255
+ }
256
+
257
+ documents , err := csyaml .SplitDocuments (strings .NewReader (expandedAcquis ))
258
+ if err != nil {
259
+ return nil , err
260
+ }
241
261
242
262
idx := - 1
243
263
244
- for {
245
- var sub configuration.DataSourceCommonCfg
246
-
264
+ for _ , yamlDoc := range documents {
247
265
idx += 1
248
266
249
- err = dec .Decode (& sub )
250
- if err != nil {
251
- if ! errors .Is (err , io .EOF ) {
252
- return nil , fmt .Errorf ("failed to parse %s: %w" , acquisFile , err )
253
- }
254
-
255
- log .Tracef ("End of yaml file" )
267
+ var sub configuration.DataSourceCommonCfg
256
268
257
- break
269
+ // can't be strict here, the doc contains specific datasource config too but we won't collect them now.
270
+ err := yaml .UnmarshalWithOptions (yamlDoc , & sub )
271
+ if err != nil {
272
+ return nil , fmt .Errorf ("failed to parse %s: %w" , yamlFile .Name (), errors .New (yaml .FormatError (err , false , false )))
258
273
}
259
274
260
275
// for backward compat ('type' was not mandatory, detect it)
261
- if guessType := detectBackwardCompatAcquis ( sub ) ; guessType != "" {
276
+ if guessType := detectedTypes [ idx ] ; guessType != "" {
262
277
log .Debugf ("datasource type missing in %s (position %d): detected 'source=%s'" , acquisFile , idx , guessType )
263
278
264
279
if sub .Source != "" && sub .Source != guessType {
@@ -267,33 +282,40 @@ func sourcesFromFile(acquisFile string, metrics_level int) ([]DataSource, error)
267
282
268
283
sub .Source = guessType
269
284
}
285
+
270
286
// it's an empty item, skip it
271
- if len (sub .Labels ) == 0 {
272
- if sub .Source == "" {
273
- log .Debugf ("skipping empty item in %s" , acquisFile )
274
- continue
275
- }
276
287
288
+ empty , err := csyaml .IsEmptyYAML (bytes .NewReader (yamlDoc ))
289
+ if err != nil {
290
+ return nil , fmt .Errorf ("failed to parse %s (position %d): %w" , acquisFile , idx , err )
291
+ }
292
+
293
+ if empty {
294
+ // there are no keys or only comments, skip the document
295
+ continue
296
+ }
297
+
298
+ if len (sub .Labels ) == 0 {
277
299
if sub .Source != "docker" {
278
300
// docker is the only source that can be empty
279
301
return nil , fmt .Errorf ("missing labels in %s (position %d)" , acquisFile , idx )
280
302
}
281
303
}
282
304
283
305
if sub .Source == "" {
284
- return nil , fmt .Errorf ("data source type is empty ( 'source') in %s (position %d)" , acquisFile , idx )
306
+ return nil , fmt .Errorf ("missing 'source' field in %s (position %d)" , acquisFile , idx )
285
307
}
286
308
287
309
// pre-check that the source is valid
288
- _ , err : = GetDataSourceIface (sub .Source )
310
+ _ , err = GetDataSourceIface (sub .Source )
289
311
if err != nil {
290
312
return nil , fmt .Errorf ("in file %s (position %d) - %w" , acquisFile , idx , err )
291
313
}
292
314
293
315
uniqueId := uuid .NewString ()
294
316
sub .UniqueId = uniqueId
295
317
296
- src , err := DataSourceConfigure (sub , metrics_level )
318
+ src , err := DataSourceConfigure (sub , yamlDoc , metrics_level )
297
319
if err != nil {
298
320
var dserr * DataSourceUnavailableError
299
321
if errors .As (err , & dserr ) {
@@ -376,7 +398,6 @@ func copyEvent(evt types.Event, line string) types.Event {
376
398
377
399
func transform (transformChan chan types.Event , output chan types.Event , acquisTomb * tomb.Tomb , transformRuntime * vm.Program , logger * log.Entry ) {
378
400
defer trace .CatchPanic ("crowdsec/acquis" )
379
-
380
401
logger .Infof ("transformer started" )
381
402
382
403
for {
0 commit comments