Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"os"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -97,12 +98,12 @@ func startHeartBeat(ctx context.Context, _ *csconfig.Config, apiClient *apiclien
apiClient.HeartBeat.StartHeartBeat(ctx)
}

func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient, stageCollector *parser.StageParseCollector, bucketOverflows []pipeline.Event) {
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient, sd *StateDumper) {
for idx := range cConfig.Crowdsec.OutputRoutinesCount {
log.WithField("idx", idx).Info("Starting output routine")
outputsTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/runOutput/"+strconv.Itoa(idx))
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient, stageCollector, bucketOverflows)
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient, sd)
})
}
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func runCrowdsec(

startHeartBeat(ctx, cConfig, apiClient)

startOutputRoutines(ctx, cConfig, parsers, apiClient, sd.StageParse, sd.BucketOverflows)
startOutputRoutines(ctx, cConfig, parsers, apiClient, sd)

if err := startLPMetrics(ctx, cConfig, apiClient, hub, datasources); err != nil {
return err
Expand Down Expand Up @@ -222,7 +223,7 @@ func serveCrowdsec(
log.Fatal(err)
}

return nil
os.Exit(0)
}

return nil
Expand Down
7 changes: 3 additions & 4 deletions cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func runOutput(
postOverflowCTX parser.UnixParserCtx,
postOverflowNodes []parser.Node,
client *apiclient.ApiClient,
stageCollector *parser.StageParseCollector,
bucketOverflows []pipeline.Event,
sd *StateDumper,
) error {
var (
cache []pipeline.RuntimeAlert
Expand Down Expand Up @@ -114,7 +113,7 @@ func runOutput(
}

/* process post overflow parser nodes */
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, stageCollector)
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, sd.StageParse)
if err != nil {
return fmt.Errorf("postoverflow failed: %w", err)
}
Expand All @@ -124,7 +123,7 @@ func runOutput(
// if the Alert is nil, it's to signal bucket is ready for GC, don't track this
// dump after postoveflow processing to avoid missing whitelist info
if flags.DumpDir != "" && ov.Alert != nil {
bucketOverflows = append(bucketOverflows, event)
sd.BucketOverflows = append(sd.BucketOverflows, event)
}

if ov.Whitelisted {
Expand Down