Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ linters:
path: pkg/acquisition/modules/s3/source.go
text: found a struct that contains a context.Context field

- linters:
- unused
text: 'var _ is unused'

# migrate over time

- linters:
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/appsec_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
)

func LoadAppsecRules(hub *cwhub.Hub) error {
func LoadAppsecRules(_ *cwhub.Hub) error {
return nil
}
25 changes: 11 additions & 14 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub,
return csParsers, datasources, nil
}

func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) {
func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers, logLines chan pipeline.Event) {
// start go-routines for parsing, buckets pour and outputs.
parserWg := &sync.WaitGroup{}

Expand All @@ -79,10 +79,7 @@ func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) {
parsersTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/runParse")

if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil {
// this error will never happen as parser.Parse is not able to return errors
return err
}
runParse(logLines, inputEventChan, *parsers.Ctx, parsers.Nodes)

return nil
})
Expand Down Expand Up @@ -147,7 +144,6 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
apiClient,
lpMetricsDefaultInterval,
log.WithField("service", "lpmetrics"),
[]string{},
datasources,
hub,
)
Expand All @@ -171,11 +167,10 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
}

// runCrowdsec starts the log processor service
func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource, logLines chan pipeline.Event) error {
inputEventChan = make(chan pipeline.Event)
inputLineChan = make(chan pipeline.Event)

startParserRoutines(cConfig, parsers)
startParserRoutines(cConfig, parsers, logLines)

startBucketRoutines(cConfig)

Expand All @@ -194,15 +189,15 @@ func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.

log.Info("Starting processing data")

if err := acquisition.StartAcquisition(ctx, dataSources, inputLineChan, &acquisTomb); err != nil {
if err := acquisition.StartAcquisition(ctx, dataSources, logLines, &acquisTomb); err != nil {
return fmt.Errorf("starting acquisition error: %w", err)
}

return nil
}

// serveCrowdsec wraps the log processor service
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool, logLines chan pipeline.Event) {
crowdsecTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveCrowdsec")

Expand All @@ -213,7 +208,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf

agentReady <- true

if err := runCrowdsec(ctx, cConfig, parsers, hub, datasources); err != nil {
if err := runCrowdsec(ctx, cConfig, parsers, hub, datasources, logLines); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()
Expand All @@ -225,14 +220,16 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
waitOnTomb()
log.Debugf("Shutting down crowdsec routines")

if err := ShutdownCrowdsecRoutines(); err != nil {
if err := ShutdownCrowdsecRoutines(logLines); err != nil {
return fmt.Errorf("unable to shutdown crowdsec routines: %w", err)
}

log.Debugf("everything is dead, return crowdsecTomb")

if dumpStates {
if err := dumpAllStates(); err != nil {
log.Debugf("Dumping parser+bucket states to %s", dumpFolder)

if err := dumpAllStates(dumpFolder); err != nil {
log.Fatal(err)
}

Expand Down
40 changes: 13 additions & 27 deletions cmd/crowdsec/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,38 @@ import (
"os"
"path/filepath"

log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
)

func dumpAllStates() error {
log.Debugf("Dumping parser+bucket states to %s", parser.DumpFolder)
func dumpAllStates(dir string) error {
err := os.MkdirAll(dir, 0o755)
if err != nil {
return err
}

if err := dumpState(
filepath.Join(parser.DumpFolder, "parser-dump.yaml"),
parser.StageParseCache,
); err != nil {
return fmt.Errorf("while dumping parser state: %w", err)
if err := dumpState(dir, "parser-dump.yaml", parser.StageParseCache); err != nil {
return fmt.Errorf("dumping parser state: %w", err)
}

if err := dumpState(
filepath.Join(parser.DumpFolder, "bucket-dump.yaml"),
bucketOverflows,
); err != nil {
return fmt.Errorf("while dumping bucket overflow state: %w", err)
if err := dumpState(dir, "bucket-dump.yaml", bucketOverflows); err != nil {
return fmt.Errorf("dumping bucket overflow state: %w", err)
}

if err := dumpState(
filepath.Join(parser.DumpFolder, "bucketpour-dump.yaml"),
leaky.BucketPourCache,
); err != nil {
return fmt.Errorf("while dumping bucket pour state: %w", err)
if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil {
return fmt.Errorf("dumping bucket pour state: %w", err)
}

return nil
}

func dumpState(destPath string, obj any) error {
dir := filepath.Dir(destPath)

err := os.MkdirAll(dir, 0o755)
if err != nil {
return err
}

func dumpState(dir, name string, obj any) error {
out, err := yaml.Marshal(obj)
if err != nil {
return err
}

return os.WriteFile(destPath, out, 0o666)
return os.WriteFile(filepath.Join(dir, name), out, 0o666)
}
5 changes: 2 additions & 3 deletions cmd/crowdsec/lpmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func getHubState(hub *cwhub.Hub) models.HubItems {
}

// newStaticMetrics is called when the process starts, or reloads the configuration
func newStaticMetrics(consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics {
func newStaticMetrics(datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics {
datasourceMap := map[string]int64{}

for _, ds := range datasources {
Expand All @@ -112,11 +112,10 @@ func NewMetricsProvider(
apic *apiclient.ApiClient,
interval time.Duration,
logger *logrus.Entry,
consoleOptions []string,
datasources []acquisition.DataSource,
hub *cwhub.Hub,
) *MetricsProvider {
static := newStaticMetrics(consoleOptions, datasources, hub)
static := newStaticMetrics(datasources, hub)

logger.Debugf("Detected %s %s (family: %s)", static.osName, static.osVersion, static.osFamily)

Expand Down
6 changes: 3 additions & 3 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ var (
holders []leakybucket.BucketFactory
buckets *leakybucket.Buckets

inputLineChan chan pipeline.Event
inputEventChan chan pipeline.Event
outputEventChan chan pipeline.Event // the buckets init returns its own chan that is used for multiplexing
// settings
Expand Down Expand Up @@ -129,7 +128,6 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo

if dumpFolder != "" {
parser.ParseDump = true
parser.DumpFolder = dumpFolder
leakybucket.BucketPourTrack = true
dumpStates = true
}
Expand Down Expand Up @@ -231,7 +229,9 @@ func run() error {
return err
}

return StartRunSvc(ctx, cConfig)
logLines := make(chan pipeline.Event)

return StartRunSvc(ctx, cConfig, logLines)
}

func main() {
Expand Down
49 changes: 26 additions & 23 deletions cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ func dedupAlerts(alerts []pipeline.RuntimeAlert) []*models.Alert {

for idx, alert := range alerts {
log.Tracef("alert %d/%d", idx, len(alerts))
// if we have more than one source, we need to dedup
if len(alert.Sources) == 0 || len(alert.Sources) == 1 {
if len(alert.Sources) <= 1 {
dedupCache = append(dedupCache, alert.Alert)
continue
}

for k := range alert.Sources {
refsrc := *alert.Alert // copy

// if we have more than one source, we need to dedup
for k, src := range alert.Sources {
log.Tracef("source[%s]", k)

src := alert.Sources[k]
refsrc := *alert.Alert // copy
refsrc.Source = &src
dedupCache = append(dedupCache, &refsrc)
}
Expand All @@ -57,8 +54,15 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api

var bucketOverflows []pipeline.Event

func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pipeline.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx,
postOverflowNodes []parser.Node, client *apiclient.ApiClient) error {
func runOutput(
ctx context.Context,
input chan pipeline.Event,
overflow chan pipeline.Event,
buckets *leaky.Buckets,
postOverflowCTX parser.UnixParserCtx,
postOverflowNodes []parser.Node,
client *apiclient.ApiClient,
) error {
var (
cache []pipeline.RuntimeAlert
cacheMutex sync.Mutex
Expand All @@ -73,8 +77,7 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
if len(cache) > 0 {
cacheMutex.Lock()
cachecopy := cache
newcache := make([]pipeline.RuntimeAlert, 0)
cache = newcache
cache = nil
cacheMutex.Unlock()
/*
This loop needs to block as little as possible as scenarios directly write to the input chan
Expand Down Expand Up @@ -103,35 +106,34 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
}
return nil
case event := <-overflow:
ov := event.Overflow

// if the alert is empty and mapKey is present, the overflow is just to clean up the bucket
if event.Overflow.Alert == nil && event.Overflow.Mapkey != "" {
buckets.Bucket_map.Delete(event.Overflow.Mapkey)
if ov.Alert == nil && ov.Mapkey != "" {
buckets.Bucket_map.Delete(ov.Mapkey)
break
}

/* process post overflow parser nodes */
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
if err != nil {
return fmt.Errorf("postoverflow failed: %w", err)
}

log.Info(*event.Overflow.Alert.Message)
log.Info(*ov.Alert.Message)

// if the Alert is nil, it's to signal bucket is ready for GC, don't track this
// dump after postoverflow processing to avoid missing whitelist info
if dumpStates && event.Overflow.Alert != nil {
if bucketOverflows == nil {
bucketOverflows = make([]pipeline.Event, 0)
}

if dumpStates && ov.Alert != nil {
bucketOverflows = append(bucketOverflows, event)
}

if event.Overflow.Whitelisted {
log.Infof("[%s] is whitelisted, skip.", *event.Overflow.Alert.Message)
if ov.Whitelisted {
log.Infof("[%s] is whitelisted, skip.", *ov.Alert.Message)
continue
}

if event.Overflow.Reprocess {
if ov.Reprocess {
log.Debugf("Overflow being reprocessed.")
select {
case input <- event:
Expand All @@ -140,12 +142,13 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip
log.Debugf("parsing is dead, skipping")
}
}

if dumpStates {
continue
}

cacheMutex.Lock()
cache = append(cache, event.Overflow)
cache = append(cache, ov)
cacheMutex.Unlock()
}
}
Expand Down
Loading
Loading