Skip to content

Commit c5eb1c7

Browse files
committed
pkg/leakybucket: replace global variables with injected collector
1 parent dab533f commit c5eb1c7

File tree

11 files changed

+108
-38
lines changed

11 files changed

+108
-38
lines changed

cmd/crowdsec-cli/cliexplain/explain.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ func (cli *cliExplain) run(ctx context.Context) error {
135135

136136
defer func() {
137137
if cli.flags.noClean {
138+
fmt.Fprintf(os.Stdout, "Not removing dump directory: %s\n", dir)
138139
return
139140
}
140141

cmd/crowdsec/crowdsec.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
2020
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2121
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
22+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2223
"github.com/crowdsecurity/crowdsec/pkg/metrics"
2324
"github.com/crowdsecurity/crowdsec/pkg/parser"
2425
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
@@ -81,12 +82,12 @@ func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconf
8182
}
8283
}
8384

84-
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config) {
85+
func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) {
8586
for idx := range cConfig.Crowdsec.BucketsRoutinesCount {
8687
log.WithField("idx", idx).Info("Starting bucket routine")
8788
g.Go(func() error {
8889
defer trace.CatchPanic("crowdsec/runPour/"+strconv.Itoa(idx))
89-
runPour(ctx, inEvents, holders, buckets, cConfig)
90+
runPour(ctx, inEvents, holders, buckets, cConfig, pourCollector)
9091
return nil
9192
})
9293
}
@@ -135,12 +136,12 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
135136
}
136137

137138
// runCrowdsec starts the log processor service
138-
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource) error {
139+
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, pourCollector *leakybucket.PourCollector) error {
139140
inEvents = make(chan pipeline.Event)
140141
logLines = make(chan pipeline.Event)
141142

142143
startParserRoutines(ctx, g, cConfig, parsers)
143-
startBucketRoutines(ctx, g, cConfig)
144+
startBucketRoutines(ctx, g, cConfig, pourCollector)
144145

145146
apiClient, err := apiclient.GetLAPIClient()
146147
if err != nil {
@@ -165,7 +166,7 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
165166
}
166167

167168
// serveCrowdsec wraps the log processor service
168-
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool) {
169+
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool, pourCollector *leakybucket.PourCollector) {
169170
cctx, cancel := context.WithCancel(ctx)
170171

171172
var g errgroup.Group
@@ -180,7 +181,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
180181

181182
agentReady <- true
182183

183-
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources); err != nil {
184+
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, pourCollector); err != nil {
184185
log.Fatalf("unable to start crowdsec routines: %s", err)
185186
}
186187
}()
@@ -201,7 +202,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf
201202
if flags.DumpDir != "" {
202203
log.Debugf("Dumping parser+bucket states to %s", flags.DumpDir)
203204

204-
if err := dumpAllStates(flags.DumpDir); err != nil {
205+
if err := dumpAllStates(flags.DumpDir, pourCollector); err != nil {
205206
log.Fatal(err)
206207
}
207208

cmd/crowdsec/dump.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import (
77

88
"gopkg.in/yaml.v3"
99

10-
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
10+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1111
"github.com/crowdsecurity/crowdsec/pkg/parser"
12+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1213
)
1314

14-
func dumpAllStates(dir string) error {
15+
func dumpAllStates(dir string, pourCollector *leakybucket.PourCollector) error {
1516
err := os.MkdirAll(dir, 0o755)
1617
if err != nil {
1718
return err
@@ -25,13 +26,30 @@ func dumpAllStates(dir string) error {
2526
return fmt.Errorf("dumping bucket overflow state: %w", err)
2627
}
2728

28-
if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil {
29+
if err := dumpCollector(dir, "bucketpour-dump.yaml", pourCollector); err != nil {
2930
return fmt.Errorf("dumping bucket pour state: %w", err)
3031
}
3132

3233
return nil
3334
}
3435

36+
type Collector interface {
37+
Snapshot() map[string][]pipeline.Event
38+
}
39+
40+
func dumpCollector(dir, name string, collector Collector) error {
41+
if collector == nil {
42+
return nil
43+
}
44+
45+
out, err := yaml.Marshal(collector.Snapshot())
46+
if err != nil {
47+
return err
48+
}
49+
50+
return os.WriteFile(filepath.Join(dir, name), out, 0o644)
51+
}
52+
3553
func dumpState(dir, name string, obj any) error {
3654
out, err := yaml.Marshal(obj)
3755
if err != nil {

cmd/crowdsec/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,13 @@ func run(flags Flags) error {
213213
return err
214214
}
215215

216-
return StartRunSvc(ctx, cConfig)
216+
var pourCollector *leakybucket.PourCollector
217+
218+
if flags.DumpDir != "" {
219+
pourCollector = leakybucket.NewPourCollector()
220+
}
221+
222+
return StartRunSvc(ctx, cConfig, pourCollector)
217223
}
218224

219225
func main() {

cmd/crowdsec/pour.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.
3535
leaky.GarbageCollectBuckets(*z, buckets)
3636
}
3737

38-
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) {
38+
func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config, pourCollector *leaky.PourCollector) {
3939
count := 0
4040

4141
for {
@@ -52,8 +52,7 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
5252
triggerGC(parsed, buckets, cConfig)
5353
}
5454
// here we can bucketify with parsed
55-
track := flags.DumpDir != ""
56-
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track)
55+
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, pourCollector)
5756
if err != nil {
5857
log.Warningf("bucketify failed for: %v with %s", parsed, err)
5958
continue

cmd/crowdsec/run_in_svc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ import (
1616
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
1717
"github.com/crowdsecurity/crowdsec/pkg/database"
1818
"github.com/crowdsecurity/crowdsec/pkg/fflag"
19+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1920
)
2021

2122
func isWindowsService() (bool, error) {
2223
return false, nil
2324
}
2425

25-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
26+
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector) error {
2627
defer trace.CatchPanic("crowdsec/StartRunSvc")
2728

2829
// Always try to stop CPU profiling to avoid passing flags around
@@ -63,5 +64,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error {
6364
}()
6465
}
6566

66-
return Serve(ctx, cConfig, agentReady)
67+
return Serve(ctx, cConfig, agentReady, pourCollector)
6768
}

cmd/crowdsec/serve.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2323
"github.com/crowdsecurity/crowdsec/pkg/database"
2424
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
25+
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2526
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2627
)
2728

@@ -33,6 +34,12 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
3334
crowdsecTomb = tomb.Tomb{}
3435
pluginTomb = tomb.Tomb{}
3536

37+
var pourCollector *leakybucket.PourCollector
38+
39+
if flags.DumpDir != "" {
40+
pourCollector = leakybucket.NewPourCollector()
41+
}
42+
3643
cConfig, err := LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false)
3744
if err != nil {
3845
return nil, err
@@ -77,7 +84,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
7784
}
7885

7986
agentReady := make(chan bool, 1)
80-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady)
87+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector)
8188
}
8289

8390
log.Info("Reload is finished")
@@ -301,7 +308,7 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error {
301308
return err
302309
}
303310

304-
func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) error {
311+
func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool, pourCollector *leakybucket.PourCollector) error {
305312
acquisTomb = tomb.Tomb{}
306313
outputsTomb = tomb.Tomb{}
307314
apiTomb = tomb.Tomb{}
@@ -374,7 +381,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool)
374381

375382
// if it's just linting, we're done
376383
if !flags.TestMode {
377-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady)
384+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector)
378385
} else {
379386
agentReady <- true
380387
}

pkg/leakybucket/buckets_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func testFile(t *testing.T, file string, holders []BucketFactory, response chan
184184
in.ExpectMode = pipeline.TIMEMACHINE
185185
log.Infof("Buckets input : %s", spew.Sdump(in))
186186

187-
ok, err := PourItemToHolders(ctx, in, holders, buckets, false)
187+
ok, err := PourItemToHolders(ctx, in, holders, buckets, nil)
188188
if err != nil {
189189
t.Fatalf("Failed to pour : %s", err)
190190
}
@@ -207,7 +207,7 @@ POLL_AGAIN:
207207
results = append(results, ret)
208208
if ret.Overflow.Reprocess {
209209
log.Errorf("Overflow being reprocessed.")
210-
ok, err := PourItemToHolders(ctx, ret, holders, buckets, false)
210+
ok, err := PourItemToHolders(ctx, ret, holders, buckets, nil)
211211
if err != nil {
212212
t.Fatalf("Failed to pour : %s", err)
213213
}

pkg/leakybucket/collector.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package leakybucket
2+
3+
import (
4+
"sync"
5+
6+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
7+
)
8+
9+
type PourCollector struct {
10+
mu sync.Mutex
11+
m map[string][]pipeline.Event
12+
}
13+
14+
func NewPourCollector() *PourCollector {
15+
return &PourCollector{
16+
m: make(map[string][]pipeline.Event),
17+
}
18+
}
19+
20+
func (c *PourCollector) Add(key string, evt pipeline.Event) {
21+
if c == nil {
22+
return
23+
}
24+
c.mu.Lock()
25+
c.m[key] = append(c.m[key], evt)
26+
c.mu.Unlock()
27+
}
28+
29+
// Snapshot returns a shallow copy of the map and slices.
30+
// The caller must not mutate the events.
31+
func (c *PourCollector) Snapshot() map[string][]pipeline.Event {
32+
if c == nil {
33+
return nil
34+
}
35+
c.mu.Lock()
36+
defer c.mu.Unlock()
37+
38+
out := make(map[string][]pipeline.Event, len(c.m))
39+
for k, v := range c.m {
40+
tmp := make([]pipeline.Event, len(v))
41+
copy(tmp, v)
42+
out[k] = tmp
43+
}
44+
return out
45+
}

pkg/leakybucket/manager_run.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ import (
1919

2020
var (
2121
serialized map[string]Leaky
22-
BucketPourCache map[string][]pipeline.Event = make(map[string][]pipeline.Event)
23-
bucketPourMu sync.Mutex
2422
)
2523

2624
/*
@@ -74,7 +72,7 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) {
7472
}
7573
}
7674

77-
func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event, track bool) (bool, error) {
75+
func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event, collector *PourCollector) (bool, error) {
7876
var sent bool
7977
var buckey = bucket.Mapkey
8078
var err error
@@ -142,12 +140,9 @@ func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory,
142140
select {
143141
case bucket.In <- parsed:
144142
// holder.logger.Tracef("Successfully sent !")
145-
if track {
143+
if collector != nil {
146144
evt := deepcopy.Copy(*parsed).(pipeline.Event)
147-
148-
bucketPourMu.Lock()
149-
BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt)
150-
bucketPourMu.Unlock()
145+
collector.Add(bucket.Name, evt)
151146
}
152147
sent = true
153148
continue
@@ -203,15 +198,12 @@ func LoadOrStoreBucketFromHolder(ctx context.Context, partitionKey string, bucke
203198

204199
var orderEvent map[string]*sync.WaitGroup
205200

206-
func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets, track bool) (bool, error) {
201+
func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets, collector *PourCollector) (bool, error) {
207202
var ok, condition, poured bool
208203

209-
if track {
204+
if collector != nil {
210205
evt := deepcopy.Copy(parsed).(pipeline.Event)
211-
212-
bucketPourMu.Lock()
213-
BucketPourCache["OK"] = append(BucketPourCache["OK"], evt)
214-
bucketPourMu.Unlock()
206+
collector.Add("OK", evt)
215207
}
216208
// find the relevant holders (scenarios)
217209
for idx := range holders {
@@ -274,7 +266,7 @@ func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []Buc
274266
orderEvent[buckey].Add(1)
275267
}
276268

277-
ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed, track)
269+
ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed, collector)
278270

279271
if bucket.orderEvent {
280272
orderEvent[buckey].Wait()

0 commit comments

Comments
 (0)