Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/crowdsec-cli/cliexplain/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (cli *cliExplain) run(ctx context.Context) error {

defer func() {
if cli.flags.noClean {
fmt.Fprintf(os.Stdout, "Not removing dump directory: %s\n", dir)
return
}

Expand Down
51 changes: 34 additions & 17 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"os"
"strconv"
"time"

Expand All @@ -19,6 +18,7 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/metrics"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
Expand Down Expand Up @@ -70,23 +70,23 @@ func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub,
return csParsers, datasources, nil
}

func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers) {
func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, stageCollector *parser.StageParseCollector) {
for idx := range cConfig.Crowdsec.ParserRoutinesCount {
log.WithField("idx", idx).Info("Starting parser routine")
g.Go(func() error {
defer trace.CatchPanic("crowdsec/runParse/"+strconv.Itoa(idx))
runParse(ctx, logLines, inEvents, *parsers.Ctx, parsers.Nodes)
runParse(ctx, logLines, inEvents, *parsers.Ctx, parsers.Nodes, stageCollector)
return nil
})
}
}

func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config) {
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) {
for idx := range cConfig.Crowdsec.BucketsRoutinesCount {
log.WithField("idx", idx).Info("Starting bucket routine")
g.Go(func() error {
defer trace.CatchPanic("crowdsec/runPour/"+strconv.Itoa(idx))
runPour(ctx, inEvents, holders, buckets, cConfig)
runPour(ctx, inEvents, holders, buckets, cConfig, pourCollector)
return nil
})
}
Expand All @@ -97,12 +97,12 @@ func startHeartBeat(ctx context.Context, _ *csconfig.Config, apiClient *apiclien
apiClient.HeartBeat.StartHeartBeat(ctx)
}

func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient) {
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient, stageCollector *parser.StageParseCollector, bucketOverflows []pipeline.Event) {
for idx := range cConfig.Crowdsec.OutputRoutinesCount {
log.WithField("idx", idx).Info("Starting output routine")
outputsTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/runOutput/"+strconv.Itoa(idx))
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient)
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient, stageCollector, bucketOverflows)
})
}
}
Expand Down Expand Up @@ -135,12 +135,20 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
}

// runCrowdsec starts the log processor service
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource) error {
func runCrowdsec(
ctx context.Context,
g *errgroup.Group,
cConfig *csconfig.Config,
parsers *parser.Parsers,
hub *cwhub.Hub,
datasources []acquisitionTypes.DataSource,
sd *StateDumper,
) error {
inEvents = make(chan pipeline.Event)
logLines = make(chan pipeline.Event)

startParserRoutines(ctx, g, cConfig, parsers)
startBucketRoutines(ctx, g, cConfig)
startParserRoutines(ctx, g, cConfig, parsers, sd.StageParse)
startBucketRoutines(ctx, g, cConfig, sd.Pour)

apiClient, err := apiclient.GetLAPIClient()
if err != nil {
Expand All @@ -149,7 +157,7 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi

startHeartBeat(ctx, cConfig, apiClient)

startOutputRoutines(ctx, cConfig, parsers, apiClient)
startOutputRoutines(ctx, cConfig, parsers, apiClient, sd.StageParse, sd.BucketOverflows)

if err := startLPMetrics(ctx, cConfig, apiClient, hub, datasources); err != nil {
return err
Expand All @@ -165,7 +173,15 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
}

// serveCrowdsec wraps the log processor service
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool) {
func serveCrowdsec(
ctx context.Context,
parsers *parser.Parsers,
cConfig *csconfig.Config,
hub *cwhub.Hub,
datasources []acquisitionTypes.DataSource,
agentReady chan bool,
sd *StateDumper,
) {
cctx, cancel := context.WithCancel(ctx)

var g errgroup.Group
Expand All @@ -180,7 +196,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf

agentReady <- true

if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources); err != nil {
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, sd); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()
Expand All @@ -197,15 +213,16 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
}

log.Debugf("everything is dead, return crowdsecTomb")
log.Debugf("sd.DumpDir == %s", sd.DumpDir)

if flags.DumpDir != "" {
log.Debugf("Dumping parser+bucket states to %s", flags.DumpDir)
if sd.DumpDir != "" {
log.Debugf("Dumping parser+bucket states to %s", sd.DumpDir)

if err := dumpAllStates(flags.DumpDir); err != nil {
if err := sd.Dump(); err != nil {
log.Fatal(err)
}

os.Exit(0)
return nil
}

return nil
Expand Down
45 changes: 40 additions & 5 deletions cmd/crowdsec/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,66 @@ import (

"gopkg.in/yaml.v3"

leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

func dumpAllStates(dir string) error {
// StateDumper bundles the destination directory and the per-subsystem
// collectors used to persist parser/bucket state at shutdown.
type StateDumper struct {
	// DumpDir is the target directory; an empty string disables dumping
	// (see NewStateDumper, which leaves the collectors nil in that case).
	DumpDir string
	// Pour records bucket-pour events during a run.
	Pour *leakybucket.PourCollector
	// StageParse records per-stage parser results during a run.
	StageParse *parser.StageParseCollector
	// BucketOverflows accumulates overflow events; presumably filled by the
	// output routines it is passed to — TODO confirm against callers.
	BucketOverflows []pipeline.Event
}

// NewStateDumper returns a StateDumper targeting dumpDir. When dumpDir is
// empty, the returned dumper is inert: no collectors are allocated and
// DumpDir stays empty.
func NewStateDumper(dumpDir string) *StateDumper {
	sd := &StateDumper{}

	if dumpDir != "" {
		sd.DumpDir = dumpDir
		sd.Pour = leakybucket.NewPourCollector()
		sd.StageParse = parser.NewStageParseCollector()
	}

	return sd
}

func (sd *StateDumper) Dump() error {
dir := sd.DumpDir

err := os.MkdirAll(dir, 0o755)
if err != nil {
return err
}

if err := dumpState(dir, "parser-dump.yaml", parser.StageParseCache); err != nil {
if err := dumpCollector(dir, "parser-dump.yaml", sd.StageParse); err != nil {
return fmt.Errorf("dumping parser state: %w", err)
}

if err := dumpState(dir, "bucket-dump.yaml", bucketOverflows); err != nil {
if err := dumpState(dir, "bucket-dump.yaml", sd.BucketOverflows); err != nil {
return fmt.Errorf("dumping bucket overflow state: %w", err)
}

if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil {
if err := dumpCollector(dir, "bucketpour-dump.yaml", sd.Pour); err != nil {
return fmt.Errorf("dumping bucket pour state: %w", err)
}

return nil
}

// YAMLDumper is implemented by state collectors that can serialize their
// accumulated contents as a YAML document.
type YAMLDumper interface {
	// DumpYAML returns the collector's state rendered as YAML bytes.
	DumpYAML() ([]byte, error)
}

// dumpCollector serializes collector's state to YAML and writes it to
// dir/name with 0644 permissions. The serialization error, if any, is
// returned unwrapped; callers add their own context.
func dumpCollector(dir, name string, collector YAMLDumper) error {
	data, err := collector.DumpYAML()
	if err != nil {
		return err
	}

	target := filepath.Join(dir, name)

	return os.WriteFile(target, data, 0o644)
}

func dumpState(dir, name string, obj any) error {
out, err := yaml.Marshal(obj)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ func run(flags Flags) error {
return err
}

return StartRunSvc(ctx, cConfig)
sd := NewStateDumper(flags.DumpDir)

return StartRunSvc(ctx, cConfig, sd)
}

func main() {
Expand Down
7 changes: 3 additions & 4 deletions cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api
return nil
}

var bucketOverflows []pipeline.Event

func runOutput(
ctx context.Context,
input chan pipeline.Event,
Expand All @@ -62,6 +60,8 @@ func runOutput(
postOverflowCTX parser.UnixParserCtx,
postOverflowNodes []parser.Node,
client *apiclient.ApiClient,
stageCollector *parser.StageParseCollector,
bucketOverflows []pipeline.Event,
) error {
var (
cache []pipeline.RuntimeAlert
Expand Down Expand Up @@ -114,8 +114,7 @@ func runOutput(
}

/* process post overflow parser nodes */
dump := flags.DumpDir != ""
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, dump)
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, stageCollector)
if err != nil {
return fmt.Errorf("postoverflow failed: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/crowdsec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func parseEvent(
event pipeline.Event,
parserCTX parser.UnixParserCtx,
nodes []parser.Node,
stageCollector *parser.StageParseCollector,
) *pipeline.Event {
if !event.Process {
return nil
Expand All @@ -35,8 +36,7 @@ func parseEvent(

startParsing := time.Now()
/* parse the log using magic */
dump := flags.DumpDir != ""
parsed, err := parser.Parse(parserCTX, event, nodes, dump)
parsed, err := parser.Parse(parserCTX, event, nodes, stageCollector)
if err != nil {
log.Errorf("failed parsing: %v", err)
}
Expand All @@ -56,14 +56,14 @@ func parseEvent(
return &parsed
}

func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) {
func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node, stageCollector *parser.StageParseCollector) {
for {
select {
case <-ctx.Done():
log.Infof("Killing parser routines")
return
case event := <-input:
parsed := parseEvent(event, parserCTX, nodes)
parsed := parseEvent(event, parserCTX, nodes, stageCollector)
if parsed == nil {
continue
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/crowdsec/pour.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.
leaky.GarbageCollectBuckets(*z, buckets)
}

func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) {
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config, pourCollector *leaky.PourCollector) {
count := 0

for {
Expand All @@ -52,8 +52,7 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
triggerGC(parsed, buckets, cConfig)
}
// here we can bucketify with parsed
track := flags.DumpDir != ""
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track)
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, pourCollector)
if err != nil {
log.Warningf("bucketify failed for: %v with %s", parsed, err)
continue
Expand Down
8 changes: 6 additions & 2 deletions cmd/crowdsec/run_in_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func isWindowsService() (bool, error) {
return false, nil
}

func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
func StartRunSvc(
ctx context.Context,
cConfig *csconfig.Config,
sd *StateDumper,
) error {
defer trace.CatchPanic("crowdsec/StartRunSvc")

// Always try to stop CPU profiling to avoid passing flags around
Expand Down Expand Up @@ -63,5 +67,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
}()
}

return Serve(ctx, cConfig, agentReady)
return Serve(ctx, cConfig, agentReady, sd)
}
10 changes: 5 additions & 5 deletions cmd/crowdsec/run_in_svc_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func isWindowsService() (bool, error) {
return svc.IsWindowsService()
}

func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, sd *StateDumper) error {
const svcName = "CrowdSec"
const svcDescription = "Crowdsec IPS/IDS"

Expand All @@ -36,7 +36,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
return fmt.Errorf("failed to determine if we are running in windows service mode: %w", err)
}
if isRunninginService {
return runService(svcName)
return runService(svcName, sd)
}

switch flags.WinSvc {
Expand All @@ -61,15 +61,15 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
return fmt.Errorf("failed to %s %s: %w", flags.WinSvc, svcName, err)
}
case "":
return WindowsRun(ctx, cConfig)
return WindowsRun(ctx, cConfig, sd)
default:
return fmt.Errorf("Invalid value for winsvc parameter: %s", flags.WinSvc)
}

return nil
}

func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error {
func WindowsRun(ctx context.Context, cConfig *csconfig.Config, sd *StateDumper) error {
if fflag.PProfBlockProfile.IsEnabled() {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
Expand All @@ -96,5 +96,5 @@ func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error {
registerPrometheus(cConfig.Prometheus)
go servePrometheus(cConfig.Prometheus, dbClient, agentReady)
}
return Serve(ctx, cConfig, agentReady)
return Serve(ctx, cConfig, agentReady, sd)
}
Loading
Loading