Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ func dedupAlerts(alerts []pipeline.RuntimeAlert) []*models.Alert {

for idx, alert := range alerts {
log.Tracef("alert %d/%d", idx, len(alerts))
// if we have more than one source, we need to dedup
if len(alert.Sources) == 0 || len(alert.Sources) == 1 {
if len(alert.Sources) <= 1 {
dedupCache = append(dedupCache, alert.Alert)
continue
}

for k := range alert.Sources {
refsrc := *alert.Alert // copy

// if we have more than one source, we need to dedup
for k, src := range alert.Sources {
log.Tracef("source[%s]", k)

src := alert.Sources[k]
refsrc := *alert.Alert // copy
refsrc.Source = &src
dedupCache = append(dedupCache, &refsrc)
}
Expand All @@ -57,8 +54,15 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api

var bucketOverflows []pipeline.Event

func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pipeline.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx,
postOverflowNodes []parser.Node, client *apiclient.ApiClient) error {
func runOutput(
ctx context.Context,
input chan pipeline.Event,
overflow chan pipeline.Event,
buckets *leaky.Buckets,
postOverflowCTX parser.UnixParserCtx,
postOverflowNodes []parser.Node,
client *apiclient.ApiClient,
) error {
var (
cache []pipeline.RuntimeAlert
cacheMutex sync.Mutex
Expand All @@ -73,8 +77,7 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
if len(cache) > 0 {
cacheMutex.Lock()
cachecopy := cache
newcache := make([]pipeline.RuntimeAlert, 0)
cache = newcache
cache = nil
cacheMutex.Unlock()
/*
This loop needs to block as little as possible as scenarios directly write to the input chan
Expand Down Expand Up @@ -103,48 +106,47 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
}
return nil
case event := <-overflow:
ov := event.Overflow
// if alert is empty and mapKey is present, the overflow is just to clean up the bucket
if event.Overflow.Alert == nil && event.Overflow.Mapkey != "" {
if ov.Alert == nil && ov.Mapkey != "" {
buckets.Bucket_map.Delete(event.Overflow.Mapkey)
break
}

/* process post overflow parser nodes */
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
if err != nil {
return fmt.Errorf("postoverflow failed: %w", err)
}

log.Info(*event.Overflow.Alert.Message)
log.Info(*ov.Alert.Message)

// if the Alert is nil, it's to signal bucket is ready for GC, don't track this
// dump after postoverflow processing to avoid missing whitelist info
if dumpStates && event.Overflow.Alert != nil {
if bucketOverflows == nil {
bucketOverflows = make([]pipeline.Event, 0)
}

if dumpStates && ov.Alert != nil {
bucketOverflows = append(bucketOverflows, event)
}

if event.Overflow.Whitelisted {
log.Infof("[%s] is whitelisted, skip.", *event.Overflow.Alert.Message)
if ov.Whitelisted {
log.Infof("[%s] is whitelisted, skip.", *ov.Alert.Message)
continue
}

if event.Overflow.Reprocess {
if ov.Reprocess {
select {
case input <- event:
log.Debug("Reprocessing overflow event")
case <-ctx.Done():
log.Debug("Reprocessing overflow event: parsing is dead, skipping")
}
}

if dumpStates {
continue
}

cacheMutex.Lock()
cache = append(cache, event.Overflow)
cache = append(cache, ov)
cacheMutex.Unlock()
}
}
Expand Down
79 changes: 46 additions & 33 deletions cmd/crowdsec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,61 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

// parseEvent runs the parser chain on a single event and returns the parsed
// result, or nil when the event must not be forwarded to the bucket stage
// (not marked for processing, routed straight to LAPI, parse failure, or
// whitelisted).
func parseEvent(
	event pipeline.Event,
	parserCTX parser.UnixParserCtx,
	nodes []parser.Node,
) *pipeline.Event {
	// Events not flagged for processing are dropped outright.
	if !event.Process {
		return nil
	}

	/*Application security engine is going to generate 2 events:
	- one that is treated as a log and can go to scenarios
	- another one that will go directly to LAPI*/
	if event.Type == pipeline.APPSEC {
		outputEventChan <- event
		return nil
	}

	if event.Line.Module == "" {
		log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line)
		return nil
	}

	// Base labels identify where the line came from; the hit/miss counters
	// additionally carry the acquisition type.
	baseLabels := prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}
	metrics.GlobalParserHits.With(baseLabels).Inc()

	/* parse the log using magic */
	startParsing := time.Now()
	parsed, err := parser.Parse(parserCTX, event, nodes)
	if err != nil {
		log.Errorf("failed parsing: %v", err)
	}
	metrics.GlobalParsingHistogram.With(baseLabels).Observe(time.Since(startParsing).Seconds())

	outcomeLabels := prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}
	if !parsed.Process {
		metrics.GlobalParserHitsKo.With(outcomeLabels).Inc()
		log.Debugf("Discarding line %+v", parsed)
		return nil
	}
	metrics.GlobalParserHitsOk.With(outcomeLabels).Inc()

	if parsed.Whitelisted {
		log.Debugf("event whitelisted, discard")
		return nil
	}

	return &parsed
}

func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) {
for {
select {
case <-ctx.Done():
log.Infof("Killing parser routines")
return
case event := <-input:
if !event.Process {
continue
}
/*Application security engine is going to generate 2 events:
- one that is treated as a log and can go to scenarios
- another one that will go directly to LAPI*/
if event.Type == pipeline.APPSEC {
outputEventChan <- event
continue
}
if event.Line.Module == "" {
log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line)
continue
}
metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()

startParsing := time.Now()
/* parse the log using magic */
parsed, err := parser.Parse(parserCTX, event, nodes)
if err != nil {
log.Errorf("failed parsing: %v", err)
}
elapsed := time.Since(startParsing)
metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds())
if !parsed.Process {
metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc()
log.Debugf("Discarding line %+v", parsed)
continue
}
metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc()
if parsed.Whitelisted {
log.Debugf("event whitelisted, discard")
parsed := parseEvent(event, parserCTX, nodes)
if parsed == nil {
continue
}
output <- parsed
output <- *parsed
}
}
}
31 changes: 18 additions & 13 deletions cmd/crowdsec/pour.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,26 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

// shouldTriggerGC reports whether the running event counter has reached a
// garbage-collection checkpoint (every 5000 poured events).
func shouldTriggerGC(count int) bool {
	// gofmt spacing: no spaces around % in a higher-precedence operand.
	return count%5000 == 0
}

func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) {
log.Infof("%d existing buckets", leaky.LeakyRoutineCount)
// when in forensics mode, garbage collect buckets
if cConfig.Crowdsec.BucketsGCEnabled {
if parsed.MarshaledTime != "" {
z := &time.Time{}
if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err)
} else {
log.Warning("Starting buckets garbage collection ...")
if !cConfig.Crowdsec.BucketsGCEnabled || parsed.MarshaledTime == "" {
return
}

leaky.GarbageCollectBuckets(*z, buckets)
}
}
z := &time.Time{}
if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err)
return
}

log.Warning("Starting buckets garbage collection ...")

leaky.GarbageCollectBuckets(*z, buckets)
}

func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) {
Expand All @@ -43,8 +48,8 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
startTime := time.Now()

count++
if count%5000 == 0 {
maybeGC(parsed, buckets, cConfig)
if shouldTriggerGC(count) {
triggerGC(parsed, buckets, cConfig)
}
// here we can bucketify with parsed
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets)
Expand Down