Skip to content

Commit bcd7210

Browse files
authored
pkg/types -> new imports pt 4 (#4012)
1 parent 33b1ec8 commit bcd7210

39 files changed

+356
-384
lines changed

pkg/acquisition/acquisition.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
3030
"github.com/crowdsecurity/crowdsec/pkg/logging"
3131
"github.com/crowdsecurity/crowdsec/pkg/metrics"
32-
"github.com/crowdsecurity/crowdsec/pkg/types"
32+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
3333
)
3434

3535
type DataSourceUnavailableError struct {
@@ -61,12 +61,12 @@ type DataSource interface {
6161

6262
type Fetcher interface {
6363
// Start one shot acquisition(eg, cat a file)
64-
OneShotAcquisition(ctx context.Context, out chan types.Event, acquisTomb *tomb.Tomb) error
64+
OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
6565
}
6666

6767
type Tailer interface {
6868
// Start live acquisition (eg, tail a file)
69-
StreamingAcquisition(ctx context.Context, out chan types.Event, acquisTomb *tomb.Tomb) error
69+
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
7070
}
7171

7272
type MetricsProvider interface {
@@ -186,7 +186,7 @@ func LoadAcquisitionFromDSN(ctx context.Context, dsn string, labels map[string]s
186186
uniqueID := uuid.NewString()
187187

188188
if transformExpr != "" {
189-
vm, err := expr.Compile(transformExpr, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
189+
vm, err := expr.Compile(transformExpr, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
190190
if err != nil {
191191
return nil, fmt.Errorf("while compiling transform expression '%s': %w", transformExpr, err)
192192
}
@@ -353,7 +353,7 @@ func sourcesFromFile(ctx context.Context, acquisFile string, metricsLevel metric
353353
}
354354

355355
if sub.TransformExpr != "" {
356-
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
356+
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
357357
if err != nil {
358358
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
359359
}
@@ -417,8 +417,8 @@ func GetMetrics(sources []DataSource, aggregated bool) error {
417417

418418
// There's no need for an actual deep copy
419419
// The event is almost empty, we are mostly interested in allocating new maps for Parsed/Meta/...
420-
func copyEvent(evt types.Event, line string) types.Event {
421-
evtCopy := types.MakeEvent(evt.ExpectMode == types.TIMEMACHINE, evt.Type, evt.Process)
420+
func copyEvent(evt pipeline.Event, line string) pipeline.Event {
421+
evtCopy := pipeline.MakeEvent(evt.ExpectMode == pipeline.TIMEMACHINE, evt.Type, evt.Process)
422422
evtCopy.Line = evt.Line
423423
evtCopy.Line.Raw = line
424424
evtCopy.Line.Labels = make(map[string]string)
@@ -428,7 +428,7 @@ func copyEvent(evt types.Event, line string) types.Event {
428428
return evtCopy
429429
}
430430

431-
func transform(transformChan chan types.Event, output chan types.Event, acquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
431+
func transform(transformChan chan pipeline.Event, output chan pipeline.Event, acquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
432432
defer trace.CatchPanic("crowdsec/acquis")
433433

434434
logger.Info("transformer started")
@@ -484,7 +484,7 @@ func transform(transformChan chan types.Event, output chan types.Event, acquisTo
484484
}
485485
}
486486

487-
func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, acquisTomb *tomb.Tomb) error {
487+
func StartAcquisition(ctx context.Context, sources []DataSource, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
488488
// Don't wait if we have no sources, as it will hang forever
489489
if len(sources) == 0 {
490490
return nil
@@ -506,7 +506,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ
506506
if transformRuntime, ok := transformRuntimes[subsrc.GetUuid()]; ok {
507507
log.Infof("transform expression found for datasource %s", subsrc.GetName())
508508

509-
transformChan := make(chan types.Event)
509+
transformChan := make(chan pipeline.Event)
510510
outChan = transformChan
511511
transformLogger := log.WithFields(log.Fields{
512512
"component": "transform",

pkg/acquisition/acquisition_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
2020
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
2121
"github.com/crowdsecurity/crowdsec/pkg/metrics"
22-
"github.com/crowdsecurity/crowdsec/pkg/types"
22+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2323
)
2424

2525
type MockSource struct {
@@ -337,9 +337,9 @@ func (f *MockCat) Configure(_ context.Context, _ []byte, _ *log.Entry, _ metrics
337337
func (*MockCat) UnmarshalConfig(_ []byte) error { return nil }
338338
func (*MockCat) GetName() string { return "mock_cat" }
339339
func (*MockCat) GetMode() string { return "cat" }
340-
func (*MockCat) OneShotAcquisition(_ context.Context, out chan types.Event, _ *tomb.Tomb) error {
340+
func (*MockCat) OneShotAcquisition(_ context.Context, out chan pipeline.Event, _ *tomb.Tomb) error {
341341
for range 10 {
342-
evt := types.Event{}
342+
evt := pipeline.Event{}
343343
evt.Line.Src = "test"
344344
out <- evt
345345
}
@@ -373,9 +373,9 @@ func (*MockTail) UnmarshalConfig(_ []byte) error { return nil }
373373
func (*MockTail) GetName() string { return "mock_tail" }
374374
func (*MockTail) GetMode() string { return "tail" }
375375

376-
func (*MockTail) StreamingAcquisition(_ context.Context, out chan types.Event, t *tomb.Tomb) error {
376+
func (*MockTail) StreamingAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
377377
for range 10 {
378-
evt := types.Event{}
378+
evt := pipeline.Event{}
379379
evt.Line.Src = "test"
380380
out <- evt
381381
}
@@ -395,7 +395,7 @@ func TestStartAcquisitionCat(t *testing.T) {
395395
sources := []DataSource{
396396
&MockCat{},
397397
}
398-
out := make(chan types.Event)
398+
out := make(chan pipeline.Event)
399399
acquisTomb := tomb.Tomb{}
400400

401401
go func() {
@@ -424,7 +424,7 @@ func TestStartAcquisitionTail(t *testing.T) {
424424
sources := []DataSource{
425425
&MockTail{},
426426
}
427-
out := make(chan types.Event)
427+
out := make(chan pipeline.Event)
428428
acquisTomb := tomb.Tomb{}
429429

430430
go func() {
@@ -456,9 +456,9 @@ type MockTailError struct {
456456
MockTail
457457
}
458458

459-
func (*MockTailError) StreamingAcquisition(_ context.Context, out chan types.Event, t *tomb.Tomb) error {
459+
func (*MockTailError) StreamingAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
460460
for range 10 {
461-
evt := types.Event{}
461+
evt := pipeline.Event{}
462462
evt.Line.Src = "test"
463463
out <- evt
464464
}
@@ -473,7 +473,7 @@ func TestStartAcquisitionTailError(t *testing.T) {
473473
sources := []DataSource{
474474
&MockTailError{},
475475
}
476-
out := make(chan types.Event)
476+
out := make(chan pipeline.Event)
477477
acquisTomb := tomb.Tomb{}
478478

479479
go func() {
@@ -575,7 +575,7 @@ func (*TailModeNoTailer) CanRun() error { return nil }
575575

576576
func TestStartAcquisition_MissingTailer(t *testing.T) {
577577
ctx := t.Context()
578-
out := make(chan types.Event)
578+
out := make(chan pipeline.Event)
579579
errCh := make(chan error, 1)
580580

581581
var tb tomb.Tomb
@@ -598,7 +598,7 @@ func (*CatModeNoFetcher) CanRun() error { return nil }
598598

599599
func TestStartAcquisition_MissingFetcher(t *testing.T) {
600600
ctx := t.Context()
601-
out := make(chan types.Event)
601+
out := make(chan pipeline.Event)
602602
errCh := make(chan error, 1)
603603

604604
var tb tomb.Tomb

pkg/acquisition/modules/appsec/appsec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
3131
"github.com/crowdsecurity/crowdsec/pkg/csnet"
3232
"github.com/crowdsecurity/crowdsec/pkg/metrics"
33-
"github.com/crowdsecurity/crowdsec/pkg/types"
33+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
3434
)
3535

3636
const (
@@ -429,7 +429,7 @@ func (w *AppsecSource) listenAndServe(ctx context.Context, t *tomb.Tomb) error {
429429
return nil
430430
}
431431

432-
func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
432+
func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
433433
apiClient, err := apiclient.GetLAPIClient()
434434
if err != nil {
435435
return fmt.Errorf("unable to get authenticated LAPI client: %w", err)

0 commit comments

Comments
 (0)