diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index c3fa445d5bd..459f25e9acd 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -20,18 +20,15 @@ func dedupAlerts(alerts []pipeline.RuntimeAlert) []*models.Alert { for idx, alert := range alerts { log.Tracef("alert %d/%d", idx, len(alerts)) - // if we have more than one source, we need to dedup - if len(alert.Sources) == 0 || len(alert.Sources) == 1 { + if len(alert.Sources) <= 1 { dedupCache = append(dedupCache, alert.Alert) continue } - for k := range alert.Sources { - refsrc := *alert.Alert // copy - + // if we have more than one source, we need to dedup + for k, src := range alert.Sources { log.Tracef("source[%s]", k) - - src := alert.Sources[k] + refsrc := *alert.Alert // copy refsrc.Source = &src dedupCache = append(dedupCache, &refsrc) } @@ -57,8 +54,15 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api var bucketOverflows []pipeline.Event -func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pipeline.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx, - postOverflowNodes []parser.Node, client *apiclient.ApiClient) error { +func runOutput( + ctx context.Context, + input chan pipeline.Event, + overflow chan pipeline.Event, + buckets *leaky.Buckets, + postOverflowCTX parser.UnixParserCtx, + postOverflowNodes []parser.Node, + client *apiclient.ApiClient, +) error { var ( cache []pipeline.RuntimeAlert cacheMutex sync.Mutex @@ -73,8 +77,7 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip if len(cache) > 0 { cacheMutex.Lock() cachecopy := cache - newcache := make([]pipeline.RuntimeAlert, 0) - cache = newcache + cache = nil cacheMutex.Unlock() /* This loop needs to block as little as possible as scenarios directly write to the input chan @@ -103,35 +106,33 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip } return nil case event := <-overflow: + 
ov := event.Overflow // if alert is empty and mapKey is present, the overflow is just to cleanup bucket - if event.Overflow.Alert == nil && event.Overflow.Mapkey != "" { + if ov.Alert == nil && ov.Mapkey != "" { buckets.Bucket_map.Delete(event.Overflow.Mapkey) break } + /* process post overflow parser nodes */ event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes) if err != nil { return fmt.Errorf("postoverflow failed: %w", err) } log.Info(*event.Overflow.Alert.Message) // if the Alert is nil, it's to signal bucket is ready for GC, don't track this // dump after postoveflow processing to avoid missing whitelist info if dumpStates && event.Overflow.Alert != nil { - if bucketOverflows == nil { - bucketOverflows = make([]pipeline.Event, 0) - } - bucketOverflows = append(bucketOverflows, event) } if event.Overflow.Whitelisted { log.Infof("[%s] is whitelisted, skip.", *event.Overflow.Alert.Message) continue } if event.Overflow.Reprocess { select { case input <- event: log.Debug("Reprocessing overflow event") @@ -139,12 +140,13 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip log.Debug("Reprocessing overflow event: parsing is dead, skipping") } } + if dumpStates { continue } cacheMutex.Lock() cache = append(cache, event.Overflow) cacheMutex.Unlock() } } diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go index 5be2a519f2e..cbfacc4422b 100644 --- a/cmd/crowdsec/parse.go +++ b/cmd/crowdsec/parse.go @@ -12,6 +12,49 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) +func parseEvent( + event pipeline.Event, + parserCTX parser.UnixParserCtx, + nodes []parser.Node, +) *pipeline.Event { + if !event.Process { + return nil + } + /*Application security engine is going to generate 2 events: + - one that is 
treated as a log and can go to scenarios + - another one that will go directly to LAPI*/ + if event.Type == pipeline.APPSEC { + outputEventChan <- event + return nil + } + if event.Line.Module == "" { + log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line) + return nil + } + metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc() + + startParsing := time.Now() + /* parse the log using magic */ + parsed, err := parser.Parse(parserCTX, event, nodes) + if err != nil { + log.Errorf("failed parsing: %v", err) + } + elapsed := time.Since(startParsing) + metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds()) + if !parsed.Process { + metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() + log.Debugf("Discarding line %+v", parsed) + return nil + } + metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() + if parsed.Whitelisted { + log.Debugf("event whitelisted, discard") + return nil + } + + return &parsed +} + func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) { for { select { @@ -19,41 +62,11 @@ func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeli log.Infof("Killing parser routines") return case event := <-input: - if !event.Process { - continue - } - /*Application security engine is going to generate 2 events: - - one that is treated as a log and can go to scenarios - - another one that will go directly to LAPI*/ - if event.Type == pipeline.APPSEC { - outputEventChan <- event - continue - } - if event.Line.Module == "" { - log.Errorf("empty event.Line.Module field, the acquisition module 
must set it ! : %+v", event.Line) - continue - } - metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc() - - startParsing := time.Now() - /* parse the log using magic */ - parsed, err := parser.Parse(parserCTX, event, nodes) - if err != nil { - log.Errorf("failed parsing: %v", err) - } - elapsed := time.Since(startParsing) - metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds()) - if !parsed.Process { - metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() - log.Debugf("Discarding line %+v", parsed) - continue - } - metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() - if parsed.Whitelisted { - log.Debugf("event whitelisted, discard") + parsed := parseEvent(event, parserCTX, nodes) + if parsed == nil { continue } - output <- parsed + output <- *parsed } } } diff --git a/cmd/crowdsec/pour.go b/cmd/crowdsec/pour.go index 0ac9dfa80cd..402066d0595 100644 --- a/cmd/crowdsec/pour.go +++ b/cmd/crowdsec/pour.go @@ -13,21 +13,26 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) -func maybeGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) { +func shouldTriggerGC(count int) bool { + return count%5000 == 0 +} + +func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) { log.Infof("%d existing buckets", leaky.LeakyRoutineCount) // when in forensics mode, garbage collect buckets - if cConfig.Crowdsec.BucketsGCEnabled { - if parsed.MarshaledTime != "" { - z := &time.Time{} - if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { - log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err) - } else { - log.Warning("Starting buckets garbage 
collection ...") + if !cConfig.Crowdsec.BucketsGCEnabled || parsed.MarshaledTime == "" { + return + } - leaky.GarbageCollectBuckets(*z, buckets) - } - } + z := &time.Time{} + if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { + log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err) + return } + + log.Warning("Starting buckets garbage collection ...") + + leaky.GarbageCollectBuckets(*z, buckets) } func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) { @@ -43,8 +48,8 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc startTime := time.Now() count++ - if count%5000 == 0 { - maybeGC(parsed, buckets, cConfig) + if shouldTriggerGC(count) { + triggerGC(parsed, buckets, cConfig) } // here we can bucketify with parsed poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets)