diff --git a/.golangci.yml b/.golangci.yml index 0b91c4f0639..4dba7922d09 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -488,6 +488,10 @@ linters: path: pkg/acquisition/modules/s3/source.go text: found a struct that contains a context.Context field + - linters: + - unused + text: 'var _ is unused' + # migrate over time - linters: diff --git a/cmd/crowdsec/appsec_stub.go b/cmd/crowdsec/appsec_stub.go index 4a65b32a9ad..2e4c8d753c6 100644 --- a/cmd/crowdsec/appsec_stub.go +++ b/cmd/crowdsec/appsec_stub.go @@ -6,6 +6,6 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/cwhub" ) -func LoadAppsecRules(hub *cwhub.Hub) error { +func LoadAppsecRules(_ *cwhub.Hub) error { return nil } diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 6ea50ff2329..3832c15831d 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -68,7 +68,7 @@ func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub, return csParsers, datasources, nil } -func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) { +func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers, logLines chan pipeline.Event) { // start go-routines for parsing, buckets pour and outputs. parserWg := &sync.WaitGroup{} @@ -79,10 +79,7 @@ func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) { parsersTomb.Go(func() error { defer trace.CatchPanic("crowdsec/runParse") - if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { - // this error will never happen as parser.Parse is not able to return errors - return err - } + runParse(logLines, inputEventChan, *parsers.Ctx, parsers.Nodes) return nil }) @@ -147,7 +144,6 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap apiClient, lpMetricsDefaultInterval, log.WithField("service", "lpmetrics"), - []string{}, datasources, hub, ) @@ -171,11 +167,10 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap } // runCrowdsec starts the log processor service -func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error { +func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource, logLines chan pipeline.Event) error { inputEventChan = make(chan pipeline.Event) - inputLineChan = make(chan pipeline.Event) - startParserRoutines(cConfig, parsers) + startParserRoutines(cConfig, parsers, logLines) startBucketRoutines(cConfig) @@ -194,7 +189,7 @@ func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser. log.Info("Starting processing data") - if err := acquisition.StartAcquisition(ctx, dataSources, inputLineChan, &acquisTomb); err != nil { + if err := acquisition.StartAcquisition(ctx, dataSources, logLines, &acquisTomb); err != nil { return fmt.Errorf("starting acquisition error: %w", err) } @@ -202,7 +197,7 @@ func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser. } // serveCrowdsec wraps the log processor service -func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) { +func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool, logLines chan pipeline.Event) { crowdsecTomb.Go(func() error { defer trace.CatchPanic("crowdsec/serveCrowdsec") @@ -213,7 +208,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf agentReady <- true - if err := runCrowdsec(ctx, cConfig, parsers, hub, datasources); err != nil { + if err := runCrowdsec(ctx, cConfig, parsers, hub, datasources, logLines); err != nil { log.Fatalf("unable to start crowdsec routines: %s", err) } }() @@ -225,14 +220,16 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf waitOnTomb() log.Debugf("Shutting down crowdsec routines") - if err := ShutdownCrowdsecRoutines(); err != nil { + if err := ShutdownCrowdsecRoutines(logLines); err != nil { return fmt.Errorf("unable to shutdown crowdsec routines: %w", err) } log.Debugf("everything is dead, return crowdsecTomb") if dumpStates { - if err := dumpAllStates(); err != nil { + log.Debugf("Dumping parser+bucket states to %s", dumpFolder) + + if err := dumpAllStates(dumpFolder); err != nil { log.Fatal(err) } diff --git a/cmd/crowdsec/dump.go b/cmd/crowdsec/dump.go index 33c65878b11..1c7f1d35a96 100644 --- a/cmd/crowdsec/dump.go +++ b/cmd/crowdsec/dump.go @@ -5,52 +5,38 @@ import ( "os" "path/filepath" - log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/parser" ) -func dumpAllStates() error { - log.Debugf("Dumping parser+bucket states to %s", parser.DumpFolder) +func dumpAllStates(dir string) error { + err := os.MkdirAll(dir, 0o755) + if err != nil { + return err + } - if err := dumpState( - filepath.Join(parser.DumpFolder, "parser-dump.yaml"), - parser.StageParseCache, - ); err != nil { - return fmt.Errorf("while dumping parser state: %w", err) + if err := dumpState(dir, "parser-dump.yaml", parser.StageParseCache); err != nil { + return fmt.Errorf("dumping parser state: %w", err) } - if err := dumpState( - filepath.Join(parser.DumpFolder, "bucket-dump.yaml"), - bucketOverflows, - ); err != nil { - return fmt.Errorf("while dumping bucket overflow state: %w", err) + if err := dumpState(dir, "bucket-dump.yaml", bucketOverflows); err != nil { + return fmt.Errorf("dumping bucket overflow state: %w", err) } - if err := dumpState( - filepath.Join(parser.DumpFolder, "bucketpour-dump.yaml"), - leaky.BucketPourCache, - ); err != nil { - return fmt.Errorf("while dumping bucket pour state: %w", err) + if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil { + return fmt.Errorf("dumping bucket pour state: %w", err) } return nil } -func dumpState(destPath string, obj any) error { - dir := filepath.Dir(destPath) - - err := os.MkdirAll(dir, 0o755) - if err != nil { - return err - } - +func dumpState(dir, name string, obj any) error { out, err := yaml.Marshal(obj) if err != nil { return err } - return os.WriteFile(destPath, out, 0o666) + return os.WriteFile(filepath.Join(dir, name), out, 0o666) } diff --git a/cmd/crowdsec/lpmetrics.go b/cmd/crowdsec/lpmetrics.go index 4cb46792043..579c14b0970 100644 --- a/cmd/crowdsec/lpmetrics.go +++ b/cmd/crowdsec/lpmetrics.go @@ -88,7 +88,7 @@ func getHubState(hub *cwhub.Hub) models.HubItems { } // newStaticMetrics is called when the process starts, or reloads the configuration -func newStaticMetrics(consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics { +func newStaticMetrics(datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics { datasourceMap := map[string]int64{} for _, ds := range datasources { @@ -112,11 +112,10 @@ func NewMetricsProvider( apic *apiclient.ApiClient, interval time.Duration, logger *logrus.Entry, - consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub, ) *MetricsProvider { - static := newStaticMetrics(consoleOptions, datasources, hub) + static := newStaticMetrics(datasources, hub) logger.Debugf("Detected %s %s (family: %s)", static.osName, static.osVersion, static.osFamily) diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 6ca465f0508..6392a60d0c1 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -47,7 +47,6 @@ var ( holders []leakybucket.BucketFactory buckets *leakybucket.Buckets - inputLineChan chan pipeline.Event inputEventChan chan pipeline.Event outputEventChan chan pipeline.Event // the buckets init returns its own chan that is used for multiplexing // settings @@ -129,7 +128,6 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo if dumpFolder != "" { parser.ParseDump = true - parser.DumpFolder = dumpFolder leakybucket.BucketPourTrack = true dumpStates = true } @@ -231,7 +229,9 @@ func run() error { return err } - return StartRunSvc(ctx, cConfig) + logLines := make(chan pipeline.Event) + + return StartRunSvc(ctx, cConfig, logLines) } func main() { diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index fcb372d1d57..b479f955370 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -20,18 +20,15 @@ func dedupAlerts(alerts []pipeline.RuntimeAlert) []*models.Alert { for idx, alert := range alerts { log.Tracef("alert %d/%d", idx, len(alerts)) - // if we have more than one source, we need to dedup - if len(alert.Sources) == 0 || len(alert.Sources) == 1 { + if len(alert.Sources) <= 1 { dedupCache = append(dedupCache, alert.Alert) continue } - for k := range alert.Sources { - refsrc := *alert.Alert // copy - + // if we have more than one source, we need to dedup + for k, src := range alert.Sources { log.Tracef("source[%s]", k) - - src := alert.Sources[k] + refsrc := *alert.Alert // copy refsrc.Source = &src dedupCache = append(dedupCache, &refsrc) } @@ -57,8 +54,15 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api var bucketOverflows []pipeline.Event -func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pipeline.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx, - postOverflowNodes []parser.Node, client *apiclient.ApiClient) error { +func runOutput( + ctx context.Context, + input chan pipeline.Event, + overflow chan pipeline.Event, + buckets *leaky.Buckets, + postOverflowCTX parser.UnixParserCtx, + postOverflowNodes []parser.Node, + client *apiclient.ApiClient, +) error { var ( cache []pipeline.RuntimeAlert cacheMutex sync.Mutex @@ -73,8 +77,7 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip if len(cache) > 0 { cacheMutex.Lock() cachecopy := cache - newcache := make([]pipeline.RuntimeAlert, 0) - cache = newcache + cache = nil cacheMutex.Unlock() /* This loop needs to block as little as possible as scenarios directly write to the input chan @@ -103,35 +106,34 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip } return nil case event := <-overflow: + ov := event.Overflow + // if alert is empty and mapKey is present, the overflow is just to cleanup bucket - if event.Overflow.Alert == nil && event.Overflow.Mapkey != "" { - buckets.Bucket_map.Delete(event.Overflow.Mapkey) + if ov.Alert == nil && ov.Mapkey != "" { + buckets.Bucket_map.Delete(ov.Mapkey) break } + /* process post overflow parser nodes */ event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes) if err != nil { return fmt.Errorf("postoverflow failed: %w", err) } - log.Info(*event.Overflow.Alert.Message) + log.Info(*ov.Alert.Message) // if the Alert is nil, it's to signal bucket is ready for GC, don't track this // dump after postoveflow processing to avoid missing whitelist info - if dumpStates && event.Overflow.Alert != nil { - if bucketOverflows == nil { - bucketOverflows = make([]pipeline.Event, 0) - } - + if dumpStates && ov.Alert != nil { bucketOverflows = append(bucketOverflows, event) } - if event.Overflow.Whitelisted { - log.Infof("[%s] is whitelisted, skip.", *event.Overflow.Alert.Message) + if ov.Whitelisted { + log.Infof("[%s] is whitelisted, skip.", *ov.Alert.Message) continue } - if event.Overflow.Reprocess { + if ov.Reprocess { log.Debugf("Overflow being reprocessed.") select { case input <- event: @@ -140,12 +142,13 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip log.Debugf("parsing is dead, skipping") } } + if dumpStates { continue } cacheMutex.Lock() - cache = append(cache, event.Overflow) + cache = append(cache, ov) cacheMutex.Unlock() } } diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go index d6c4e199103..7b6b92baf54 100644 --- a/cmd/crowdsec/parse.go +++ b/cmd/crowdsec/parse.go @@ -11,48 +11,66 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) -func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error { +func parseEvent( + event pipeline.Event, + parserCTX parser.UnixParserCtx, + nodes []parser.Node, +) *pipeline.Event { + if !event.Process { + return nil + } + + // Application security engine is going to generate 2 events: + // - one that is treated as a log and can go to scenarios + // - another one that will go directly to LAPI + if event.Type == pipeline.APPSEC { + outputEventChan <- event + return nil + } + + if event.Line.Module == "" { + log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line) + return nil + } + + metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc() + + startParsing := time.Now() + /* parse the log using magic */ + parsed, err := parser.Parse(parserCTX, event, nodes) + if err != nil { + log.Errorf("failed parsing: %v", err) + } + + elapsed := time.Since(startParsing) + metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds()) + if !parsed.Process { + metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() + log.Debugf("Discarding line %+v", parsed) + return nil + } + + metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() + if parsed.Whitelisted { + log.Debugf("event whitelisted, discard") + return nil + } + + return &parsed +} + +func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) { for { select { case <-parsersTomb.Dying(): log.Infof("Killing parser routines") - return nil + return case event := <-input: - if !event.Process { - continue - } - /*Application security engine is going to generate 2 events: - - one that is treated as a log and can go to scenarios - - another one that will go directly to LAPI*/ - if event.Type == pipeline.APPSEC { - outputEventChan <- event - continue - } - if event.Line.Module == "" { - log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line) - continue - } - metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc() - - startParsing := time.Now() - /* parse the log using magic */ - parsed, err := parser.Parse(parserCTX, event, nodes) - if err != nil { - log.Errorf("failed parsing: %v", err) - } - elapsed := time.Since(startParsing) - metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds()) - if !parsed.Process { - metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() - log.Debugf("Discarding line %+v", parsed) - continue - } - metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() - if parsed.Whitelisted { - log.Debugf("event whitelisted, discard") + parsed := parseEvent(event, parserCTX, nodes) + if parsed == nil { continue } - output <- parsed + output <- *parsed } } } diff --git a/cmd/crowdsec/pour.go b/cmd/crowdsec/pour.go index f48de97fc41..c21ad7c715f 100644 --- a/cmd/crowdsec/pour.go +++ b/cmd/crowdsec/pour.go @@ -13,25 +13,27 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) -func maybeGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) error { +func shouldTriggerGC(count int) bool { + return count % 5000 == 0 +} + +func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) error { log.Infof("%d existing buckets", leaky.LeakyRoutineCount) + // when in forensics mode, garbage collect buckets - if cConfig.Crowdsec.BucketsGCEnabled { - if parsed.MarshaledTime != "" { - z := &time.Time{} - if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { - log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err) - } else { - log.Warning("Starting buckets garbage collection ...") + if !cConfig.Crowdsec.BucketsGCEnabled || parsed.MarshaledTime == "" { + return nil + } - if err = leaky.GarbageCollectBuckets(*z, buckets); err != nil { - return err - } - } - } + z := &time.Time{} + if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { + log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err) + return nil } - return nil + log.Warning("Starting buckets garbage collection...") + + return leaky.GarbageCollectBuckets(*z, buckets) } func runPour(input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) error { @@ -47,8 +49,8 @@ func runPour(input chan pipeline.Event, holders []leaky.BucketFactory, buckets * startTime := time.Now() count++ - if count%5000 == 0 { - if err := maybeGC(parsed, buckets, cConfig); err != nil { + if shouldTriggerGC(count) { + if err := triggerGC(parsed, buckets, cConfig); err != nil { return fmt.Errorf("failed to start bucket GC: %w", err) } } diff --git a/cmd/crowdsec/run_in_svc.go b/cmd/crowdsec/run_in_svc.go index 996dc3a897a..c637e5899af 100644 --- a/cmd/crowdsec/run_in_svc.go +++ b/cmd/crowdsec/run_in_svc.go @@ -16,13 +16,14 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/database" "github.com/crowdsecurity/crowdsec/pkg/fflag" + "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) func isWindowsService() (bool, error) { return false, nil } -func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { +func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { defer trace.CatchPanic("crowdsec/StartRunSvc") // Always try to stop CPU profiling to avoid passing flags around @@ -63,5 +64,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { }() } - return Serve(ctx, cConfig, agentReady) + return Serve(ctx, cConfig, agentReady, logLines) } diff --git a/cmd/crowdsec/run_in_svc_windows.go b/cmd/crowdsec/run_in_svc_windows.go index 9df909483d2..4c581e72cfd 100644 --- a/cmd/crowdsec/run_in_svc_windows.go +++ b/cmd/crowdsec/run_in_svc_windows.go @@ -15,13 +15,14 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/database" "github.com/crowdsecurity/crowdsec/pkg/fflag" + "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) func isWindowsService() (bool, error) { return svc.IsWindowsService() } -func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { +func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { const svcName = "CrowdSec" const svcDescription = "Crowdsec IPS/IDS" @@ -61,7 +62,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { return fmt.Errorf("failed to %s %s: %w", flags.WinSvc, svcName, err) } case "": - return WindowsRun(ctx, cConfig) + return WindowsRun(ctx, cConfig, logLines) default: return fmt.Errorf("Invalid value for winsvc parameter: %s", flags.WinSvc) } @@ -69,7 +70,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { return nil } -func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error { +func WindowsRun(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { if fflag.PProfBlockProfile.IsEnabled() { runtime.SetBlockProfileRate(1) runtime.SetMutexProfileFraction(1) @@ -96,5 +97,5 @@ func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error { registerPrometheus(cConfig.Prometheus) go servePrometheus(cConfig.Prometheus, dbClient, agentReady) } - return Serve(ctx, cConfig, agentReady) + return Serve(ctx, cConfig, agentReady, logLines) } diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index 1fc9ab366ab..3c4cd24c50f 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -24,7 +24,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) -func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) { +func reloadHandler(ctx context.Context, _ os.Signal, logLines chan pipeline.Event) (*csconfig.Config, error) { // re-initialize tombs acquisTomb = tomb.Tomb{} parsersTomb = tomb.Tomb{} @@ -79,7 +79,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) { } agentReady := make(chan bool, 1) - serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady) + serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, logLines) } log.Info("Reload is finished") @@ -87,7 +87,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) { return cConfig, nil } -func ShutdownCrowdsecRoutines() error { +func ShutdownCrowdsecRoutines(logLines chan pipeline.Event) error { var reterr error log.Debugf("Shutting down crowdsec sub-routines") @@ -95,7 +95,7 @@ func ShutdownCrowdsecRoutines() error { if len(dataSources) > 0 { acquisTomb.Kill(nil) log.Debugf("waiting for acquisition to finish") - drainChan(inputLineChan) + drainChan(logLines) if err := acquisTomb.Wait(); err != nil { log.Warningf("Acquisition returned error : %s", err) @@ -222,7 +222,25 @@ func drainChan(c chan pipeline.Event) { } } -func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { +func unregisterWatcher(ctx context.Context, cConfig *csconfig.Config) (bool, error) { + if cConfig.API == nil || cConfig.API.Client == nil || !cConfig.API.Client.UnregisterOnExit { + return false, nil + } + + lapiClient, err := apiclient.GetLAPIClient() + if err != nil { + return false, err + } + + _, err = lapiClient.Auth.UnregisterWatcher(ctx) + if err != nil { + return false, err + } + + return true, nil +} + +func HandleSignals(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { var ( newConfig *csconfig.Config err error @@ -246,7 +264,6 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { go func() { defer trace.CatchPanic("crowdsec/HandleSignals") - Loop: for { s := <-signalChan switch s { @@ -256,14 +273,12 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { if err = shutdown(s, cConfig); err != nil { exitChan <- fmt.Errorf("failed shutdown: %w", err) - - break Loop + return } - if newConfig, err = reloadHandler(ctx, s); err != nil { + if newConfig, err = reloadHandler(ctx, s, logLines); err != nil { exitChan <- fmt.Errorf("reload handler failure: %w", err) - - break Loop + return } if newConfig != nil { @@ -275,8 +290,7 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { if err = shutdown(s, cConfig); err != nil { exitChan <- fmt.Errorf("failed shutdown: %w", err) - - break Loop + return } exitChan <- nil @@ -289,26 +303,17 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { log.Warning("Crowdsec service shutting down") } - if cConfig.API != nil && cConfig.API.Client != nil && cConfig.API.Client.UnregisterOnExit { - log.Warning("Unregistering watcher") - - lapiClient, err := apiclient.GetLAPIClient() - if err != nil { - return err - } - - _, err = lapiClient.Auth.UnregisterWatcher(ctx) - if err != nil { - return fmt.Errorf("failed to unregister watcher: %w", err) + if ok, werr := unregisterWatcher(ctx, cConfig); werr != nil { + log.WithError(werr).Warning("unregistering watcher") + if ok { + log.Warning("Watcher unregistered") } - - log.Warning("Watcher unregistered") } return err } -func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) error { +func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool, logLines chan pipeline.Event) error { acquisTomb = tomb.Tomb{} parsersTomb = tomb.Tomb{} bucketsTomb = tomb.Tomb{} @@ -384,7 +389,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) // if it's just linting, we're done if !flags.TestMode { - serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady) + serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, logLines) } else { agentReady <- true } @@ -407,7 +412,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) if cConfig.Common != nil && !flags.haveTimeMachine() && !isWindowsSvc { _ = csdaemon.Notify(csdaemon.Ready, log.StandardLogger()) // wait for signals - return HandleSignals(ctx, cConfig) + return HandleSignals(ctx, cConfig, logLines) } waitChans := make([]<-chan struct{}, 0) diff --git a/cmd/crowdsec/win_service.go b/cmd/crowdsec/win_service.go index 1e63e5c7626..8342638f16a 100644 --- a/cmd/crowdsec/win_service.go +++ b/cmd/crowdsec/win_service.go @@ -31,7 +31,6 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest, changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted} go func() { - loop: for { select { case <-tick: @@ -47,7 +46,7 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest, log.Errorf("Error while shutting down: %s", err) // don't return, we still want to notify windows that we are stopped ? } - break loop + return default: log.Errorf("unexpected control request #%d", c) } diff --git a/cmd/crowdsec/win_service_manage.go b/cmd/crowdsec/win_service_manage.go index 4e31dc019af..be3556cba51 100644 --- a/cmd/crowdsec/win_service_manage.go +++ b/cmd/crowdsec/win_service_manage.go @@ -10,8 +10,6 @@ import ( "fmt" "time" - //"time" - "golang.org/x/sys/windows/svc" "golang.org/x/sys/windows/svc/mgr" ) diff --git a/pkg/parser/runtime.go b/pkg/parser/runtime.go index cfe71b512ab..603c90b283e 100644 --- a/pkg/parser/runtime.go +++ b/pkg/parser/runtime.go @@ -242,10 +242,7 @@ func stageidx(stage string, stages []string) int { return -1 } -var ( - ParseDump bool - DumpFolder string -) +var ParseDump bool var ( StageParseCache dumps.ParserResults = make(dumps.ParserResults)