Skip to content

Commit 12d2e32

Browse files
authored
refact pkg/acquisition: extract loop method (#3615)
* refact pkg/acquisition: extract loop method * lint
1 parent 0490986 commit 12d2e32

File tree

3 files changed

+120
-97
lines changed

3 files changed

+120
-97
lines changed

cmd/crowdsec/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error)
120120
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
121121
}
122122
} else {
123-
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec, cConfig.Prometheus)
123+
dataSources, err = acquisition.LoadAcquisitionFromFiles(cConfig.Crowdsec, cConfig.Prometheus)
124124
if err != nil {
125125
return nil, err
126126
}

pkg/acquisition/acquisition.go

Lines changed: 101 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"maps"
89
"os"
910
"strings"
1011

@@ -24,7 +25,6 @@ import (
2425
"github.com/crowdsecurity/crowdsec/pkg/cwversion/component"
2526
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
2627
"github.com/crowdsecurity/crowdsec/pkg/types"
27-
"maps"
2828
)
2929

3030
type DataSourceUnavailableError struct {
@@ -42,18 +42,18 @@ func (e *DataSourceUnavailableError) Unwrap() error {
4242

4343
// The interface each datasource must implement
4444
type DataSource interface {
45-
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
46-
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
47-
UnmarshalConfig(yamlConfig []byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
48-
Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error // Complete the YAML datasource configuration and perform runtime checks.
49-
ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uniqueID string) error // Configure the datasource
50-
GetMode() string // Get the mode (TAIL, CAT or SERVER)
51-
GetName() string // Get the name of the module
52-
OneShotAcquisition(ctx context.Context, out chan types.Event, acquisTomb *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
53-
StreamingAcquisition(ctx context.Context, out chan types.Event, acquisTomb *tomb.Tomb) error // Start live acquisition (eg, tail a file)
54-
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
55-
GetUuid() string // Get the unique identifier of the datasource
56-
Dump() interface{}
45+
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
46+
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
47+
UnmarshalConfig(yamlConfig []byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
48+
Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error // Complete the YAML datasource configuration and perform runtime checks.
49+
ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uniqueID string) error // Configure the datasource
50+
GetMode() string // Get the mode (TAIL, CAT or SERVER)
51+
GetName() string // Get the name of the module
52+
OneShotAcquisition(ctx context.Context, out chan types.Event, acquisTomb *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
53+
StreamingAcquisition(ctx context.Context, out chan types.Event, acquisTomb *tomb.Tomb) error // Start live acquisition (eg, tail a file)
54+
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
55+
GetUuid() string // Get the unique identifier of the datasource
56+
Dump() any
5757
}
5858

5959
var (
@@ -180,7 +180,7 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string, transformExpr
180180
uniqueId := uuid.NewString()
181181

182182
if transformExpr != "" {
183-
vm, err := expr.Compile(transformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
183+
vm, err := expr.Compile(transformExpr, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
184184
if err != nil {
185185
return nil, fmt.Errorf("while compiling transform expression '%s': %w", transformExpr, err)
186186
}
@@ -216,113 +216,127 @@ func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int {
216216
return configuration.METRICS_FULL
217217
}
218218

219-
// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
220-
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) {
219+
// sourcesFromFile reads and parses one acquisition file into DataSources.
220+
func sourcesFromFile(acquisFile string, metrics_level int) ([]DataSource, error) {
221221
var sources []DataSource
222222

223-
metrics_level := GetMetricsLevelFromPromCfg(prom)
224-
225-
for _, acquisFile := range config.AcquisitionFiles {
226-
log.Infof("loading acquisition file : %s", acquisFile)
227-
228-
yamlFile, err := os.Open(acquisFile)
229-
if err != nil {
230-
return nil, err
231-
}
232-
233-
defer yamlFile.Close()
223+
log.Infof("loading acquisition file : %s", acquisFile)
234224

235-
acquisContent, err := io.ReadAll(yamlFile)
236-
if err != nil {
237-
return nil, fmt.Errorf("failed to read %s: %w", acquisFile, err)
238-
}
225+
yamlFile, err := os.Open(acquisFile)
226+
if err != nil {
227+
return nil, err
228+
}
239229

240-
expandedAcquis := csstring.StrictExpand(string(acquisContent), os.LookupEnv)
230+
defer yamlFile.Close()
241231

242-
dec := yaml.NewDecoder(strings.NewReader(expandedAcquis))
243-
dec.SetStrict(true)
232+
acquisContent, err := io.ReadAll(yamlFile)
233+
if err != nil {
234+
return nil, fmt.Errorf("failed to read %s: %w", acquisFile, err)
235+
}
244236

245-
idx := -1
237+
expandedAcquis := csstring.StrictExpand(string(acquisContent), os.LookupEnv)
246238

247-
for {
248-
var sub configuration.DataSourceCommonCfg
239+
dec := yaml.NewDecoder(strings.NewReader(expandedAcquis))
240+
dec.SetStrict(true)
249241

250-
idx += 1
242+
idx := -1
251243

252-
err = dec.Decode(&sub)
253-
if err != nil {
254-
if !errors.Is(err, io.EOF) {
255-
return nil, fmt.Errorf("failed to parse %s: %w", acquisFile, err)
256-
}
244+
for {
245+
var sub configuration.DataSourceCommonCfg
257246

258-
log.Tracef("End of yaml file")
247+
idx += 1
259248

260-
break
249+
err = dec.Decode(&sub)
250+
if err != nil {
251+
if !errors.Is(err, io.EOF) {
252+
return nil, fmt.Errorf("failed to parse %s: %w", acquisFile, err)
261253
}
262254

263-
// for backward compat ('type' was not mandatory, detect it)
264-
if guessType := detectBackwardCompatAcquis(sub); guessType != "" {
265-
log.Debugf("datasource type missing in %s (position %d): detected 'source=%s'", acquisFile, idx, guessType)
255+
log.Tracef("End of yaml file")
266256

267-
if sub.Source != "" && sub.Source != guessType {
268-
log.Warnf("datasource type mismatch in %s (position %d): found '%s' but should probably be '%s'", acquisFile, idx, sub.Source, guessType)
269-
}
257+
break
258+
}
270259

271-
sub.Source = guessType
272-
}
273-
// it's an empty item, skip it
274-
if len(sub.Labels) == 0 {
275-
if sub.Source == "" {
276-
log.Debugf("skipping empty item in %s", acquisFile)
277-
continue
278-
}
260+
// for backward compat ('type' was not mandatory, detect it)
261+
if guessType := detectBackwardCompatAcquis(sub); guessType != "" {
262+
log.Debugf("datasource type missing in %s (position %d): detected 'source=%s'", acquisFile, idx, guessType)
279263

280-
if sub.Source != "docker" {
281-
// docker is the only source that can be empty
282-
return nil, fmt.Errorf("missing labels in %s (position %d)", acquisFile, idx)
283-
}
264+
if sub.Source != "" && sub.Source != guessType {
265+
log.Warnf("datasource type mismatch in %s (position %d): found '%s' but should probably be '%s'", acquisFile, idx, sub.Source, guessType)
284266
}
285267

268+
sub.Source = guessType
269+
}
270+
// it's an empty item, skip it
271+
if len(sub.Labels) == 0 {
286272
if sub.Source == "" {
287-
return nil, fmt.Errorf("data source type is empty ('source') in %s (position %d)", acquisFile, idx)
273+
log.Debugf("skipping empty item in %s", acquisFile)
274+
continue
288275
}
289276

290-
// pre-check that the source is valid
291-
_, err := GetDataSourceIface(sub.Source)
292-
if err != nil {
293-
return nil, fmt.Errorf("in file %s (position %d) - %w", acquisFile, idx, err)
277+
if sub.Source != "docker" {
278+
// docker is the only source that can be empty
279+
return nil, fmt.Errorf("missing labels in %s (position %d)", acquisFile, idx)
294280
}
281+
}
295282

296-
uniqueId := uuid.NewString()
297-
sub.UniqueId = uniqueId
283+
if sub.Source == "" {
284+
return nil, fmt.Errorf("data source type is empty ('source') in %s (position %d)", acquisFile, idx)
285+
}
298286

299-
src, err := DataSourceConfigure(sub, metrics_level)
300-
if err != nil {
301-
var dserr *DataSourceUnavailableError
302-
if errors.As(err, &dserr) {
303-
log.Error(err)
304-
continue
305-
}
287+
// pre-check that the source is valid
288+
_, err := GetDataSourceIface(sub.Source)
289+
if err != nil {
290+
return nil, fmt.Errorf("in file %s (position %d) - %w", acquisFile, idx, err)
291+
}
306292

307-
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
293+
uniqueId := uuid.NewString()
294+
sub.UniqueId = uniqueId
295+
296+
src, err := DataSourceConfigure(sub, metrics_level)
297+
if err != nil {
298+
var dserr *DataSourceUnavailableError
299+
if errors.As(err, &dserr) {
300+
log.Error(err)
301+
continue
308302
}
309303

310-
if sub.TransformExpr != "" {
311-
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
312-
if err != nil {
313-
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
314-
}
304+
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
305+
}
315306

316-
transformRuntimes[uniqueId] = vm
307+
if sub.TransformExpr != "" {
308+
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
309+
if err != nil {
310+
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
317311
}
318312

319-
sources = append(sources, src)
313+
transformRuntimes[uniqueId] = vm
320314
}
315+
316+
sources = append(sources, src)
321317
}
322318

323319
return sources, nil
324320
}
325321

322+
// LoadAcquisitionFromFiles unmarshals the configuration item and checks its availability
323+
func LoadAcquisitionFromFiles(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) {
324+
var allSources []DataSource
325+
326+
metrics_level := GetMetricsLevelFromPromCfg(prom)
327+
328+
for _, acquisFile := range config.AcquisitionFiles {
329+
sources, err := sourcesFromFile(acquisFile, metrics_level)
330+
if err != nil {
331+
return nil, err
332+
}
333+
334+
allSources = append(allSources, sources...)
335+
}
336+
337+
return allSources, nil
338+
}
339+
326340
func GetMetrics(sources []DataSource, aggregated bool) error {
327341
var metrics []prometheus.Collector
328342

@@ -362,6 +376,7 @@ func copyEvent(evt types.Event, line string) types.Event {
362376

363377
func transform(transformChan chan types.Event, output chan types.Event, acquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
364378
defer trace.CatchPanic("crowdsec/acquis")
379+
365380
logger.Infof("transformer started")
366381

367382
for {
@@ -372,7 +387,7 @@ func transform(transformChan chan types.Event, output chan types.Event, acquisTo
372387
case evt := <-transformChan:
373388
logger.Tracef("Received event %s", evt.Line.Raw)
374389

375-
out, err := expr.Run(transformRuntime, map[string]interface{}{"evt": &evt})
390+
out, err := expr.Run(transformRuntime, map[string]any{"evt": &evt})
376391
if err != nil {
377392
logger.Errorf("while running transform expression: %s, sending event as-is", err)
378393
output <- evt

pkg/acquisition/acquisition_test.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ import (
2424

2525
type MockSource struct {
2626
configuration.DataSourceCommonCfg `yaml:",inline"`
27-
Toto string `yaml:"toto"`
28-
logger *log.Entry
27+
28+
Toto string `yaml:"toto"`
29+
logger *log.Entry
2930
}
3031

3132
func (f *MockSource) UnmarshalConfig(cfg []byte) error {
@@ -215,7 +216,7 @@ wowo: ajsajasjas
215216
}
216217
}
217218

218-
func TestLoadAcquisitionFromFile(t *testing.T) {
219+
func TestLoadAcquisitionFromFiles(t *testing.T) {
219220
appendMockSource()
220221
t.Setenv("TEST_ENV", "test_value2")
221222

@@ -293,7 +294,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) {
293294
}
294295
for _, tc := range tests {
295296
t.Run(tc.TestName, func(t *testing.T) {
296-
dss, err := LoadAcquisitionFromFile(&tc.Config, nil)
297+
dss, err := LoadAcquisitionFromFiles(&tc.Config, nil)
297298
cstest.RequireErrorContains(t, err, tc.ExpectedError)
298299

299300
if tc.ExpectedError != "" {
@@ -321,7 +322,8 @@ func TestLoadAcquisitionFromFile(t *testing.T) {
321322

322323
type MockCat struct {
323324
configuration.DataSourceCommonCfg `yaml:",inline"`
324-
logger *log.Entry
325+
326+
logger *log.Entry
325327
}
326328

327329
func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
@@ -366,7 +368,8 @@ func (f *MockCat) GetUuid() string { return "" }
366368

367369
type MockTail struct {
368370
configuration.DataSourceCommonCfg `yaml:",inline"`
369-
logger *log.Entry
371+
372+
logger *log.Entry
370373
}
371374

372375
func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
@@ -421,11 +424,12 @@ func TestStartAcquisitionCat(t *testing.T) {
421424

422425
go func() {
423426
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
424-
t.Errorf("unexpected error")
427+
t.Error("unexpected error")
425428
}
426429
}()
427430

428431
count := 0
432+
429433
READLOOP:
430434
for {
431435
select {
@@ -449,11 +453,12 @@ func TestStartAcquisitionTail(t *testing.T) {
449453

450454
go func() {
451455
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
452-
t.Errorf("unexpected error")
456+
t.Error("unexpected error")
453457
}
454458
}()
455459

456460
count := 0
461+
457462
READLOOP:
458463
for {
459464
select {
@@ -502,6 +507,7 @@ func TestStartAcquisitionTailError(t *testing.T) {
502507
}()
503508

504509
count := 0
510+
505511
READLOOP:
506512
for {
507513
select {
@@ -511,6 +517,7 @@ READLOOP:
511517
break READLOOP
512518
}
513519
}
520+
514521
assert.Equal(t, 10, count)
515522
// acquisTomb.Kill(nil)
516523
time.Sleep(1 * time.Second)
@@ -519,8 +526,9 @@ READLOOP:
519526

520527
type MockSourceByDSN struct {
521528
configuration.DataSourceCommonCfg `yaml:",inline"`
522-
Toto string `yaml:"toto"`
523-
logger *log.Entry //nolint: unused
529+
530+
Toto string `yaml:"toto"`
531+
logger *log.Entry //nolint: unused
524532
}
525533

526534
func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }

0 commit comments

Comments
 (0)