Skip to content

Commit 2617098

Browse files
committed
remove global bucketOverflows
1 parent 6a755e7 commit 2617098

File tree

6 files changed

+32
-14
lines changed

6 files changed

+32
-14
lines changed

cmd/crowdsec/crowdsec.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ func startHeartBeat(ctx context.Context, _ *csconfig.Config, apiClient *apiclien
9797
apiClient.HeartBeat.StartHeartBeat(ctx)
9898
}
9999

100-
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient, stageCollector *parser.StageParseCollector) {
100+
func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, apiClient *apiclient.ApiClient, stageCollector *parser.StageParseCollector, bucketOverflows []pipeline.Event) {
101101
for idx := range cConfig.Crowdsec.OutputRoutinesCount {
102102
log.WithField("idx", idx).Info("Starting output routine")
103103
outputsTomb.Go(func() error {
104104
defer trace.CatchPanic("crowdsec/runOutput/"+strconv.Itoa(idx))
105-
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient, stageCollector)
105+
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient, stageCollector, bucketOverflows)
106106
})
107107
}
108108
}
@@ -144,6 +144,7 @@ func runCrowdsec(
144144
datasources []acquisitionTypes.DataSource,
145145
pourCollector *leakybucket.PourCollector,
146146
stageCollector *parser.StageParseCollector,
147+
bucketOverflows []pipeline.Event,
147148
) error {
148149
inEvents = make(chan pipeline.Event)
149150
logLines = make(chan pipeline.Event)
@@ -158,7 +159,7 @@ func runCrowdsec(
158159

159160
startHeartBeat(ctx, cConfig, apiClient)
160161

161-
startOutputRoutines(ctx, cConfig, parsers, apiClient, stageCollector)
162+
startOutputRoutines(ctx, cConfig, parsers, apiClient, stageCollector, bucketOverflows)
162163

163164
if err := startLPMetrics(ctx, cConfig, apiClient, hub, datasources); err != nil {
164165
return err
@@ -183,6 +184,7 @@ func serveCrowdsec(
183184
agentReady chan bool,
184185
pourCollector *leakybucket.PourCollector,
185186
stageCollector *parser.StageParseCollector,
187+
bucketOverflows []pipeline.Event,
186188
) {
187189
cctx, cancel := context.WithCancel(ctx)
188190

@@ -198,7 +200,7 @@ func serveCrowdsec(
198200

199201
agentReady <- true
200202

201-
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, pourCollector, stageCollector); err != nil {
203+
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, pourCollector, stageCollector, bucketOverflows); err != nil {
202204
log.Fatalf("unable to start crowdsec routines: %s", err)
203205
}
204206
}()
@@ -219,7 +221,7 @@ func serveCrowdsec(
219221
if flags.DumpDir != "" {
220222
log.Debugf("Dumping parser+bucket states to %s", flags.DumpDir)
221223

222-
if err := dumpAllStates(flags.DumpDir, pourCollector, stageCollector); err != nil {
224+
if err := dumpAllStates(flags.DumpDir, pourCollector, stageCollector, bucketOverflows); err != nil {
223225
log.Fatal(err)
224226
}
225227

cmd/crowdsec/dump.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99

1010
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
1111
"github.com/crowdsecurity/crowdsec/pkg/parser"
12+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1213
)
1314

14-
func dumpAllStates(dir string, pourCollector *leakybucket.PourCollector, stageCollector *parser.StageParseCollector) error {
15+
func dumpAllStates(dir string, pourCollector *leakybucket.PourCollector, stageCollector *parser.StageParseCollector, bucketOverflows []pipeline.Event) error {
1516
err := os.MkdirAll(dir, 0o755)
1617
if err != nil {
1718
return err

cmd/crowdsec/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,15 @@ func run(flags Flags) error {
217217
var (
218218
pourCollector *leakybucket.PourCollector
219219
stageCollector *parser.StageParseCollector
220+
bucketOverflows []pipeline.Event
220221
)
221222

222223
if flags.DumpDir != "" {
223224
pourCollector = leakybucket.NewPourCollector()
224225
stageCollector = parser.NewStageParseCollector()
225226
}
226227

227-
return StartRunSvc(ctx, cConfig, pourCollector, stageCollector)
228+
return StartRunSvc(ctx, cConfig, pourCollector, stageCollector, bucketOverflows)
228229
}
229230

230231
func main() {

cmd/crowdsec/output.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api
5252
return nil
5353
}
5454

55-
var bucketOverflows []pipeline.Event
56-
5755
func runOutput(
5856
ctx context.Context,
5957
input chan pipeline.Event,
@@ -63,6 +61,7 @@ func runOutput(
6361
postOverflowNodes []parser.Node,
6462
client *apiclient.ApiClient,
6563
stageCollector *parser.StageParseCollector,
64+
bucketOverflows []pipeline.Event,
6665
) error {
6766
var (
6867
cache []pipeline.RuntimeAlert

cmd/crowdsec/run_in_svc.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@ import (
1818
"github.com/crowdsecurity/crowdsec/pkg/fflag"
1919
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2020
"github.com/crowdsecurity/crowdsec/pkg/parser"
21+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2122
)
2223

2324
func isWindowsService() (bool, error) {
2425
return false, nil
2526
}
2627

27-
func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *leakybucket.PourCollector, stageCollector *parser.StageParseCollector) error {
28+
func StartRunSvc(
29+
ctx context.Context,
30+
cConfig *csconfig.Config,
31+
pourCollector *leakybucket.PourCollector,
32+
stageCollector *parser.StageParseCollector,
33+
bucketOverflows []pipeline.Event,
34+
) error {
2835
defer trace.CatchPanic("crowdsec/StartRunSvc")
2936

3037
// Always try to stop CPU profiling to avoid passing flags around
@@ -65,5 +72,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, pourCollector *l
6572
}()
6673
}
6774

68-
return Serve(ctx, cConfig, agentReady, pourCollector, stageCollector)
75+
return Serve(ctx, cConfig, agentReady, pourCollector, stageCollector, bucketOverflows)
6976
}

cmd/crowdsec/serve.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
3838
var (
3939
pourCollector *leakybucket.PourCollector
4040
stageCollector *parser.StageParseCollector
41+
bucketOverflows []pipeline.Event
4142
)
4243

4344
if flags.DumpDir != "" {
@@ -89,7 +90,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) {
8990
}
9091

9192
agentReady := make(chan bool, 1)
92-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector, stageCollector)
93+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector, stageCollector, bucketOverflows)
9394
}
9495

9596
log.Info("Reload is finished")
@@ -313,7 +314,14 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error {
313314
return err
314315
}
315316

316-
func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool, pourCollector *leakybucket.PourCollector, stageCollector *parser.StageParseCollector) error {
317+
func Serve(
318+
ctx context.Context,
319+
cConfig *csconfig.Config,
320+
agentReady chan bool,
321+
pourCollector *leakybucket.PourCollector,
322+
stageCollector *parser.StageParseCollector,
323+
bucketOverflows []pipeline.Event,
324+
) error {
317325
acquisTomb = tomb.Tomb{}
318326
outputsTomb = tomb.Tomb{}
319327
apiTomb = tomb.Tomb{}
@@ -386,7 +394,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool,
386394

387395
// if it's just linting, we're done
388396
if !flags.TestMode {
389-
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector, stageCollector)
397+
serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, pourCollector, stageCollector, bucketOverflows)
390398
} else {
391399
agentReady <- true
392400
}

0 commit comments

Comments
 (0)