Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/csplugin/broker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package csplugin

import (
"cmp"
"context"
"errors"
"fmt"
Expand All @@ -22,7 +23,6 @@ import (

"github.com/crowdsecurity/go-cs-lib/csstring"
"github.com/crowdsecurity/go-cs-lib/ptr"
"github.com/crowdsecurity/go-cs-lib/slicetools"

"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/models"
Expand Down Expand Up @@ -153,7 +153,7 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) {
threshold = 1
}

for _, chunk := range slicetools.Chunks(tmpAlerts, threshold) {
for chunk := range slices.Chunk(tmpAlerts, max(1, cmp.Or(threshold, len(tmpAlerts)))) {
if err := pb.pushNotificationsToPlugin(ctx, pluginName, chunk); err != nil {
log.WithField("plugin:", pluginName).Error(err)
}
Expand Down
28 changes: 9 additions & 19 deletions pkg/database/alerts.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package database

import (
"cmp"
"context"
"encoding/json"
"fmt"
"slices"
"sort"
"strconv"
"strings"
Expand All @@ -14,7 +16,6 @@ import (
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/go-cs-lib/cstime"
"github.com/crowdsecurity/go-cs-lib/slicetools"

"github.com/crowdsecurity/crowdsec/pkg/csnet"
"github.com/crowdsecurity/crowdsec/pkg/database/ent"
Expand Down Expand Up @@ -166,9 +167,7 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler

decisions := []*ent.Decision{}

builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize)

for _, builderChunk := range builderChunks {
for builderChunk := range slices.Chunk(decisionBuilders, max(1, cmp.Or(c.decisionBulkSize, len(decisionBuilders)))) {
decisionsCreateRet, err := c.Ent.Decision.CreateBulk(builderChunk...).Save(ctx)
if err != nil {
return "", fmt.Errorf("creating alert decisions: %w", err)
Expand All @@ -179,9 +178,7 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler

// now that we bulk created missing decisions, let's update the alert

decisionChunks := slicetools.Chunks(decisions, c.decisionBulkSize)

for _, decisionChunk := range decisionChunks {
for decisionChunk := range slices.Chunk(decisions, max(1, cmp.Or(c.decisionBulkSize, len(decisions)))) {
err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisionChunk...).Exec(ctx)
if err != nil {
return "", fmt.Errorf("updating alert %s: %w", alertItem.UUID, err)
Expand Down Expand Up @@ -329,9 +326,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
valueList = append(valueList, *decisionItem.Value)
}

deleteChunks := slicetools.Chunks(valueList, c.decisionBulkSize)

for _, deleteChunk := range deleteChunks {
for deleteChunk := range slices.Chunk(valueList, max(1, cmp.Or(c.decisionBulkSize, len(valueList)))) {
// Deleting older decisions from capi
deletedDecisions, err := txClient.Decision.Delete().
Where(decision.And(
Expand All @@ -346,9 +341,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
deleted += deletedDecisions
}

builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize)

for _, builderChunk := range builderChunks {
for builderChunk := range slices.Chunk(decisionBuilders, max(1, cmp.Or(c.decisionBulkSize, len(decisionBuilders)))) {
insertedDecisions, err := txClient.Decision.CreateBulk(builderChunk...).Save(ctx)
if err != nil {
return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions")
Expand Down Expand Up @@ -545,8 +538,7 @@ func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.C
func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) {
decisions := []*ent.Decision{}

decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize)
for _, decisionChunk := range decisionChunks {
for decisionChunk := range slices.Chunk(alertItem.Decisions, max(1, cmp.Or(client.decisionBulkSize, len(alertItem.Decisions)))) {
decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
if err != nil {
return nil, 0, fmt.Errorf("creating alert decisions: %w", err)
Expand Down Expand Up @@ -601,9 +593,8 @@ func saveAlerts(ctx context.Context, c *Client, alertBuilders []*ent.AlertCreate
ret[i] = strconv.Itoa(a.ID)

d := alertDecisions[i]
decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)

for _, d2 := range decisionsChunk {
for d2 := range slices.Chunk(d, max(1, cmp.Or(c.decisionBulkSize, len(d)))) {
if err := retryOnBusy(func() error {
_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
return err
Expand Down Expand Up @@ -720,10 +711,9 @@ func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []

c.Log.Debugf("writing %d items", len(alertList))

alertChunks := slicetools.Chunks(alertList, alertCreateBulkSize)
alertIDs := []string{}

for _, alertChunk := range alertChunks {
for alertChunk := range slices.Chunk(alertList, max(1, cmp.Or(alertCreateBulkSize, len(alertList)))) {
ids, err := c.createAlertChunk(ctx, machineID, owner, alertChunk)
if err != nil {
return nil, fmt.Errorf("machine '%s': %w", machineID, err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/database/decisions.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package database

import (
"cmp"
"context"
"fmt"
"slices"
"strconv"
"strings"
"time"

"entgo.io/ent/dialect/sql"
"github.com/pkg/errors"

"github.com/crowdsecurity/go-cs-lib/slicetools"

"github.com/crowdsecurity/crowdsec/pkg/csnet"
"github.com/crowdsecurity/crowdsec/pkg/database/ent"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
Expand Down Expand Up @@ -414,7 +414,7 @@ func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision)

total := 0

for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) {
for chunk := range slices.Chunk(decisions, max(1, cmp.Or(decisionDeleteBulkSize, len(decisions)))) {
rows, err := c.ExpireDecisions(ctx, chunk)
if err != nil {
return total, err
Expand Down Expand Up @@ -446,7 +446,7 @@ func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision)

tot := 0

for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) {
for chunk := range slices.Chunk(decisions, max(1, cmp.Or(decisionDeleteBulkSize, len(decisions)))) {
rows, err := c.DeleteDecisions(ctx, chunk)
if err != nil {
return tot, err
Expand Down