Skip to content

Commit c237087

Browse files
authored
refact pkg/database batching (#3906)
* Batch function * use Batch helper for decisions * use Batch helper for alerts / 1 * use Batch helper for community blocklist * use Batch helper for alerts / 2 * move Batch to go-cs-lib * papi: attach decisions to alert during creation
1 parent 6a73994 commit c237087

File tree

2 files changed

+99
-96
lines changed

2 files changed

+99
-96
lines changed

pkg/database/alerts.go

Lines changed: 46 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -163,33 +163,24 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler
163163
SetScope(*decisionItem.Scope).
164164
SetOrigin(*decisionItem.Origin).
165165
SetSimulated(*alertItem.Simulated).
166-
SetUUID(decisionItem.UUID)
166+
SetUUID(decisionItem.UUID).
167+
SetOwnerID(foundAlert.ID)
167168

168169
decisionBuilders = append(decisionBuilders, decisionBuilder)
169170
}
170171

171-
decisions := []*ent.Decision{}
172-
173-
builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize)
174-
175-
for _, builderChunk := range builderChunks {
176-
decisionsCreateRet, err := c.Ent.Decision.CreateBulk(builderChunk...).Save(ctx)
177-
if err != nil {
178-
return "", fmt.Errorf("creating alert decisions: %w", err)
179-
}
180-
181-
decisions = append(decisions, decisionsCreateRet...)
182-
}
183-
184-
// now that we bulk created missing decisions, let's update the alert
185-
186-
decisionChunks := slicetools.Chunks(decisions, c.decisionBulkSize)
172+
// create missing decisions in batches
187173

188-
for _, decisionChunk := range decisionChunks {
189-
err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisionChunk...).Exec(ctx)
174+
decisions := make([]*ent.Decision, 0, len(decisionBuilders))
175+
if err := slicetools.Batch(ctx, decisionBuilders, c.decisionBulkSize, func(ctx context.Context, b []*ent.DecisionCreate) error {
176+
ret, err := c.Ent.Decision.CreateBulk(b...).Save(ctx)
190177
if err != nil {
191-
return "", fmt.Errorf("updating alert %s: %w", alertItem.UUID, err)
178+
return fmt.Errorf("creating alert decisions: %w", err)
192179
}
180+
decisions = append(decisions, ret...)
181+
return nil
182+
}); err != nil {
183+
return "", err
193184
}
194185

195186
return "", nil
@@ -333,32 +324,35 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
333324
valueList = append(valueList, *decisionItem.Value)
334325
}
335326

336-
deleteChunks := slicetools.Chunks(valueList, c.decisionBulkSize)
327+
// Delete older decisions from capi
337328

338-
for _, deleteChunk := range deleteChunks {
339-
// Deleting older decisions from capi
329+
if err := slicetools.Batch(ctx, valueList, c.decisionBulkSize, func(ctx context.Context, vals []string) error {
340330
deletedDecisions, err := txClient.Decision.Delete().
341331
Where(decision.And(
342332
decision.OriginEQ(decOrigin),
343333
decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
344-
decision.ValueIn(deleteChunk...),
345-
)).Exec(ctx)
334+
decision.ValueIn(vals...),
335+
)).Exec(ctx)
346336
if err != nil {
347-
return 0, 0, 0, rollbackOnError(txClient, err, "deleting older community blocklist decisions")
337+
return err
348338
}
349-
350339
deleted += deletedDecisions
340+
return nil
341+
}); err != nil {
342+
return 0, 0, 0, rollbackOnError(txClient, err, "deleting older community blocklist decisions")
351343
}
352344

353-
builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize)
345+
// Insert new decisions
354346

355-
for _, builderChunk := range builderChunks {
356-
insertedDecisions, err := txClient.Decision.CreateBulk(builderChunk...).Save(ctx)
347+
if err := slicetools.Batch(ctx, decisionBuilders, c.decisionBulkSize, func(ctx context.Context, b []*ent.DecisionCreate) error {
348+
insertedDecisions, err := txClient.Decision.CreateBulk(b...).Save(ctx)
357349
if err != nil {
358-
return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions")
350+
return err
359351
}
360-
361352
inserted += len(insertedDecisions)
353+
return nil
354+
}); err != nil {
355+
return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions")
362356
}
363357

364358
log.Debugf("deleted %d decisions for %s vs %s", deleted, decOrigin, *alertItem.Decisions[0].Origin)
@@ -371,7 +365,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
371365
return alertRef.ID, inserted, deleted, nil
372366
}
373367

374-
func (c *Client) createDecisionChunk(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) {
368+
func (c *Client) createDecisionBatch(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) {
375369
decisionCreate := []*ent.DecisionCreate{}
376370

377371
for _, decisionItem := range decisions {
@@ -548,15 +542,15 @@ func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.C
548542

549543
func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) {
550544
decisions := []*ent.Decision{}
551-
552-
decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize)
553-
for _, decisionChunk := range decisionChunks {
554-
decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
545+
if err := slicetools.Batch(ctx, alertItem.Decisions, client.decisionBulkSize, func(ctx context.Context, part []*models.Decision) error {
546+
ret, err := client.createDecisionBatch(ctx, *alertItem.Simulated, stopAtTime, part)
555547
if err != nil {
556-
return nil, 0, fmt.Errorf("creating alert decisions: %w", err)
548+
return fmt.Errorf("creating alert decisions: %w", err)
557549
}
558-
559-
decisions = append(decisions, decisionRet...)
550+
decisions = append(decisions, ret...)
551+
return nil
552+
}); err != nil {
553+
return nil, 0, err
560554
}
561555

562556
discarded := len(alertItem.Decisions) - len(decisions)
@@ -617,15 +611,13 @@ func saveAlerts(ctx context.Context, c *Client, batch []alertCreatePlan) ([]stri
617611
continue
618612
}
619613

620-
decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)
621-
622-
for _, d2 := range decisionsChunk {
623-
if err := retryOnBusy(func() error {
614+
if err := slicetools.Batch(ctx, d, c.decisionBulkSize, func(ctx context.Context, d2 []*ent.Decision) error {
615+
return retryOnBusy(func() error {
624616
_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
625617
return err
626-
}); err != nil {
627-
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
628-
}
618+
})
619+
}); err != nil {
620+
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
629621
}
630622
}
631623

@@ -637,7 +629,7 @@ type alertCreatePlan struct {
637629
decisions []*ent.Decision
638630
}
639631

640-
func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
632+
func (c *Client) createAlertBatch(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
641633
batch := make([]alertCreatePlan, 0, len(alerts))
642634

643635
for _, alertItem := range alerts {
@@ -737,16 +729,16 @@ func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []
737729

738730
c.Log.Debugf("writing %d items", len(alertList))
739731

740-
alertChunks := slicetools.Chunks(alertList, alertCreateBulkSize)
741732
alertIDs := []string{}
742-
743-
for _, alertChunk := range alertChunks {
744-
ids, err := c.createAlertChunk(ctx, machineID, owner, alertChunk)
733+
if err := slicetools.Batch(ctx, alertList, alertCreateBulkSize, func(ctx context.Context, part []*models.Alert) error {
734+
ids, err := c.createAlertBatch(ctx, machineID, owner, part)
745735
if err != nil {
746-
return nil, fmt.Errorf("machine '%s': %w", machineID, err)
736+
return fmt.Errorf("machine %q: %w", machineID, err)
747737
}
748-
749738
alertIDs = append(alertIDs, ids...)
739+
return nil
740+
}); err != nil {
741+
return nil, err
750742
}
751743

752744
if owner != nil {

pkg/database/decisions.go

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -304,68 +304,79 @@ func decisionIDs(decisions []*ent.Decision) []int {
304304
return ids
305305
}
306306

307-
// ExpireDecisions sets the expiration of a list of decisions to now()
308-
// It returns the number of impacted decisions for the CAPI/PAPI
309-
func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
310-
if len(decisions) <= decisionDeleteBulkSize {
311-
ids := decisionIDs(decisions)
307+
// expireDecisionBatch expires the decisions as a single operation.
308+
func (c *Client) expireDecisionBatch(ctx context.Context, batch []*ent.Decision, now time.Time) (int, error) {
309+
ids := decisionIDs(batch)
310+
311+
rows, err := c.Ent.Decision.
312+
Update().
313+
Where(decision.IDIn(ids...)).
314+
SetUntil(now).
315+
Save(ctx)
316+
if err != nil {
317+
return 0, fmt.Errorf("expire decisions with provided filter: %w", err)
318+
}
312319

313-
rows, err := c.Ent.Decision.Update().Where(
314-
decision.IDIn(ids...),
315-
).SetUntil(time.Now().UTC()).Save(ctx)
316-
if err != nil {
317-
return 0, fmt.Errorf("expire decisions with provided filter: %w", err)
318-
}
320+
return rows, nil
321+
}
319322

320-
return rows, nil
323+
// ExpireDecisions sets the expiration of a list of decisions to now(),
324+
// in multiple operations if len(decisions) > decisionDeleteBulkSize.
325+
// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error.
326+
func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
327+
if len(decisions) == 0 {
328+
return 0, nil
321329
}
322330

323-
// big batch, let's split it and recurse
331+
now := time.Now().UTC()
324332

325333
total := 0
326-
327-
for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) {
328-
rows, err := c.ExpireDecisions(ctx, chunk)
334+
err := slicetools.Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error {
335+
rows, err := c.expireDecisionBatch(ctx, batch, now)
329336
if err != nil {
330-
return total, err
337+
return err
331338
}
332-
333339
total += rows
334-
}
340+
return nil
341+
})
335342

336-
return total, nil
343+
return total, err
337344
}
338345

339-
// DeleteDecisions removes a list of decisions from the database
340-
// It returns the number of impacted decisions for the CAPI/PAPI
341-
func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
342-
if len(decisions) < decisionDeleteBulkSize {
343-
ids := decisionIDs(decisions)
344-
345-
rows, err := c.Ent.Decision.Delete().Where(
346-
decision.IDIn(ids...),
347-
).Exec(ctx)
348-
if err != nil {
349-
return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err)
350-
}
346+
// deleteDecisionBatch removes the decisions as a single operation.
347+
func (c *Client) deleteDecisionBatch(ctx context.Context, batch []*ent.Decision) (int, error) {
348+
ids := decisionIDs(batch)
351349

352-
return rows, nil
350+
rows, err := c.Ent.Decision.
351+
Delete().
352+
Where(decision.IDIn(ids...)).
353+
Exec(ctx)
354+
if err != nil {
355+
return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err)
353356
}
354357

355-
// big batch, let's split it and recurse
358+
return rows, nil
359+
}
356360

357-
tot := 0
361+
// DeleteDecisions removes a list of decisions from the database,
362+
// in multiple operations if len(decisions) > decisionDeleteBulkSize.
363+
// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error.
364+
func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
365+
if len(decisions) == 0 {
366+
return 0, nil
367+
}
358368

359-
for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) {
360-
rows, err := c.DeleteDecisions(ctx, chunk)
369+
total := 0
370+
err := slicetools.Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error {
371+
rows, err := c.deleteDecisionBatch(ctx, batch)
361372
if err != nil {
362-
return tot, err
373+
return err
363374
}
375+
total += rows
376+
return nil
377+
})
364378

365-
tot += rows
366-
}
367-
368-
return tot, nil
379+
return total, err
369380
}
370381

371382
// ExpireDecisionByID set the expiration of a decision to now()

0 commit comments

Comments
 (0)