Skip to content

Commit 74f9be2

Browse files
committed
pkg/leakybucket: replace global variables with injected collector
1 parent dab533f commit 74f9be2

File tree

13 files changed

+120
-47
lines changed

13 files changed

+120
-47
lines changed

cmd/crowdsec-cli/cliexplain/explain.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ func (cli *cliExplain) run(ctx context.Context) error {
135135

136136
defer func() {
137137
if cli.flags.noClean {
138+
fmt.Fprintf(os.Stdout, "Not removing dump directory: %s\n", dir)
138139
return
139140
}
140141

cmd/crowdsec/crowdsec.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
2020
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2121
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
22+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2223
"github.com/crowdsecurity/crowdsec/pkg/metrics"
2324
"github.com/crowdsecurity/crowdsec/pkg/parser"
2425
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
@@ -81,12 +82,12 @@ func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconf
8182
}
8283
}
8384

84-
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config) {
85+
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) {
8586
for idx := range cConfig.Crowdsec.BucketsRoutinesCount {
8687
log.WithField("idx", idx).Info("Starting bucket routine")
8788
g.Go(func() error {
8889
defer trace.CatchPanic("crowdsec/runPour/"+strconv.Itoa(idx))
89-
runPour(ctx, inEvents, holders, buckets, cConfig)
90+
runPour(ctx, inEvents, holders, buckets, cConfig, pourCollector)
9091
return nil
9192
})
9293
}
@@ -135,12 +136,12 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
135136
}
136137

137138
// runCrowdsec starts the log processor service
138-
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource) error {
139+
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, pourCollector *leakybucket.PourCollector) error {
139140
inEvents = make(chan pipeline.Event)
140141
logLines = make(chan pipeline.Event)
141142

142143
startParserRoutines(ctx, g, cConfig, parsers)
143-
startBucketRoutines(ctx, g, cConfig)
144+
startBucketRoutines(ctx, g, cConfig, pourCollector)
144145

145146
apiClient, err := apiclient.GetLAPIClient()
146147
if err != nil {
@@ -165,7 +166,7 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
165166
}
166167

167168
// serveCrowdsec wraps the log processor service
168-
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool) {
169+
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool, pourCollector *leakybucket.PourCollector) {
169170
cctx, cancel := context.WithCancel(ctx)
170171

171172
var g errgroup.Group
@@ -180,7 +181,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
180181

181182
agentReady <- true
182183

183-
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources); err != nil {
184+
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, pourCollector); err != nil {
184185
log.Fatalf("unable to start crowdsec routines: %s", err)
185186
}
186187
}()
@@ -201,7 +202,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
201202
if flags.DumpDir != "" {
202203
log.Debugf("Dumping parser+bucket states to %s", flags.DumpDir)
203204

204-
if err := dumpAllStates(flags.DumpDir); err != nil {
205+
if err := dumpAllStates(flags.DumpDir, pourCollector); err != nil {
205206
log.Fatal(err)
206207
}
207208

cmd/crowdsec/dump.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import (
77

88
"gopkg.in/yaml.v3"
99

10-
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
10+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1111
"github.com/crowdsecurity/crowdsec/pkg/parser"
12+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1213
)
1314

14-
func dumpAllStates(dir string) error {
15+
func dumpAllStates(dir string, pourCollector *leakybucket.PourCollector) error {
1516
err := os.MkdirAll(dir, 0o755)
1617
if err != nil {
1718
return err
@@ -25,13 +26,30 @@ func dumpAllStates(dir string) error {
2526
return fmt.Errorf("dumping bucket overflow state: %w", err)
2627
}
2728

28-
if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil {
29+
if err := dumpCollector(dir, "bucketpour-dump.yaml", pourCollector); err != nil {
2930
return fmt.Errorf("dumping bucket pour state: %w", err)
3031
}
3132

3233
return nil
3334
}
3435

36+
type Collector interface {
37+
Snapshot() map[string][]pipeline.Event
38+
}
39+
40+
func dumpCollector(dir, name string, collector Collector) error {
41+
if collector == nil {
42+
return nil
43+
}
44+
45+
out, err := yaml.Marshal(collector.Snapshot())
46+
if err != nil {
47+
return err
48+
}
49+
50+
return os.WriteFile(filepath.Join(dir, name), out, 0o644)
51+
}
52+
3553
func dumpState(dir, name string, obj any) error {
3654
out, err := yaml.Marshal(obj)
3755
if err != nil {

cmd/crowdsec/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,13 @@ func run(flags Flags) error {
213213
return err
214214
}
215215

216-
return StartRunSvc(ctx, cConfig)
216+
var pourCollector *leakybucket.PourCollector
217+
218+
if flags.DumpDir != "" {
219+
pourCollector = leakybucket.NewPourCollector()
220+
}
221+
222+
return StartRunSvc(ctx, cConfig, pourCollector)
217223
}
218224

219225
func main() {

cmd/crowdsec/pour.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.
3535
leaky.GarbageCollectBuckets(*z, buckets)
3636
}
3737

38-
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) {
38+
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config, pourCollector *leaky.PourCollector) {
3939
count := 0
4040

4141
for {
@@ -52,8 +52,7 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
5252
triggerGC(parsed, buckets, cConfig)
5353
}
5454
// here we can bucketify with parsed
55-
track := flags.DumpDir != ""
56-
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track)
55+
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, pourCollector)
5756
if err != nil {
5857
log.Warningf("bucketify failed for: %v with %s", parsed, err)
5958
continue

cmd/crowdsec/run_in_svc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ import (
1616
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
1717
"github.com/crowdsecurity/crowdsec/pkg/database"
1818
"github.com/crowdsecurity/crowdsec/pkg/fflag"
19+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1920
)
2021

2122
func isWindowsService() (bool, error) {
2223
return false, nil
2324
}
2425

25-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
26+
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) error {
2627
defer trace.CatchPanic("crowdsec/StartRunSvc")
2728

2829
// Always try to stop CPU profiling to avoid passing flags around
@@ -63,5 +64,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
6364
}()
6465
}
6566

66-
return Serve(ctx, cConfig, agentReady)
67+
return Serve(ctx, cConfig, agentReady, pourCollector)
6768
}

cmd/crowdsec/run_in_svc_windows.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@ import (
1515
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
1616
"github.com/crowdsecurity/crowdsec/pkg/database"
1717
"github.com/crowdsecurity/crowdsec/pkg/fflag"
18+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1819
)
1920

2021
func isWindowsService() (bool, error) {
2122
return svc.IsWindowsService()
2223
}
2324

24-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
25+
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) error {
2526
const svcName = "CrowdSec"
2627
const svcDescription = "Crowdsec IPS/IDS"
2728

@@ -36,7 +37,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
3637
return fmt.Errorf("failed to determine if we are running in windows service mode: %w", err)
3738
}
3839
if isRunninginService {
39-
return runService(svcName)
40+
return runService(svcName, pourCollector)
4041
}
4142

4243
switch flags.WinSvc {
@@ -61,15 +62,15 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
6162
return fmt.Errorf("failed to %s %s: %w", flags.WinSvc, svcName, err)
6263
}
6364
case "":
64-
return WindowsRun(ctx, cConfig)
65+
return WindowsRun(ctx, cConfig, pourCollector)
6566
default:
6667
return fmt.Errorf("Invalid value for winsvc parameter: %s", flags.WinSvc)
6768
}
6869

6970
return nil
7071
}
7172

72-
func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error {
73+
func WindowsRun(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) error {
7374
if fflag.PProfBlockProfile.IsEnabled() {
7475
runtime.SetBlockProfileRate(1)
7576
runtime.SetMutexProfileFraction(1)
@@ -96,5 +97,5 @@ func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error {
9697
registerPrometheus(cConfig.Prometheus)
9798
go servePrometheus(cConfig.Prometheus, dbClient, agentReady)
9899
}
99-
return Serve(ctx, cConfig, agentReady)
100+
return Serve(ctx, cConfig, agentReady, pourCollector)
100101
}

cmd/crowdsec/serve.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2323
"github.com/crowdsecurity/crowdsec/pkg/database"
2424
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
25+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2526
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2627
)
2728

@@ -33,6 +34,12 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
3334
crowdsecTomb = tomb.Tomb{}
3435
pluginTomb = tomb.Tomb{}
3536

37+
var pourCollector *leakybucket.PourCollector
38+
39+
if flags.DumpDir != "" {
40+
pourCollector = leakybucket.NewPourCollector()
41+
}
42+
3643
cConfig, err := LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false)
3744
if err != nil {
3845
return nil, err
@@ -77,7 +84,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
7784
}
7885

7986
agentReady := make(chan bool, 1)
80-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady)
87+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector)
8188
}
8289

8390
log.Info("Reload is finished")
@@ -301,7 +308,7 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error {
301308
return err
302309
}
303310

304-
func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) error {
311+
func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool, pourCollector *leakybucket.PourCollector) error {
305312
acquisTomb = tomb.Tomb{}
306313
outputsTomb = tomb.Tomb{}
307314
apiTomb = tomb.Tomb{}
@@ -374,7 +381,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool)
374381

375382
// if it's just linting, we're done
376383
if !flags.TestMode {
377-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady)
384+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector)
378385
} else {
379386
agentReady <- true
380387
}

cmd/crowdsec/win_service.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"golang.org/x/sys/windows/svc/eventlog"
1919

2020
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
21+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2122
)
2223

2324
type crowdsec_winservice struct {
2425
config *csconfig.Config
26+
pourCollector *leakybucket.PourCollector
2527
}
2628

2729
func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
@@ -61,7 +63,7 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest,
6163
log.Fatal(err)
6264
}
6365

64-
err = WindowsRun(ctx, cConfig)
66+
err = WindowsRun(ctx, cConfig, m.pourCollector)
6567
changes <- svc.Status{State: svc.Stopped}
6668
if err != nil {
6769
log.Fatal(err)
@@ -70,13 +72,13 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest,
7072
return false, 0
7173
}
7274

73-
func runService(name string) error {
75+
func runService(name string, pourCollector *leakybucket.PourCollector) error {
7476
// All the calls to logging before the logger is configured are pretty much useless, but we keep them for clarity
7577
err := eventlog.InstallAsEventCreate("CrowdSec", eventlog.Error|eventlog.Warning|eventlog.Info)
7678
if err != nil {
7779
if errno, ok := err.(syscall.Errno); ok { //nolint:errorlint
7880
if errno == windows.ERROR_ACCESS_DENIED {
79-
log.Warnf("Access denied when installing event source, running as non-admin ?")
81+
log.Warnf("Access denied when installing event source, running as non-admin?")
8082
} else {
8183
log.Warnf("Failed to install event log: %s (%d)", err, errno)
8284
}
@@ -110,7 +112,7 @@ func runService(name string) error {
110112
}
111113

112114
log.Infof("starting %s service", name)
113-
winsvc := crowdsec_winservice{config: cConfig}
115+
winsvc := crowdsec_winservice{config: cConfig, pourCollector: pourCollector}
114116

115117
if err := svc.Run(name, &winsvc); err != nil {
116118
return fmt.Errorf("%s service failed: %w", name, err)

pkg/leakybucket/buckets_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func testFile(t *testing.T, file string, holders []BucketFactory, response chan
184184
in.ExpectMode = pipeline.TIMEMACHINE
185185
log.Infof("Buckets input : %s", spew.Sdump(in))
186186

187-
ok, err := PourItemToHolders(ctx, in, holders, buckets, false)
187+
ok, err := PourItemToHolders(ctx, in, holders, buckets, nil)
188188
if err != nil {
189189
t.Fatalf("Failed to pour : %s", err)
190190
}
@@ -207,7 +207,7 @@ POLL_AGAIN:
207207
results = append(results, ret)
208208
if ret.Overflow.Reprocess {
209209
log.Errorf("Overflow being reprocessed.")
210-
ok, err := PourItemToHolders(ctx, ret, holders, buckets, false)
210+
ok, err := PourItemToHolders(ctx, ret, holders, buckets, nil)
211211
if err != nil {
212212
t.Fatalf("Failed to pour : %s", err)
213213
}

0 commit comments

Comments
 (0)