Skip to content

Commit d024c41

Browse files
authored
pkg/types -> new imports pt 2 (#4013)
* pkg/parser * pkg/leakybucket * lint
1 parent d4a3276 commit d024c41

30 files changed

+196
-192
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ linters:
285285
- name: line-length-limit
286286
arguments:
287287
# lower this after refactoring
288-
- 216
288+
- 219
289289
- name: max-control-nesting
290290
arguments:
291291
# lower this after refactoring

pkg/leakybucket/bayesian.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/expr-lang/expr/vm"
88

99
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
10-
"github.com/crowdsecurity/crowdsec/pkg/types"
10+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1111
)
1212

1313
type RawBayesianCondition struct {
@@ -60,8 +60,8 @@ func (c *BayesianBucket) OnBucketInit(g *BucketFactory) error {
6060
return err
6161
}
6262

63-
func (c *BayesianBucket) AfterBucketPour(_ *BucketFactory) func(types.Event, *Leaky) *types.Event {
64-
return func(msg types.Event, l *Leaky) *types.Event {
63+
func (c *BayesianBucket) AfterBucketPour(_ *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
64+
return func(msg pipeline.Event, l *Leaky) *pipeline.Event {
6565
c.posterior = c.prior
6666
l.logger.Debugf("starting bayesian evaluation with prior: %v", c.posterior)
6767

@@ -85,7 +85,7 @@ func (c *BayesianBucket) AfterBucketPour(_ *BucketFactory) func(types.Event, *Le
8585
}
8686
}
8787

88-
func (b *BayesianEvent) bayesianUpdate(c *BayesianBucket, msg types.Event, l *Leaky) error {
88+
func (b *BayesianEvent) bayesianUpdate(c *BayesianBucket, msg pipeline.Event, l *Leaky) error {
8989
var condition, ok bool
9090

9191
if b.conditionalFilterRuntime == nil {
@@ -148,7 +148,7 @@ func (b *BayesianEvent) compileCondition() (*vm.Program, error) {
148148
}
149149

150150
// don't hold lock during compile
151-
compiled, err := expr.Compile(name, exprhelpers.GetExprOptions(map[string]any{"queue": &types.Queue{}, "leaky": &Leaky{}, "evt": &types.Event{}})...)
151+
compiled, err := expr.Compile(name, exprhelpers.GetExprOptions(map[string]any{"queue": &pipeline.Queue{}, "leaky": &Leaky{}, "evt": &pipeline.Event{}})...)
152152
if err != nil {
153153
return nil, fmt.Errorf("bayesian condition compile error: %w", err)
154154
}

pkg/leakybucket/blackhole.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"fmt"
55
"time"
66

7-
"github.com/crowdsecurity/crowdsec/pkg/types"
7+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
88
)
99

1010
type HiddenKey struct {
@@ -30,8 +30,8 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
3030
}, nil
3131
}
3232

33-
func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue) {
34-
return func(leaky *Leaky, alert types.RuntimeAlert, queue *types.Queue) (types.RuntimeAlert, *types.Queue) {
33+
func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, pipeline.RuntimeAlert, *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
34+
return func(leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
3535
var blackholed = false
3636
var tmp []HiddenKey
3737
// search if we are blackholed and refresh the slice
@@ -54,7 +54,7 @@ func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky,
5454

5555
if blackholed {
5656
leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
57-
return types.RuntimeAlert{
57+
return pipeline.RuntimeAlert{
5858
Mapkey: leaky.Mapkey,
5959
}, nil
6060
}

pkg/leakybucket/bucket.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/crowdsecurity/go-cs-lib/trace"
1717

1818
"github.com/crowdsecurity/crowdsec/pkg/metrics"
19-
"github.com/crowdsecurity/crowdsec/pkg/types"
19+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2020
)
2121

2222
// Leaky represents one instance of a bucket
@@ -27,13 +27,13 @@ type Leaky struct {
2727
Limiter rate.RateLimiter `json:"-"`
2828
SerializedState rate.Lstate
2929
// Queue is used to hold the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer.
30-
Queue *types.Queue
30+
Queue *pipeline.Queue
3131
// Leaky buckets are receiving message through a chan
32-
In chan *types.Event `json:"-"`
32+
In chan *pipeline.Event `json:"-"`
3333
// Leaky buckets are pushing their overflows through a chan
34-
Out chan *types.Queue `json:"-"`
34+
Out chan *pipeline.Queue `json:"-"`
3535
// shared for all buckets (the idea is to kill this afterward)
36-
AllOut chan types.Event `json:"-"`
36+
AllOut chan pipeline.Event `json:"-"`
3737
// max capacity (for burst)
3838
Capacity int
3939
// CacheRatio is the number of elements that should be kept in memory (compared to capacity)
@@ -53,7 +53,7 @@ type Leaky struct {
5353
Leakspeed time.Duration
5454
BucketConfig *BucketFactory
5555
Duration time.Duration
56-
Pour func(*Leaky, types.Event) `json:"-"`
56+
Pour func(*Leaky, pipeline.Event) `json:"-"`
5757
// Profiling when set to true enables profiling of bucket
5858
Profiling bool
5959
timedOverflow bool
@@ -107,9 +107,9 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
107107
Name: bucketFactory.Name,
108108
Limiter: limiter,
109109
Uuid: seed.Generate(),
110-
Queue: types.NewQueue(Qsize),
110+
Queue: pipeline.NewQueue(Qsize),
111111
CacheSize: bucketFactory.CacheSize,
112-
Out: make(chan *types.Queue, 1),
112+
Out: make(chan *pipeline.Queue, 1),
113113
Suicide: make(chan bool, 1),
114114
AllOut: bucketFactory.ret,
115115
Capacity: bucketFactory.Capacity,
@@ -118,7 +118,7 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
118118
Pour: Pour,
119119
Reprocess: bucketFactory.Reprocess,
120120
Profiling: bucketFactory.Profiling,
121-
Mode: types.LIVE,
121+
Mode: pipeline.LIVE,
122122
scopeType: bucketFactory.ScopeType,
123123
scenarioVersion: bucketFactory.ScenarioVersion,
124124
hash: bucketFactory.hash,
@@ -249,19 +249,19 @@ func LeakRoutine(leaky *Leaky) error {
249249
close(leaky.Signal)
250250
metrics.BucketsCanceled.With(prometheus.Labels{"name": leaky.Name}).Inc()
251251
leaky.logger.Debugf("Suicide triggered")
252-
leaky.AllOut <- types.Event{Type: types.OVFLW, Overflow: types.RuntimeAlert{Mapkey: leaky.Mapkey}}
252+
leaky.AllOut <- pipeline.Event{Type: pipeline.OVFLW, Overflow: pipeline.RuntimeAlert{Mapkey: leaky.Mapkey}}
253253
leaky.logger.Tracef("Returning from leaky routine.")
254254
return nil
255255
/*we underflow or reach bucket deadline (timers)*/
256256
case <-durationTickerChan:
257257
var (
258-
alert types.RuntimeAlert
258+
alert pipeline.RuntimeAlert
259259
err error
260260
)
261261
leaky.Ovflw_ts = time.Now().UTC()
262262
close(leaky.Signal)
263263
ofw := leaky.Queue
264-
alert = types.RuntimeAlert{Mapkey: leaky.Mapkey}
264+
alert = pipeline.RuntimeAlert{Mapkey: leaky.Mapkey}
265265

266266
if leaky.timedOverflow {
267267
metrics.BucketsOverflow.With(prometheus.Labels{"name": leaky.Name}).Inc()
@@ -285,10 +285,10 @@ func LeakRoutine(leaky *Leaky) error {
285285
}
286286
if leaky.logger.Level >= log.TraceLevel {
287287
/*don't sdump if it's not going to be printed, it's expensive*/
288-
leaky.logger.Tracef("Overflow event: %s", spew.Sdump(types.Event{Overflow: alert}))
288+
leaky.logger.Tracef("Overflow event: %s", spew.Sdump(pipeline.Event{Overflow: alert}))
289289
}
290290

291-
leaky.AllOut <- types.Event{Overflow: alert, Type: types.OVFLW}
291+
leaky.AllOut <- pipeline.Event{Overflow: alert, Type: pipeline.OVFLW}
292292
leaky.logger.Tracef("Returning from leaky routine.")
293293
return nil
294294
case <-leaky.tomb.Dying():
@@ -297,15 +297,15 @@ func LeakRoutine(leaky *Leaky) error {
297297
ofw := <-leaky.Out
298298
leaky.overflow(ofw)
299299
}
300-
leaky.AllOut <- types.Event{Type: types.OVFLW, Overflow: types.RuntimeAlert{Mapkey: leaky.Mapkey}}
300+
leaky.AllOut <- pipeline.Event{Type: pipeline.OVFLW, Overflow: pipeline.RuntimeAlert{Mapkey: leaky.Mapkey}}
301301
return nil
302302

303303
}
304304
End:
305305
}
306306
}
307307

308-
func Pour(leaky *Leaky, msg types.Event) {
308+
func Pour(leaky *Leaky, msg pipeline.Event) {
309309
leaky.wgDumpState.Wait()
310310
leaky.wgPour.Add(1)
311311
defer leaky.wgPour.Done()
@@ -326,7 +326,7 @@ func Pour(leaky *Leaky, msg types.Event) {
326326
}
327327
}
328328

329-
func (leaky *Leaky) overflow(ofw *types.Queue) {
329+
func (leaky *Leaky) overflow(ofw *pipeline.Queue) {
330330
close(leaky.Signal)
331331
alert, err := NewAlert(leaky, ofw)
332332
if err != nil {
@@ -348,5 +348,5 @@ func (leaky *Leaky) overflow(ofw *types.Queue) {
348348

349349
metrics.BucketsOverflow.With(prometheus.Labels{"name": leaky.Name}).Inc()
350350

351-
leaky.AllOut <- types.Event{Overflow: alert, Type: types.OVFLW, MarshaledTime: string(mt)}
351+
leaky.AllOut <- pipeline.Event{Overflow: alert, Type: pipeline.OVFLW, MarshaledTime: string(mt)}
352352
}

pkg/leakybucket/buckets_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ import (
2424
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2525
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
2626
"github.com/crowdsecurity/crowdsec/pkg/parser"
27-
"github.com/crowdsecurity/crowdsec/pkg/types"
27+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2828
)
2929

3030
type TestFile struct {
31-
Lines []types.Event `yaml:"lines,omitempty"`
32-
Results []types.Event `yaml:"results,omitempty"`
31+
Lines []pipeline.Event `yaml:"lines,omitempty"`
32+
Results []pipeline.Event `yaml:"results,omitempty"`
3333
}
3434

3535
func TestBucket(t *testing.T) {
@@ -174,8 +174,8 @@ func testOneBucket(t *testing.T, hub *cwhub.Hub, dir string, tomb *tomb.Tomb) er
174174
return nil
175175
}
176176

177-
func testFile(t *testing.T, file string, holders []BucketFactory, response chan types.Event, buckets *Buckets) bool {
178-
var results []types.Event
177+
func testFile(t *testing.T, file string, holders []BucketFactory, response chan pipeline.Event, buckets *Buckets) bool {
178+
var results []pipeline.Event
179179

180180
/* now we can load the test files */
181181
// process the yaml
@@ -211,7 +211,7 @@ func testFile(t *testing.T, file string, holders []BucketFactory, response chan
211211
latest_ts = ts
212212
}
213213

214-
in.ExpectMode = types.TIMEMACHINE
214+
in.ExpectMode = pipeline.TIMEMACHINE
215215
log.Infof("Buckets input : %s", spew.Sdump(in))
216216

217217
ok, err := PourItemToHolders(in, holders, buckets)

pkg/leakybucket/conditional.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/expr-lang/expr/vm"
99

1010
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
11-
"github.com/crowdsecurity/crowdsec/pkg/types"
11+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
1212
)
1313

1414
var (
@@ -33,7 +33,7 @@ func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error {
3333
} else {
3434
conditionalExprCacheLock.Unlock()
3535
//release the lock during compile
36-
compiledExpr, err = expr.Compile(g.ConditionalOverflow, exprhelpers.GetExprOptions(map[string]interface{}{"queue": &types.Queue{}, "leaky": &Leaky{}, "evt": &types.Event{}})...)
36+
compiledExpr, err = expr.Compile(g.ConditionalOverflow, exprhelpers.GetExprOptions(map[string]interface{}{"queue": &pipeline.Queue{}, "leaky": &Leaky{}, "evt": &pipeline.Event{}})...)
3737
if err != nil {
3838
return fmt.Errorf("conditional compile error : %w", err)
3939
}
@@ -47,8 +47,8 @@ func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error {
4747
return err
4848
}
4949

50-
func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event {
51-
return func(msg types.Event, l *Leaky) *types.Event {
50+
func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
51+
return func(msg pipeline.Event, l *Leaky) *pipeline.Event {
5252
var condition, ok bool
5353

5454
if c.ConditionalFilterRuntime != nil {

pkg/leakybucket/manager_load.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/crowdsecurity/crowdsec/pkg/enrichment"
2424
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
2525
"github.com/crowdsecurity/crowdsec/pkg/logging"
26+
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2627
"github.com/crowdsecurity/crowdsec/pkg/types"
2728
)
2829

@@ -63,7 +64,7 @@ type BucketFactory struct {
6364
CancelOnFilter string `yaml:"cancel_on,omitempty"` // a filter that, if matched, kills the bucket
6465
leakspeed time.Duration // internal representation of `Leakspeed`
6566
duration time.Duration // internal representation of `Duration`
66-
ret chan types.Event // the bucket-specific output chan for overflows
67+
ret chan pipeline.Event // the bucket-specific output chan for overflows
6768
processors []Processor // processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.)
6869
ScenarioVersion string `yaml:"version,omitempty"`
6970
hash string
@@ -223,7 +224,7 @@ func compileScopeFilter(bucketFactory *BucketFactory) error {
223224
return errors.New("filter is mandatory for non-IP, non-Range scope")
224225
}
225226

226-
runTimeFilter, err := expr.Compile(bucketFactory.ScopeType.Filter, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
227+
runTimeFilter, err := expr.Compile(bucketFactory.ScopeType.Filter, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
227228
if err != nil {
228229
return fmt.Errorf("error compiling the scope filter: %w", err)
229230
}
@@ -233,7 +234,7 @@ func compileScopeFilter(bucketFactory *BucketFactory) error {
233234
return nil
234235
}
235236

236-
func loadBucketFactoriesFromFile(item *cwhub.Item, hub *cwhub.Hub, buckets *Buckets, tomb *tomb.Tomb, response chan types.Event, orderEvent bool, simulationConfig csconfig.SimulationConfig) ([]BucketFactory, error) {
237+
func loadBucketFactoriesFromFile(item *cwhub.Item, hub *cwhub.Hub, buckets *Buckets, tomb *tomb.Tomb, response chan pipeline.Event, orderEvent bool, simulationConfig csconfig.SimulationConfig) ([]BucketFactory, error) {
237238
itemPath := item.State.LocalPath
238239

239240
// process the yaml
@@ -312,9 +313,9 @@ func loadBucketFactoriesFromFile(item *cwhub.Item, hub *cwhub.Hub, buckets *Buck
312313
return factories, nil
313314
}
314315

315-
func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, hub *cwhub.Hub, scenarios []*cwhub.Item, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error) {
316+
func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, hub *cwhub.Hub, scenarios []*cwhub.Item, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan pipeline.Event, error) {
316317
allFactories := []BucketFactory{}
317-
response := make(chan types.Event, 1)
318+
response := make(chan pipeline.Event, 1)
318319

319320
for _, item := range scenarios {
320321
log.Debugf("Loading '%s'", item.State.LocalPath)
@@ -377,13 +378,13 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
377378
return errors.New("bucket without filter directive")
378379
}
379380

380-
bucketFactory.RunTimeFilter, err = expr.Compile(bucketFactory.Filter, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
381+
bucketFactory.RunTimeFilter, err = expr.Compile(bucketFactory.Filter, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
381382
if err != nil {
382383
return fmt.Errorf("invalid filter '%s' in %s: %w", bucketFactory.Filter, bucketFactory.Filename, err)
383384
}
384385

385386
if bucketFactory.GroupBy != "" {
386-
bucketFactory.RunTimeGroupBy, err = expr.Compile(bucketFactory.GroupBy, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
387+
bucketFactory.RunTimeGroupBy, err = expr.Compile(bucketFactory.GroupBy, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
387388
if err != nil {
388389
return fmt.Errorf("invalid groupby '%s' in %s: %w", bucketFactory.GroupBy, bucketFactory.Filename, err)
389390
}
@@ -411,7 +412,7 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
411412
bucketFactory.logger.Tracef("Adding a non duplicate filter")
412413
bucketFactory.processors = append(bucketFactory.processors, &Uniq{})
413414
// we're compiling and discarding the expression to be able to detect it during loading
414-
_, err = expr.Compile(bucketFactory.Distinct, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
415+
_, err = expr.Compile(bucketFactory.Distinct, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
415416
if err != nil {
416417
return fmt.Errorf("invalid distinct '%s' in %s: %w", bucketFactory.Distinct, bucketFactory.Filename, err)
417418
}
@@ -421,7 +422,7 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
421422
bucketFactory.logger.Tracef("Adding a cancel_on filter")
422423
bucketFactory.processors = append(bucketFactory.processors, &CancelOnFilter{})
423424
// we're compiling and discarding the expression to be able to detect it during loading
424-
_, err = expr.Compile(bucketFactory.CancelOnFilter, exprhelpers.GetExprOptions(map[string]any{"evt": &types.Event{}})...)
425+
_, err = expr.Compile(bucketFactory.CancelOnFilter, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
425426
if err != nil {
426427
return fmt.Errorf("invalid cancel_on '%s' in %s: %w", bucketFactory.CancelOnFilter, bucketFactory.Filename, err)
427428
}
@@ -455,7 +456,7 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
455456
bucketFactory.logger.Tracef("Adding conditional overflow")
456457
bucketFactory.processors = append(bucketFactory.processors, &ConditionalOverflow{})
457458
// we're compiling and discarding the expression to be able to detect it during loading
458-
_, err = expr.Compile(bucketFactory.ConditionalOverflow, exprhelpers.GetExprOptions(map[string]any{"queue": &types.Queue{}, "leaky": &Leaky{}, "evt": &types.Event{}})...)
459+
_, err = expr.Compile(bucketFactory.ConditionalOverflow, exprhelpers.GetExprOptions(map[string]any{"queue": &pipeline.Queue{}, "leaky": &Leaky{}, "evt": &pipeline.Event{}})...)
459460
if err != nil {
460461
return fmt.Errorf("invalid condition '%s' in %s: %w", bucketFactory.ConditionalOverflow, bucketFactory.Filename, err)
461462
}

0 commit comments

Comments
 (0)