Skip to content

Commit c386b7d

Browse files
committed
group collectors in a StateDumper{}
1 parent 2617098 commit c386b7d

File tree

8 files changed

+53
-65
lines changed

8 files changed

+53
-65
lines changed

cmd/crowdsec/crowdsec.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,13 @@ func runCrowdsec(
142142
parsers *parser.Parsers,
143143
hub *cwhub.Hub,
144144
datasources []acquisitionTypes.DataSource,
145-
pourCollector *leakybucket.PourCollector,
146-
stageCollector *parser.StageParseCollector,
147-
bucketOverflows []pipeline.Event,
145+
sd *StateDumper,
148146
) error {
149147
inEvents = make(chan pipeline.Event)
150148
logLines = make(chan pipeline.Event)
151149

152-
startParserRoutines(ctx, g, cConfig, parsers, stageCollector)
153-
startBucketRoutines(ctx, g, cConfig, pourCollector)
150+
startParserRoutines(ctx, g, cConfig, parsers, sd.StageParse)
151+
startBucketRoutines(ctx, g, cConfig, sd.Pour)
154152

155153
apiClient, err := apiclient.GetLAPIClient()
156154
if err != nil {
@@ -159,7 +157,7 @@ func runCrowdsec(
159157

160158
startHeartBeat(ctx, cConfig, apiClient)
161159

162-
startOutputRoutines(ctx, cConfig, parsers, apiClient, stageCollector, bucketOverflows)
160+
startOutputRoutines(ctx, cConfig, parsers, apiClient, sd.StageParse, sd.BucketOverflows)
163161

164162
if err := startLPMetrics(ctx, cConfig, apiClient, hub, datasources); err != nil {
165163
return err
@@ -182,9 +180,7 @@ func serveCrowdsec(
182180
hub *cwhub.Hub,
183181
datasources []acquisitionTypes.DataSource,
184182
agentReady chan bool,
185-
pourCollector *leakybucket.PourCollector,
186-
stageCollector *parser.StageParseCollector,
187-
bucketOverflows []pipeline.Event,
183+
sd *StateDumper,
188184
) {
189185
cctx, cancel := context.WithCancel(ctx)
190186

@@ -200,7 +196,7 @@ func serveCrowdsec(
200196

201197
agentReady <- true
202198

203-
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, pourCollector, stageCollector, bucketOverflows); err != nil {
199+
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, sd); err != nil {
204200
log.Fatalf("unable to start crowdsec routines: %s", err)
205201
}
206202
}()
@@ -217,11 +213,12 @@ func serveCrowdsec(
217213
}
218214

219215
log.Debugf("everything is dead, return crowdsecTomb")
216+
log.Debugf("sd.DumpDir == %s", sd.DumpDir)
220217

221-
if flags.DumpDir != "" {
222-
log.Debugf("Dumping parser+bucket states to %s", flags.DumpDir)
218+
if sd.DumpDir != "" {
219+
log.Debugf("Dumping parser+bucket states to %s", sd.DumpDir)
223220

224-
if err := dumpAllStates(flags.DumpDir, pourCollector, stageCollector, bucketOverflows); err != nil {
221+
if err := sd.Dump(); err != nil {
225222
log.Fatal(err)
226223
}
227224

cmd/crowdsec/dump.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,42 @@ import (
1212
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1313
)
1414

15-
func dumpAllStates(dir string, pourCollector *leakybucket.PourCollector, stageCollector *parser.StageParseCollector, bucketOverflows []pipeline.Event) error {
15+
type StateDumper struct {
16+
DumpDir string
17+
Pour *leakybucket.PourCollector
18+
StageParse *parser.StageParseCollector
19+
BucketOverflows []pipeline.Event
20+
}
21+
22+
func NewStateDumper(dumpDir string) *StateDumper {
23+
if dumpDir == "" {
24+
return &StateDumper{}
25+
}
26+
27+
return &StateDumper{
28+
DumpDir: dumpDir,
29+
Pour: leakybucket.NewPourCollector(),
30+
StageParse: parser.NewStageParseCollector(),
31+
}
32+
}
33+
34+
func (sd *StateDumper) Dump() error {
35+
dir := sd.DumpDir
36+
1637
err := os.MkdirAll(dir, 0o755)
1738
if err != nil {
1839
return err
1940
}
2041

21-
if err := dumpCollector(dir, "parser-dump.yaml", stageCollector); err != nil {
42+
if err := dumpCollector(dir, "parser-dump.yaml", sd.StageParse); err != nil {
2243
return fmt.Errorf("dumping parser state: %w", err)
2344
}
2445

25-
if err := dumpState(dir, "bucket-dump.yaml", bucketOverflows); err != nil {
46+
if err := dumpState(dir, "bucket-dump.yaml", sd.BucketOverflows); err != nil {
2647
return fmt.Errorf("dumping bucket overflow state: %w", err)
2748
}
2849

29-
if err := dumpCollector(dir, "bucketpour-dump.yaml", pourCollector); err != nil {
50+
if err := dumpCollector(dir, "bucketpour-dump.yaml", sd.Pour); err != nil {
3051
return fmt.Errorf("dumping bucket pour state: %w", err)
3152
}
3253

cmd/crowdsec/main.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/crowdsecurity/crowdsec/pkg/fflag"
2727
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2828
"github.com/crowdsecurity/crowdsec/pkg/logging"
29-
"github.com/crowdsecurity/crowdsec/pkg/parser"
3029
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
3130
)
3231

@@ -214,18 +213,9 @@ func run(flags Flags) error {
214213
return err
215214
}
216215

217-
var (
218-
pourCollector *leakybucket.PourCollector
219-
stageCollector *parser.StageParseCollector
220-
bucketOverflows []pipeline.Event
221-
)
216+
sd := NewStateDumper(flags.DumpDir)
222217

223-
if flags.DumpDir != "" {
224-
pourCollector = leakybucket.NewPourCollector()
225-
stageCollector = parser.NewStageParseCollector()
226-
}
227-
228-
return StartRunSvc(ctx, cConfig, pourCollector, stageCollector, bucketOverflows)
218+
return StartRunSvc(ctx, cConfig, sd)
229219
}
230220

231221
func main() {

cmd/crowdsec/run_in_svc.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ import (
1616
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
1717
"github.com/crowdsecurity/crowdsec/pkg/database"
1818
"github.com/crowdsecurity/crowdsec/pkg/fflag"
19-
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
20-
"github.com/crowdsecurity/crowdsec/pkg/parser"
21-
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2219
)
2320

2421
func isWindowsService() (bool, error) {
@@ -28,9 +25,7 @@ func isWindowsService() (bool, error) {
2825
func StartRunSvc(
2926
ctx context.Context,
3027
cConfig *csconfig.Config,
31-
pourCollector *leakybucket.PourCollector,
32-
stageCollector *parser.StageParseCollector,
33-
bucketOverflows []pipeline.Event,
28+
sd *StateDumper,
3429
) error {
3530
defer trace.CatchPanic("crowdsec/StartRunSvc")
3631

@@ -72,5 +67,5 @@ func StartRunSvc(
7267
}()
7368
}
7469

75-
return Serve(ctx, cConfig, agentReady, pourCollector, stageCollector, bucketOverflows)
70+
return Serve(ctx, cConfig, agentReady, sd)
7671
}

cmd/crowdsec/run_in_svc_windows.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@ import (
1515
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
1616
"github.com/crowdsecurity/crowdsec/pkg/database"
1717
"github.com/crowdsecurity/crowdsec/pkg/fflag"
18-
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1918
)
2019

2120
func isWindowsService() (bool, error) {
2221
return svc.IsWindowsService()
2322
}
2423

25-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) error {
24+
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, sd *StateDumper) error {
2625
const svcName = "CrowdSec"
2726
const svcDescription = "Crowdsec IPS/IDS"
2827

@@ -37,7 +36,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *l
3736
return fmt.Errorf("failed to determine if we are running in windows service mode: %w", err)
3837
}
3938
if isRunninginService {
40-
return runService(svcName, pourCollector)
39+
return runService(svcName, sd)
4140
}
4241

4342
switch flags.WinSvc {
@@ -62,15 +61,15 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *l
6261
return fmt.Errorf("failed to %s %s: %w", flags.WinSvc, svcName, err)
6362
}
6463
case "":
65-
return WindowsRun(ctx, cConfig, pourCollector)
64+
return WindowsRun(ctx, cConfig, sd)
6665
default:
6766
return fmt.Errorf("Invalid value for winsvc parameter: %s", flags.WinSvc)
6867
}
6968

7069
return nil
7170
}
7271

73-
func WindowsRun(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) error {
72+
func WindowsRun(ctx context.Context, cConfig *csconfig.Config, sd *StateDumper) error {
7473
if fflag.PProfBlockProfile.IsEnabled() {
7574
runtime.SetBlockProfileRate(1)
7675
runtime.SetMutexProfileFraction(1)
@@ -97,5 +96,5 @@ func WindowsRun(ctx context.Context, cConfig *csconfig.Config, pourCollector *le
9796
registerPrometheus(cConfig.Prometheus)
9897
go servePrometheus(cConfig.Prometheus, dbClient, agentReady)
9998
}
100-
return Serve(ctx, cConfig, agentReady, pourCollector)
99+
return Serve(ctx, cConfig, agentReady, sd)
101100
}

cmd/crowdsec/serve.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import (
2222
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2323
"github.com/crowdsecurity/crowdsec/pkg/database"
2424
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
25-
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
26-
"github.com/crowdsecurity/crowdsec/pkg/parser"
2725
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2826
)
2927

@@ -35,16 +33,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
3533
crowdsecTomb = tomb.Tomb{}
3634
pluginTomb = tomb.Tomb{}
3735

38-
var (
39-
pourCollector *leakybucket.PourCollector
40-
stageCollector *parser.StageParseCollector
41-
bucketOverflows []pipeline.Event
42-
)
43-
44-
if flags.DumpDir != "" {
45-
pourCollector = leakybucket.NewPourCollector()
46-
stageCollector = parser.NewStageParseCollector()
47-
}
36+
sd := NewStateDumper(flags.DumpDir)
4837

4938
cConfig, err := LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false)
5039
if err != nil {
@@ -90,7 +79,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
9079
}
9180

9281
agentReady := make(chan bool, 1)
93-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector, stageCollector, bucketOverflows)
82+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, sd)
9483
}
9584

9685
log.Info("Reload is finished")
@@ -318,9 +307,7 @@ func Serve(
318307
ctx context.Context,
319308
cConfig *csconfig.Config,
320309
agentReady chan bool,
321-
pourCollector *leakybucket.PourCollector,
322-
stageCollector *parser.StageParseCollector,
323-
bucketOverflows []pipeline.Event,
310+
sd *StateDumper,
324311
) error {
325312
acquisTomb = tomb.Tomb{}
326313
outputsTomb = tomb.Tomb{}
@@ -394,7 +381,7 @@ func Serve(
394381

395382
// if it's just linting, we're done
396383
if !flags.TestMode {
397-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector, stageCollector, bucketOverflows)
384+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, sd)
398385
} else {
399386
agentReady <- true
400387
}

cmd/crowdsec/win_service.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@ import (
1818
"golang.org/x/sys/windows/svc/eventlog"
1919

2020
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
21-
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2221
)
2322

2423
type crowdsec_winservice struct {
2524
config *csconfig.Config
26-
pourCollector *leakybucket.PourCollector
25+
stateDumper *StateDumper
2726
}
2827

2928
func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
@@ -63,7 +62,7 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest,
6362
log.Fatal(err)
6463
}
6564

66-
err = WindowsRun(ctx, cConfig, m.pourCollector)
65+
err = WindowsRun(ctx, cConfig, m.stateDumper)
6766
changes <- svc.Status{State: svc.Stopped}
6867
if err != nil {
6968
log.Fatal(err)
@@ -72,7 +71,7 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest,
7271
return false, 0
7372
}
7473

75-
func runService(name string, pourCollector *leakybucket.PourCollector) error {
74+
func runService(name string, sd *StateDumper) error {
7675
// All the calls to logging before the logger is configured are pretty much useless, but we keep them for clarity
7776
err := eventlog.InstallAsEventCreate("CrowdSec", eventlog.Error|eventlog.Warning|eventlog.Info)
7877
if err != nil {
@@ -112,7 +111,7 @@ func runService(name string, pourCollector *leakybucket.PourCollector) error {
112111
}
113112

114113
log.Infof("starting %s service", name)
115-
winsvc := crowdsec_winservice{config: cConfig, pourCollector: pourCollector}
114+
winsvc := crowdsec_winservice{config: cConfig, stateDumper: sd}
116115

117116
if err := svc.Run(name, &winsvc); err != nil {
118117
return fmt.Errorf("%s service failed: %w", name, err)

pkg/parser/parsing_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func testSubSet(testSet TestFile, pctx UnixParserCtx, nodes []Node) (bool, error
316316
var results []pipeline.Event
317317

318318
for _, in := range testSet.Lines {
319-
out, err := Parse(pctx, in, nodes, false)
319+
out, err := Parse(pctx, in, nodes, nil)
320320
if err != nil {
321321
log.Errorf("Failed to process %s : %v", spew.Sdump(in), err)
322322
}

0 commit comments

Comments
 (0)