Skip to content

Commit 9ee3a13

Browse files
authored
pkg/leakybucket: replace global variables with injected StateDumper (#4197)
* pkg/leakybucket: replace global variables with injected collector * replace globals StageParse* with injected collector * remove global bucketOverflows * group collectors in a StateDumper{}
1 parent c0709b2 commit 9ee3a13

File tree

19 files changed

+317
-116
lines changed

19 files changed

+317
-116
lines changed

cmd/crowdsec-cli/cliexplain/explain.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ func (cli *cliExplain) run(ctx context.Context) error {
135135

136136
defer func() {
137137
if cli.flags.noClean {
138+
fmt.Fprintf(os.Stdout, "Not removing dump directory: %s\n", dir)
138139
return
139140
}
140141

cmd/crowdsec/crowdsec.go

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"strconv"
87
"time"
98

@@ -19,6 +18,7 @@ import (
1918
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
2019
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2120
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
21+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2222
"github.com/crowdsecurity/crowdsec/pkg/metrics"
2323
"github.com/crowdsecurity/crowdsec/pkg/parser"
2424
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
@@ -70,23 +70,23 @@ func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub,
7070
return csParsers, datasources, nil
7171
}
7272

73-
func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers) {
73+
func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, stageCollector *parser.StageParseCollector) {
7474
for idx := range cConfig.Crowdsec.ParserRoutinesCount {
7575
log.WithField("idx", idx).Info("Starting parser routine")
7676
g.Go(func() error {
7777
defer trace.CatchPanic("crowdsec/runParse/"+strconv.Itoa(idx))
78-
runParse(ctx, logLines, inEvents, *parsers.Ctx, parsers.Nodes)
78+
runParse(ctx, logLines, inEvents, *parsers.Ctx, parsers.Nodes, stageCollector)
7979
return nil
8080
})
8181
}
8282
}
8383

84-
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config) {
84+
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) {
8585
for idx := range cConfig.Crowdsec.BucketsRoutinesCount {
8686
log.WithField("idx", idx).Info("Starting bucket routine")
8787
g.Go(func() error {
8888
defer trace.CatchPanic("crowdsec/runPour/"+strconv.Itoa(idx))
89-
runPour(ctx, inEvents, holders, buckets, cConfig)
89+
runPour(ctx, inEvents, holders, buckets, cConfig, pourCollector)
9090
return nil
9191
})
9292
}
@@ -97,12 +97,12 @@ func startHeartBeat(ctx context.Context, _ *csconfig.Config, apiClient *apiclien
9797
apiClient.HeartBeat.StartHeartBeat(ctx)
9898
}
9999

100-
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient) {
100+
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient, stageCollector *parser.StageParseCollector, bucketOverflows []pipeline.Event) {
101101
for idx := range cConfig.Crowdsec.OutputRoutinesCount {
102102
log.WithField("idx", idx).Info("Starting output routine")
103103
outputsTomb.Go(func() error {
104104
defer trace.CatchPanic("crowdsec/runOutput/"+strconv.Itoa(idx))
105-
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient)
105+
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient, stageCollector, bucketOverflows)
106106
})
107107
}
108108
}
@@ -135,12 +135,20 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
135135
}
136136

137137
// runCrowdsec starts the log processor service
138-
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource) error {
138+
func runCrowdsec(
139+
ctx context.Context,
140+
g *errgroup.Group,
141+
cConfig *csconfig.Config,
142+
parsers *parser.Parsers,
143+
hub *cwhub.Hub,
144+
datasources []acquisitionTypes.DataSource,
145+
sd *StateDumper,
146+
) error {
139147
inEvents = make(chan pipeline.Event)
140148
logLines = make(chan pipeline.Event)
141149

142-
startParserRoutines(ctx, g, cConfig, parsers)
143-
startBucketRoutines(ctx, g, cConfig)
150+
startParserRoutines(ctx, g, cConfig, parsers, sd.StageParse)
151+
startBucketRoutines(ctx, g, cConfig, sd.Pour)
144152

145153
apiClient, err := apiclient.GetLAPIClient()
146154
if err != nil {
@@ -149,7 +157,7 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
149157

150158
startHeartBeat(ctx, cConfig, apiClient)
151159

152-
startOutputRoutines(ctx, cConfig, parsers, apiClient)
160+
startOutputRoutines(ctx, cConfig, parsers, apiClient, sd.StageParse, sd.BucketOverflows)
153161

154162
if err := startLPMetrics(ctx, cConfig, apiClient, hub, datasources); err != nil {
155163
return err
@@ -165,7 +173,15 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
165173
}
166174

167175
// serveCrowdsec wraps the log processor service
168-
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool) {
176+
func serveCrowdsec(
177+
ctx context.Context,
178+
parsers *parser.Parsers,
179+
cConfig *csconfig.Config,
180+
hub *cwhub.Hub,
181+
datasources []acquisitionTypes.DataSource,
182+
agentReady chan bool,
183+
sd *StateDumper,
184+
) {
169185
cctx, cancel := context.WithCancel(ctx)
170186

171187
var g errgroup.Group
@@ -180,7 +196,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
180196

181197
agentReady <- true
182198

183-
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources); err != nil {
199+
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, sd); err != nil {
184200
log.Fatalf("unable to start crowdsec routines: %s", err)
185201
}
186202
}()
@@ -197,15 +213,16 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
197213
}
198214

199215
log.Debugf("everything is dead, return crowdsecTomb")
216+
log.Debugf("sd.DumpDir == %s", sd.DumpDir)
200217

201-
if flags.DumpDir != "" {
202-
log.Debugf("Dumping parser+bucket states to %s", flags.DumpDir)
218+
if sd.DumpDir != "" {
219+
log.Debugf("Dumping parser+bucket states to %s", sd.DumpDir)
203220

204-
if err := dumpAllStates(flags.DumpDir); err != nil {
221+
if err := sd.Dump(); err != nil {
205222
log.Fatal(err)
206223
}
207224

208-
os.Exit(0)
225+
return nil
209226
}
210227

211228
return nil

cmd/crowdsec/dump.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,66 @@ import (
77

88
"gopkg.in/yaml.v3"
99

10-
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
10+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1111
"github.com/crowdsecurity/crowdsec/pkg/parser"
12+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1213
)
1314

14-
func dumpAllStates(dir string) error {
15+
type StateDumper struct {
16+
DumpDir string
17+
Pour *leakybucket.PourCollector
18+
StageParse *parser.StageParseCollector
19+
BucketOverflows []pipeline.Event
20+
}
21+
22+
func NewStateDumper(dumpDir string) *StateDumper {
23+
if dumpDir == "" {
24+
return &StateDumper{}
25+
}
26+
27+
return &StateDumper{
28+
DumpDir: dumpDir,
29+
Pour: leakybucket.NewPourCollector(),
30+
StageParse: parser.NewStageParseCollector(),
31+
}
32+
}
33+
34+
func (sd *StateDumper) Dump() error {
35+
dir := sd.DumpDir
36+
1537
err := os.MkdirAll(dir, 0o755)
1638
if err != nil {
1739
return err
1840
}
1941

20-
if err := dumpState(dir, "parser-dump.yaml", parser.StageParseCache); err != nil {
42+
if err := dumpCollector(dir, "parser-dump.yaml", sd.StageParse); err != nil {
2143
return fmt.Errorf("dumping parser state: %w", err)
2244
}
2345

24-
if err := dumpState(dir, "bucket-dump.yaml", bucketOverflows); err != nil {
46+
if err := dumpState(dir, "bucket-dump.yaml", sd.BucketOverflows); err != nil {
2547
return fmt.Errorf("dumping bucket overflow state: %w", err)
2648
}
2749

28-
if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil {
50+
if err := dumpCollector(dir, "bucketpour-dump.yaml", sd.Pour); err != nil {
2951
return fmt.Errorf("dumping bucket pour state: %w", err)
3052
}
3153

3254
return nil
3355
}
3456

57+
type YAMLDumper interface {
58+
DumpYAML() ([]byte, error)
59+
}
60+
61+
func dumpCollector(dir, name string, collector YAMLDumper) error {
62+
out, err := collector.DumpYAML()
63+
if err != nil {
64+
return err
65+
}
66+
67+
return os.WriteFile(filepath.Join(dir, name), out, 0o644)
68+
}
69+
3570
func dumpState(dir, name string, obj any) error {
3671
out, err := yaml.Marshal(obj)
3772
if err != nil {

cmd/crowdsec/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ func run(flags Flags) error {
213213
return err
214214
}
215215

216-
return StartRunSvc(ctx, cConfig)
216+
sd := NewStateDumper(flags.DumpDir)
217+
218+
return StartRunSvc(ctx, cConfig, sd)
217219
}
218220

219221
func main() {

cmd/crowdsec/output.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api
5252
return nil
5353
}
5454

55-
var bucketOverflows []pipeline.Event
56-
5755
func runOutput(
5856
ctx context.Context,
5957
input chan pipeline.Event,
@@ -62,6 +60,8 @@ func runOutput(
6260
postOverflowCTX parser.UnixParserCtx,
6361
postOverflowNodes []parser.Node,
6462
client *apiclient.ApiClient,
63+
stageCollector *parser.StageParseCollector,
64+
bucketOverflows []pipeline.Event,
6565
) error {
6666
var (
6767
cache []pipeline.RuntimeAlert
@@ -114,8 +114,7 @@ func runOutput(
114114
}
115115

116116
/* process post overflow parser nodes */
117-
dump := flags.DumpDir != ""
118-
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, dump)
117+
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, stageCollector)
119118
if err != nil {
120119
return fmt.Errorf("postoverflow failed: %w", err)
121120
}

cmd/crowdsec/parse.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func parseEvent(
1616
event pipeline.Event,
1717
parserCTX parser.UnixParserCtx,
1818
nodes []parser.Node,
19+
stageCollector *parser.StageParseCollector,
1920
) *pipeline.Event {
2021
if !event.Process {
2122
return nil
@@ -35,8 +36,7 @@ func parseEvent(
3536

3637
startParsing := time.Now()
3738
/* parse the log using magic */
38-
dump := flags.DumpDir != ""
39-
parsed, err := parser.Parse(parserCTX, event, nodes, dump)
39+
parsed, err := parser.Parse(parserCTX, event, nodes, stageCollector)
4040
if err != nil {
4141
log.Errorf("failed parsing: %v", err)
4242
}
@@ -56,14 +56,14 @@ func parseEvent(
5656
return &parsed
5757
}
5858

59-
func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) {
59+
func runParse(ctx context.Context, input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node, stageCollector *parser.StageParseCollector) {
6060
for {
6161
select {
6262
case <-ctx.Done():
6363
log.Infof("Killing parser routines")
6464
return
6565
case event := <-input:
66-
parsed := parseEvent(event, parserCTX, nodes)
66+
parsed := parseEvent(event, parserCTX, nodes, stageCollector)
6767
if parsed == nil {
6868
continue
6969
}

cmd/crowdsec/pour.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.
3535
leaky.GarbageCollectBuckets(*z, buckets)
3636
}
3737

38-
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) {
38+
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config, pourCollector *leaky.PourCollector) {
3939
count := 0
4040

4141
for {
@@ -52,8 +52,7 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
5252
triggerGC(parsed, buckets, cConfig)
5353
}
5454
// here we can bucketify with parsed
55-
track := flags.DumpDir != ""
56-
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track)
55+
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, pourCollector)
5756
if err != nil {
5857
log.Warningf("bucketify failed for: %v with %s", parsed, err)
5958
continue

cmd/crowdsec/run_in_svc.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ func isWindowsService() (bool, error) {
2222
return false, nil
2323
}
2424

25-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
25+
func StartRunSvc(
26+
ctx context.Context,
27+
cConfig *csconfig.Config,
28+
sd *StateDumper,
29+
) error {
2630
defer trace.CatchPanic("crowdsec/StartRunSvc")
2731

2832
// Always try to stop CPU profiling to avoid passing flags around
@@ -63,5 +67,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
6367
}()
6468
}
6569

66-
return Serve(ctx, cConfig, agentReady)
70+
return Serve(ctx, cConfig, agentReady, sd)
6771
}

cmd/crowdsec/run_in_svc_windows.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func isWindowsService() (bool, error) {
2121
return svc.IsWindowsService()
2222
}
2323

24-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
24+
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, sd *StateDumper) error {
2525
const svcName = "CrowdSec"
2626
const svcDescription = "Crowdsec IPS/IDS"
2727

@@ -36,7 +36,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
3636
return fmt.Errorf("failed to determine if we are running in windows service mode: %w", err)
3737
}
3838
if isRunninginService {
39-
return runService(svcName)
39+
return runService(svcName, sd)
4040
}
4141

4242
switch flags.WinSvc {
@@ -61,15 +61,15 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
6161
return fmt.Errorf("failed to %s %s: %w", flags.WinSvc, svcName, err)
6262
}
6363
case "":
64-
return WindowsRun(ctx, cConfig)
64+
return WindowsRun(ctx, cConfig, sd)
6565
default:
6666
return fmt.Errorf("Invalid value for winsvc parameter: %s", flags.WinSvc)
6767
}
6868

6969
return nil
7070
}
7171

72-
func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error {
72+
func WindowsRun(ctx context.Context, cConfig *csconfig.Config, sd *StateDumper) error {
7373
if fflag.PProfBlockProfile.IsEnabled() {
7474
runtime.SetBlockProfileRate(1)
7575
runtime.SetMutexProfileFraction(1)
@@ -96,5 +96,5 @@ func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error {
9696
registerPrometheus(cConfig.Prometheus)
9797
go servePrometheus(cConfig.Prometheus, dbClient, agentReady)
9898
}
99-
return Serve(ctx, cConfig, agentReady)
99+
return Serve(ctx, cConfig, agentReady, sd)
100100
}

0 commit comments

Comments
 (0)