Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/fflag"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/logging"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

Expand Down Expand Up @@ -115,11 +114,6 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
}
}

if flags.DumpDir != "" {
parser.ParseDump = true
leakybucket.BucketPourTrack = true
}

if flags.haveTimeMachine() {
// in time-machine mode, we want to see what's happening
cConfig.Common.LogMedia = "stdout"
Expand Down
3 changes: 2 additions & 1 deletion cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func runOutput(
}

/* process post overflow parser nodes */
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
dump := flags.DumpDir != ""
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, dump)
if err != nil {
return fmt.Errorf("postoverflow failed: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/crowdsec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func parseEvent(

startParsing := time.Now()
/* parse the log using magic */
parsed, err := parser.Parse(parserCTX, event, nodes)
dump := flags.DumpDir != ""
parsed, err := parser.Parse(parserCTX, event, nodes, dump)
if err != nil {
log.Errorf("failed parsing: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/crowdsec/pour.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
triggerGC(parsed, buckets, cConfig)
}
// here we can bucketify with parsed
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets)
track := flags.DumpDir != ""
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track)
if err != nil {
log.Warningf("bucketify failed for: %v with %s", parsed, err)
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/leakybucket/buckets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func testFile(t *testing.T, file string, holders []BucketFactory, response chan
in.ExpectMode = pipeline.TIMEMACHINE
log.Infof("Buckets input : %s", spew.Sdump(in))

ok, err := PourItemToHolders(ctx, in, holders, buckets)
ok, err := PourItemToHolders(ctx, in, holders, buckets, false)
if err != nil {
t.Fatalf("Failed to pour : %s", err)
}
Expand All @@ -207,7 +207,7 @@ POLL_AGAIN:
results = append(results, ret)
if ret.Overflow.Reprocess {
log.Errorf("Overflow being reprocessed.")
ok, err := PourItemToHolders(ctx, ret, holders, buckets)
ok, err := PourItemToHolders(ctx, ret, holders, buckets, false)
if err != nil {
t.Fatalf("Failed to pour : %s", err)
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/leakybucket/manager_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
var (
serialized map[string]Leaky
BucketPourCache map[string][]pipeline.Event = make(map[string][]pipeline.Event)
BucketPourTrack bool
bucketPourMu sync.Mutex
)

Expand Down Expand Up @@ -75,7 +74,7 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) {
}
}

func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event) (bool, error) {
func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event, track bool) (bool, error) {
var sent bool
var buckey = bucket.Mapkey
var err error
Expand Down Expand Up @@ -143,7 +142,7 @@ func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory,
select {
case bucket.In <- parsed:
// holder.logger.Tracef("Successfully sent !")
if BucketPourTrack {
if track {
evt := deepcopy.Copy(*parsed).(pipeline.Event)

bucketPourMu.Lock()
Expand Down Expand Up @@ -204,10 +203,10 @@ func LoadOrStoreBucketFromHolder(ctx context.Context, partitionKey string, bucke

var orderEvent map[string]*sync.WaitGroup

func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets, track bool) (bool, error) {
var ok, condition, poured bool

if BucketPourTrack {
if track {
evt := deepcopy.Copy(parsed).(pipeline.Event)

bucketPourMu.Lock()
Expand Down Expand Up @@ -275,7 +274,7 @@ func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []Buc
orderEvent[buckey].Add(1)
}

ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed)
ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed, track)

if bucket.orderEvent {
orderEvent[buckey].Wait()
Expand Down
4 changes: 2 additions & 2 deletions pkg/leakybucket/manager_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestGCandDump(t *testing.T) {

in := pipeline.Event{Parsed: map[string]string{"something": "something"}}
// pour an item that will go to leaky + counter
ok, err := PourItemToHolders(ctx, in, Holders, buckets)
ok, err := PourItemToHolders(ctx, in, Holders, buckets, false)
if err != nil {
t.Fatalf("while pouring item: %s", err)
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestShutdownBuckets(t *testing.T) {
in := pipeline.Event{Parsed: map[string]string{"something": "something"}}
// pour an item that will go to leaky + counter
ctx, cancel := context.WithCancel(t.Context())
ok, err := PourItemToHolders(ctx, in, Holders, buckets)
ok, err := PourItemToHolders(ctx, in, Holders, buckets, false)
if err != nil {
t.Fatalf("while pouring item : %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/parsing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func testSubSet(testSet TestFile, pctx UnixParserCtx, nodes []Node) (bool, error
var results []pipeline.Event

for _, in := range testSet.Lines {
out, err := Parse(pctx, in, nodes)
out, err := Parse(pctx, in, nodes, false)
if err != nil {
log.Errorf("Failed to process %s : %v", spew.Sdump(in), err)
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/parser/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,6 @@ func stageidx(stage string, stages []string) int {
return -1
}

var ParseDump bool

var (
StageParseCache dumps.ParserResults = make(dumps.ParserResults)
StageParseMutex sync.Mutex
Expand All @@ -254,7 +252,7 @@ var (
})
)

func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event, error) {
func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node, dump bool) (pipeline.Event, error) {
event := xp

/* the stage is undefined, probably line is freshly acquired, set to first stage !*/
Expand Down Expand Up @@ -288,7 +286,7 @@ func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event,
log.Tracef("INPUT '%s'", event.Line.Raw)
}

if ParseDump {
if dump {
ensureStageCache()
}

Expand Down Expand Up @@ -339,7 +337,7 @@ func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event,

clog.Tracef("node (%s) ret : %v", nodes[idx].rn, ret)

if ParseDump {
if dump {
var parserIdxInStage int

// copy outside of critical section
Expand Down
Loading