diff --git a/pkg/csplugin/broker.go b/pkg/csplugin/broker.go index 48a1503291a..e3d537ec1b7 100644 --- a/pkg/csplugin/broker.go +++ b/pkg/csplugin/broker.go @@ -1,6 +1,7 @@ package csplugin import ( + "cmp" "context" "errors" "fmt" @@ -22,7 +23,6 @@ import ( "github.com/crowdsecurity/go-cs-lib/csstring" "github.com/crowdsecurity/go-cs-lib/ptr" - "github.com/crowdsecurity/go-cs-lib/slicetools" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/models" @@ -153,7 +153,7 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { threshold = 1 } - for _, chunk := range slicetools.Chunks(tmpAlerts, threshold) { + for chunk := range slices.Chunk(tmpAlerts, max(1, cmp.Or(threshold, len(tmpAlerts)))) { if err := pb.pushNotificationsToPlugin(ctx, pluginName, chunk); err != nil { log.WithField("plugin:", pluginName).Error(err) } diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go index 63bf4a9f069..d808fa09893 100644 --- a/pkg/database/alerts.go +++ b/pkg/database/alerts.go @@ -1,9 +1,11 @@ package database import ( + "cmp" "context" "encoding/json" "fmt" + "slices" "sort" "strconv" "strings" @@ -14,7 +16,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/crowdsecurity/go-cs-lib/cstime" - "github.com/crowdsecurity/go-cs-lib/slicetools" "github.com/crowdsecurity/crowdsec/pkg/csnet" "github.com/crowdsecurity/crowdsec/pkg/database/ent" @@ -166,9 +167,7 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler decisions := []*ent.Decision{} - builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize) - - for _, builderChunk := range builderChunks { + for builderChunk := range slices.Chunk(decisionBuilders, max(1, cmp.Or(c.decisionBulkSize, len(decisionBuilders)))) { decisionsCreateRet, err := c.Ent.Decision.CreateBulk(builderChunk...).Save(ctx) if err != nil { return "", fmt.Errorf("creating alert decisions: %w", err) @@ -179,9 +178,7 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler // now that we bulk created missing decisions, let's update the alert - decisionChunks := slicetools.Chunks(decisions, c.decisionBulkSize) - - for _, decisionChunk := range decisionChunks { + for decisionChunk := range slices.Chunk(decisions, max(1, cmp.Or(c.decisionBulkSize, len(decisions)))) { err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisionChunk...).Exec(ctx) if err != nil { return "", fmt.Errorf("updating alert %s: %w", alertItem.UUID, err) @@ -329,9 +326,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models valueList = append(valueList, *decisionItem.Value) } - deleteChunks := slicetools.Chunks(valueList, c.decisionBulkSize) - - for _, deleteChunk := range deleteChunks { + for deleteChunk := range slices.Chunk(valueList, max(1, cmp.Or(c.decisionBulkSize, len(valueList)))) { // Deleting older decisions from capi deletedDecisions, err := txClient.Decision.Delete(). Where(decision.And( @@ -346,9 +341,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models deleted += deletedDecisions } - builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize) - - for _, builderChunk := range builderChunks { + for builderChunk := range slices.Chunk(decisionBuilders, max(1, cmp.Or(c.decisionBulkSize, len(decisionBuilders)))) { insertedDecisions, err := txClient.Decision.CreateBulk(builderChunk...).Save(ctx) if err != nil { return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions") @@ -545,8 +538,7 @@ func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.C func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) { decisions := []*ent.Decision{} - decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize) - for _, decisionChunk := range decisionChunks { + for decisionChunk := range slices.Chunk(alertItem.Decisions, max(1, cmp.Or(client.decisionBulkSize, len(alertItem.Decisions)))) { decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk) if err != nil { return nil, 0, fmt.Errorf("creating alert decisions: %w", err) @@ -601,9 +593,8 @@ func saveAlerts(ctx context.Context, c *Client, alertBuilders []*ent.AlertCreate ret[i] = strconv.Itoa(a.ID) d := alertDecisions[i] - decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize) - for _, d2 := range decisionsChunk { + for d2 := range slices.Chunk(d, max(1, cmp.Or(c.decisionBulkSize, len(d)))) { if err := retryOnBusy(func() error { _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx) return err @@ -720,10 +711,9 @@ func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList [] c.Log.Debugf("writing %d items", len(alertList)) - alertChunks := slicetools.Chunks(alertList, alertCreateBulkSize) alertIDs := []string{} - for _, alertChunk := range alertChunks { + for alertChunk := range slices.Chunk(alertList, max(1, cmp.Or(alertCreateBulkSize, len(alertList)))) { ids, err := c.createAlertChunk(ctx, machineID, owner, alertChunk) if err != nil { return nil, fmt.Errorf("machine '%s': %w", machineID, err) diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index 286b2473212..8b213bc2464 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -1,8 +1,10 @@ package database import ( + "cmp" "context" "fmt" + "slices" "strconv" "strings" "time" @@ -10,8 +12,6 @@ import ( "entgo.io/ent/dialect/sql" "github.com/pkg/errors" - "github.com/crowdsecurity/go-cs-lib/slicetools" - "github.com/crowdsecurity/crowdsec/pkg/csnet" "github.com/crowdsecurity/crowdsec/pkg/database/ent" "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision" @@ -414,7 +414,7 @@ func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) total := 0 - for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) { + for chunk := range slices.Chunk(decisions, max(1, cmp.Or(decisionDeleteBulkSize, len(decisions)))) { rows, err := c.ExpireDecisions(ctx, chunk) if err != nil { return total, err @@ -446,7 +446,7 @@ func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) tot := 0 - for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) { + for chunk := range slices.Chunk(decisions, max(1, cmp.Or(decisionDeleteBulkSize, len(decisions)))) { rows, err := c.DeleteDecisions(ctx, chunk) if err != nil { return tot, err