diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 6edf0b1ef05..28efdff6eb3 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -24,7 +24,6 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/logging" - "github.com/crowdsecurity/crowdsec/pkg/parser" "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) @@ -115,11 +114,6 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo } } - if flags.DumpDir != "" { - parser.ParseDump = true - leakybucket.BucketPourTrack = true - } - if flags.haveTimeMachine() { // in time-machine mode, we want to see what's happening cConfig.Common.LogMedia = "stdout" diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index ff3dd8afbaf..23b9dc98d5b 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -114,7 +114,8 @@ func runOutput( } /* process post overflow parser nodes */ - event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes) + dump := flags.DumpDir != "" + event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, dump) if err != nil { return fmt.Errorf("postoverflow failed: %w", err) } diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go index 5886363c5e6..8b0cdd1067b 100644 --- a/cmd/crowdsec/parse.go +++ b/cmd/crowdsec/parse.go @@ -35,7 +35,8 @@ func parseEvent( startParsing := time.Now() /* parse the log using magic */ - parsed, err := parser.Parse(parserCTX, event, nodes) + dump := flags.DumpDir != "" + parsed, err := parser.Parse(parserCTX, event, nodes, dump) if err != nil { log.Errorf("failed parsing: %v", err) } diff --git a/cmd/crowdsec/pour.go b/cmd/crowdsec/pour.go index 249df3ce025..111d0562f80 100644 --- a/cmd/crowdsec/pour.go +++ b/cmd/crowdsec/pour.go @@ -52,7 +52,8 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc triggerGC(parsed, buckets, cConfig) } // here we can bucketify with parsed - poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets) + track := flags.DumpDir != "" + poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track) if err != nil { log.Warningf("bucketify failed for: %v with %s", parsed, err) continue diff --git a/pkg/leakybucket/buckets_test.go b/pkg/leakybucket/buckets_test.go index 36e9cdc4061..800a7cf0783 100644 --- a/pkg/leakybucket/buckets_test.go +++ b/pkg/leakybucket/buckets_test.go @@ -184,7 +184,7 @@ func testFile(t *testing.T, file string, holders []BucketFactory, response chan in.ExpectMode = pipeline.TIMEMACHINE log.Infof("Buckets input : %s", spew.Sdump(in)) - ok, err := PourItemToHolders(ctx, in, holders, buckets) + ok, err := PourItemToHolders(ctx, in, holders, buckets, false) if err != nil { t.Fatalf("Failed to pour : %s", err) } @@ -207,7 +207,7 @@ POLL_AGAIN: results = append(results, ret) if ret.Overflow.Reprocess { log.Errorf("Overflow being reprocessed.") - ok, err := PourItemToHolders(ctx, ret, holders, buckets) + ok, err := PourItemToHolders(ctx, ret, holders, buckets, false) if err != nil { t.Fatalf("Failed to pour : %s", err) } diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index 3a1893e8f36..3e2c14ccf9f 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -20,7 +20,6 @@ import ( var ( serialized map[string]Leaky BucketPourCache map[string][]pipeline.Event = make(map[string][]pipeline.Event) - BucketPourTrack bool bucketPourMu sync.Mutex ) @@ -75,7 +74,7 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) { } } -func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event) (bool, error) { +func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event, track bool) (bool, error) { var sent bool var buckey = bucket.Mapkey var err error @@ -143,7 +142,7 @@ func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, select { case bucket.In <- parsed: // holder.logger.Tracef("Successfully sent !") - if BucketPourTrack { + if track { evt := deepcopy.Copy(*parsed).(pipeline.Event) bucketPourMu.Lock() @@ -204,10 +203,10 @@ func LoadOrStoreBucketFromHolder(ctx context.Context, partitionKey string, bucke var orderEvent map[string]*sync.WaitGroup -func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { +func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets, track bool) (bool, error) { var ok, condition, poured bool - if BucketPourTrack { + if track { evt := deepcopy.Copy(parsed).(pipeline.Event) bucketPourMu.Lock() @@ -275,7 +274,7 @@ func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []Buc orderEvent[buckey].Add(1) } - ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed) + ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed, track) if bucket.orderEvent { orderEvent[buckey].Wait() diff --git a/pkg/leakybucket/manager_run_test.go b/pkg/leakybucket/manager_run_test.go index 49421a4bfea..2f1f0cbf2fd 100644 --- a/pkg/leakybucket/manager_run_test.go +++ b/pkg/leakybucket/manager_run_test.go @@ -86,7 +86,7 @@ func TestGCandDump(t *testing.T) { in := pipeline.Event{Parsed: map[string]string{"something": "something"}} // pour an item that will go to leaky + counter - ok, err := PourItemToHolders(ctx, in, Holders, buckets) + ok, err := PourItemToHolders(ctx, in, Holders, buckets, false) if err != nil { t.Fatalf("while pouring item: %s", err) } @@ -157,7 +157,7 @@ func TestShutdownBuckets(t *testing.T) { in := pipeline.Event{Parsed: map[string]string{"something": "something"}} // pour an item that will go to leaky + counter ctx, cancel := context.WithCancel(t.Context()) - ok, err := PourItemToHolders(ctx, in, Holders, buckets) + ok, err := PourItemToHolders(ctx, in, Holders, buckets, false) if err != nil { t.Fatalf("while pouring item : %s", err) } diff --git a/pkg/parser/parsing_test.go b/pkg/parser/parsing_test.go index d808bea1bad..2202b0119b0 100644 --- a/pkg/parser/parsing_test.go +++ b/pkg/parser/parsing_test.go @@ -316,7 +316,7 @@ func testSubSet(testSet TestFile, pctx UnixParserCtx, nodes []Node) (bool, error var results []pipeline.Event for _, in := range testSet.Lines { - out, err := Parse(pctx, in, nodes) + out, err := Parse(pctx, in, nodes, false) if err != nil { log.Errorf("Failed to process %s : %v", spew.Sdump(in), err) } diff --git a/pkg/parser/runtime.go b/pkg/parser/runtime.go index 603c90b283e..353aca4c2d0 100644 --- a/pkg/parser/runtime.go +++ b/pkg/parser/runtime.go @@ -242,8 +242,6 @@ func stageidx(stage string, stages []string) int { return -1 } -var ParseDump bool - var ( StageParseCache dumps.ParserResults = make(dumps.ParserResults) StageParseMutex sync.Mutex @@ -254,7 +252,7 @@ var ( }) ) -func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event, error) { +func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node, dump bool) (pipeline.Event, error) { event := xp /* the stage is undefined, probably line is freshly acquired, set to first stage !*/ @@ -288,7 +286,7 @@ func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event, log.Tracef("INPUT '%s'", event.Line.Raw) } - if ParseDump { + if dump { ensureStageCache() } @@ -339,7 +337,7 @@ func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event, clog.Tracef("node (%s) ret : %v", nodes[idx].rn, ret) - if ParseDump { + if dump { var parserIdxInStage int // copy outside of critical section