Skip to content

Commit 81c45bf

Browse files
alexweav and papagian authored
Annotations: Split cleanup into separate queries and deletes to avoid deadlocks on MySQL (#80329)
* Split subquery when cleaning annotations
* update comment
* Raise batch size, now that we pay attention to it
* Iterate in batches
* Separate cancellable batch implementation to allow for multi-statement callbacks, add overload for single-statement use
* Use split-out utility in outer batching loop so it respects context cancellation
* guard against empty queries
* Use SQL parameters
* Use same approach for tags
* drop unused function
* Work around parameter limit on sqlite for large batches
* Bulk insert test data in DB
* Refactor test to customise test data creation
* Add test for catching SQLITE_MAX_VARIABLE_NUMBER limit
* Turn annotation cleanup test to integration tests
* lint

---------

Co-authored-by: Sofia Papagiannaki <[email protected]>
1 parent f84c8f6 commit 81c45bf

File tree

3 files changed

+199
-62
lines changed

3 files changed

+199
-62
lines changed

pkg/services/annotations/annotationsimpl/cleanup_test.go

Lines changed: 91 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package annotationsimpl
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67
"time"
78

@@ -14,31 +15,30 @@ import (
1415
"github.com/grafana/grafana/pkg/setting"
1516
)
1617

17-
func TestAnnotationCleanUp(t *testing.T) {
18-
fakeSQL := db.InitTestDB(t)
19-
20-
t.Cleanup(func() {
21-
err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error {
22-
_, err := session.Exec("DELETE FROM annotation")
23-
return err
24-
})
25-
assert.NoError(t, err)
26-
})
18+
func TestIntegrationAnnotationCleanUp(t *testing.T) {
19+
if testing.Short() {
20+
t.Skip("Skipping integration test")
21+
}
2722

28-
createTestAnnotations(t, fakeSQL, 21, 6)
29-
assertAnnotationCount(t, fakeSQL, "", 21)
30-
assertAnnotationTagCount(t, fakeSQL, 42)
23+
fakeSQL := db.InitTestDB(t)
3124

3225
tests := []struct {
33-
name string
34-
cfg *setting.Cfg
35-
alertAnnotationCount int64
36-
dashboardAnnotationCount int64
37-
APIAnnotationCount int64
38-
affectedAnnotations int64
26+
name string
27+
createAnnotationsNum int
28+
createOldAnnotationsNum int
29+
30+
cfg *setting.Cfg
31+
alertAnnotationCount int64
32+
annotationCleanupJobBatchSize int
33+
dashboardAnnotationCount int64
34+
APIAnnotationCount int64
35+
affectedAnnotations int64
3936
}{
4037
{
41-
name: "default settings should not delete any annotations",
38+
name: "default settings should not delete any annotations",
39+
createAnnotationsNum: 21,
40+
createOldAnnotationsNum: 6,
41+
annotationCleanupJobBatchSize: 1,
4242
cfg: &setting.Cfg{
4343
AlertingAnnotationCleanupSetting: settingsFn(0, 0),
4444
DashboardAnnotationCleanupSettings: settingsFn(0, 0),
@@ -50,7 +50,10 @@ func TestAnnotationCleanUp(t *testing.T) {
5050
affectedAnnotations: 0,
5151
},
5252
{
53-
name: "should remove annotations created before cut off point",
53+
name: "should remove annotations created before cut off point",
54+
createAnnotationsNum: 21,
55+
createOldAnnotationsNum: 6,
56+
annotationCleanupJobBatchSize: 1,
5457
cfg: &setting.Cfg{
5558
AlertingAnnotationCleanupSetting: settingsFn(time.Hour*48, 0),
5659
DashboardAnnotationCleanupSettings: settingsFn(time.Hour*48, 0),
@@ -62,7 +65,10 @@ func TestAnnotationCleanUp(t *testing.T) {
6265
affectedAnnotations: 6,
6366
},
6467
{
65-
name: "should only keep three annotations",
68+
name: "should only keep three annotations",
69+
createAnnotationsNum: 15,
70+
createOldAnnotationsNum: 6,
71+
annotationCleanupJobBatchSize: 1,
6672
cfg: &setting.Cfg{
6773
AlertingAnnotationCleanupSetting: settingsFn(0, 3),
6874
DashboardAnnotationCleanupSettings: settingsFn(0, 3),
@@ -74,7 +80,10 @@ func TestAnnotationCleanUp(t *testing.T) {
7480
affectedAnnotations: 6,
7581
},
7682
{
77-
name: "running the max count delete again should not remove any annotations",
83+
name: "running the max count delete again should not remove any annotations",
84+
createAnnotationsNum: 9,
85+
createOldAnnotationsNum: 6,
86+
annotationCleanupJobBatchSize: 1,
7887
cfg: &setting.Cfg{
7988
AlertingAnnotationCleanupSetting: settingsFn(0, 3),
8089
DashboardAnnotationCleanupSettings: settingsFn(0, 3),
@@ -85,12 +94,40 @@ func TestAnnotationCleanUp(t *testing.T) {
8594
APIAnnotationCount: 3,
8695
affectedAnnotations: 0,
8796
},
97+
{
98+
name: "should not fail if batch size is larger than SQLITE_MAX_VARIABLE_NUMBER for SQLite >= 3.32.0",
99+
createAnnotationsNum: 40003,
100+
createOldAnnotationsNum: 0,
101+
annotationCleanupJobBatchSize: 32767,
102+
cfg: &setting.Cfg{
103+
AlertingAnnotationCleanupSetting: settingsFn(0, 1),
104+
DashboardAnnotationCleanupSettings: settingsFn(0, 1),
105+
APIAnnotationCleanupSettings: settingsFn(0, 1),
106+
},
107+
alertAnnotationCount: 1,
108+
dashboardAnnotationCount: 1,
109+
APIAnnotationCount: 1,
110+
affectedAnnotations: 40000,
111+
},
88112
}
89113

90114
for _, test := range tests {
91115
t.Run(test.name, func(t *testing.T) {
116+
createTestAnnotations(t, fakeSQL, test.createAnnotationsNum, test.createOldAnnotationsNum)
117+
assertAnnotationCount(t, fakeSQL, "", int64(test.createAnnotationsNum))
118+
assertAnnotationTagCount(t, fakeSQL, 2*int64(test.createAnnotationsNum))
119+
120+
t.Cleanup(func() {
121+
err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error {
122+
_, deleteAnnotationErr := session.Exec("DELETE FROM annotation")
123+
_, deleteAnnotationTagErr := session.Exec("DELETE FROM annotation_tag")
124+
return errors.Join(deleteAnnotationErr, deleteAnnotationTagErr)
125+
})
126+
assert.NoError(t, err)
127+
})
128+
92129
cfg := setting.NewCfg()
93-
cfg.AnnotationCleanupJobBatchSize = 1
130+
cfg.AnnotationCleanupJobBatchSize = int64(test.annotationCleanupJobBatchSize)
94131
cleaner := ProvideCleanupService(fakeSQL, cfg)
95132
affectedAnnotations, affectedAnnotationTags, err := cleaner.Run(context.Background(), test.cfg)
96133
require.NoError(t, err)
@@ -111,7 +148,11 @@ func TestAnnotationCleanUp(t *testing.T) {
111148
}
112149
}
113150

114-
func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
151+
func TestIntegrationOldAnnotationsAreDeletedFirst(t *testing.T) {
152+
if testing.Short() {
153+
t.Skip("Skipping integration test")
154+
}
155+
115156
fakeSQL := db.InitTestDB(t)
116157

117158
t.Cleanup(func() {
@@ -193,8 +234,11 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno
193234

194235
cutoffDate := time.Now()
195236

237+
newAnnotations := make([]*annotations.Item, 0, expectedCount)
238+
newAnnotationTags := make([]*annotationTag, 0, 2*expectedCount)
196239
for i := 0; i < expectedCount; i++ {
197240
a := &annotations.Item{
241+
ID: int64(i + 1),
198242
DashboardID: 1,
199243
OrgID: 1,
200244
UserID: 1,
@@ -222,20 +266,29 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno
222266
a.Created = cutoffDate.AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond)
223267
}
224268

225-
err := store.WithDbSession(context.Background(), func(sess *db.Session) error {
226-
_, err := sess.Insert(a)
227-
require.NoError(t, err, "should be able to save annotation", err)
228-
229-
// mimick the SQL annotation Save logic by writing records to the annotation_tag table
230-
// we need to ensure they get deleted when we clean up annotations
231-
for tagID := range []int{1, 2} {
232-
_, err = sess.Exec("INSERT INTO annotation_tag (annotation_id, tag_id) VALUES(?,?)", a.ID, tagID)
233-
require.NoError(t, err, "should be able to save annotation tag ID", err)
234-
}
235-
return err
236-
})
237-
require.NoError(t, err)
269+
newAnnotations = append(newAnnotations, a)
270+
newAnnotationTags = append(newAnnotationTags, &annotationTag{AnnotationID: a.ID, TagID: 1}, &annotationTag{AnnotationID: a.ID, TagID: 2})
238271
}
272+
273+
err := store.WithDbSession(context.Background(), func(sess *db.Session) error {
274+
batchsize := 500
275+
for i := 0; i < len(newAnnotations); i += batchsize {
276+
_, err := sess.InsertMulti(newAnnotations[i:min(i+batchsize, len(newAnnotations))])
277+
require.NoError(t, err)
278+
}
279+
return nil
280+
})
281+
require.NoError(t, err)
282+
283+
err = store.WithDbSession(context.Background(), func(sess *db.Session) error {
284+
batchsize := 500
285+
for i := 0; i < len(newAnnotationTags); i += batchsize {
286+
_, err := sess.InsertMulti(newAnnotationTags[i:min(i+batchsize, len(newAnnotationTags))])
287+
require.NoError(t, err)
288+
}
289+
return nil
290+
})
291+
require.NoError(t, err)
239292
}
240293

241294
func settingsFn(maxAge time.Duration, maxCount int64) setting.AnnotationCleanupSettings {

pkg/services/annotations/annotationsimpl/xorm_store.go

Lines changed: 107 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
13+
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
1314

1415
"github.com/grafana/grafana/pkg/infra/db"
1516
"github.com/grafana/grafana/pkg/infra/log"
@@ -519,52 +520,135 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A
519520
var totalAffected int64
520521
if cfg.MaxAge > 0 {
521522
cutoffDate := timeNow().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
522-
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)`
523-
sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
523+
// Single-statement approaches, specifically ones using batched sub-queries, seem to deadlock with concurrent inserts on MySQL.
524+
// We have a bounded batch size, so work around this by first loading the IDs into memory and allowing any locks to flush inside each batch.
525+
// This may under-delete when concurrent inserts happen, but any such annotations will simply be cleaned on the next cycle.
526+
//
527+
// We execute the following batched operation repeatedly until either we run out of objects, the context is cancelled, or there is an error.
528+
affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
529+
cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
530+
ids, err := r.fetchIDs(ctx, "annotation", cond)
531+
if err != nil {
532+
return 0, err
533+
}
534+
r.log.Error("Annotations to clean by time", "count", len(ids), "ids", ids, "cond", cond, "err", err)
524535

525-
affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
536+
x, y := r.deleteByIDs(ctx, "annotation", ids)
537+
r.log.Error("cleaned annotations by time", "count", len(ids), "affected", x, "err", y)
538+
return x, y
539+
})
526540
totalAffected += affected
527541
if err != nil {
528542
return totalAffected, err
529543
}
530544
}
531545

532546
if cfg.MaxCount > 0 {
533-
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)`
534-
sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
535-
affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
547+
// Similar strategy as the above cleanup process, to avoid deadlocks.
548+
affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
549+
cond := fmt.Sprintf(`%s ORDER BY id DESC %s`, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
550+
ids, err := r.fetchIDs(ctx, "annotation", cond)
551+
if err != nil {
552+
return 0, err
553+
}
554+
r.log.Error("Annotations to clean by count", "count", len(ids), "ids", ids, "cond", cond, "err", err)
555+
556+
x, y := r.deleteByIDs(ctx, "annotation", ids)
557+
r.log.Error("cleaned annotations by count", "count", len(ids), "affected", x, "err", y)
558+
return x, y
559+
})
536560
totalAffected += affected
537-
return totalAffected, err
561+
if err != nil {
562+
return totalAffected, err
563+
}
538564
}
539565

540566
return totalAffected, nil
541567
}
542568

543569
func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) {
544-
deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)`
545-
sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
546-
return r.executeUntilDoneOrCancelled(ctx, sql)
570+
return untilDoneOrCancelled(ctx, func() (int64, error) {
571+
cond := fmt.Sprintf(`NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s`, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
572+
ids, err := r.fetchIDs(ctx, "annotation_tag", cond)
573+
if err != nil {
574+
return 0, err
575+
}
576+
r.log.Error("Tags to clean", "count", len(ids), "ids", ids, "cond", cond, "err", err)
577+
578+
x, y := r.deleteByIDs(ctx, "annotation_tag", ids)
579+
r.log.Error("cleaned tags", "count", len(ids), "affected", x, "err", y)
580+
return x, y
581+
})
582+
}
583+
584+
func (r *xormRepositoryImpl) fetchIDs(ctx context.Context, table, condition string) ([]int64, error) {
585+
sql := fmt.Sprintf(`SELECT id FROM %s`, table)
586+
if condition == "" {
587+
return nil, fmt.Errorf("condition must be supplied; cannot fetch IDs from entire table")
588+
}
589+
sql += fmt.Sprintf(` WHERE %s`, condition)
590+
ids := make([]int64, 0)
591+
err := r.db.WithDbSession(ctx, func(session *db.Session) error {
592+
return session.SQL(sql).Find(&ids)
593+
})
594+
return ids, err
547595
}
548596

549-
func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) {
597+
func (r *xormRepositoryImpl) deleteByIDs(ctx context.Context, table string, ids []int64) (int64, error) {
598+
if len(ids) == 0 {
599+
return 0, nil
600+
}
601+
602+
sql := ""
603+
args := make([]any, 0)
604+
605+
// SQLite has a parameter limit of 999.
606+
// If the batch size is bigger than that, and we're on SQLite, we have to put the IDs directly into the statement.
607+
const sqliteParameterLimit = 999
608+
if r.db.GetDBType() == migrator.SQLite && r.cfg.AnnotationCleanupJobBatchSize > sqliteParameterLimit {
609+
values := fmt.Sprint(ids[0])
610+
for _, v := range ids[1:] {
611+
values = fmt.Sprintf("%s, %d", values, v)
612+
}
613+
sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, values)
614+
} else {
615+
placeholders := "?" + strings.Repeat(",?", len(ids)-1)
616+
sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, placeholders)
617+
args = asAny(ids)
618+
}
619+
620+
var affected int64
621+
err := r.db.WithDbSession(ctx, func(session *db.Session) error {
622+
res, err := session.Exec(append([]any{sql}, args...)...)
623+
if err != nil {
624+
return err
625+
}
626+
affected, err = res.RowsAffected()
627+
return err
628+
})
629+
return affected, err
630+
}
631+
632+
// asAny returns a new []any holding the elements of vs in the same order, each boxed as an
// int64. The result always has len(vs) elements and is non-nil even when vs is nil or empty.
func asAny(vs []int64) []any {
	boxed := make([]any, 0, len(vs))
	for _, id := range vs {
		boxed = append(boxed, id)
	}
	return boxed
}
639+
640+
// untilDoneOrCancelled repeatedly executes batched work until that work is either done (i.e., returns zero affected objects),
641+
// a batch produces an error, or the provided context is cancelled.
642+
// The work to be done is given as a callback that returns the number of affected objects for each batch, plus that batch's errors.
643+
func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) {
550644
var totalAffected int64
551645
for {
552646
select {
553647
case <-ctx.Done():
554648
return totalAffected, ctx.Err()
555649
default:
556-
var affected int64
557-
err := r.db.WithDbSession(ctx, func(session *db.Session) error {
558-
res, err := session.Exec(sql)
559-
if err != nil {
560-
return err
561-
}
562-
563-
affected, err = res.RowsAffected()
564-
totalAffected += affected
565-
566-
return err
567-
})
650+
affected, err := batchWork()
651+
totalAffected += affected
568652
if err != nil {
569653
return totalAffected, err
570654
}

pkg/services/cleanup/cleanup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (j cleanUpJob) String() string {
7474
func (srv *CleanUpService) Run(ctx context.Context) error {
7575
srv.cleanUpTmpFiles(ctx)
7676

77-
ticker := time.NewTicker(time.Minute * 10)
77+
ticker := time.NewTicker(time.Minute * 1)
7878
for {
7979
select {
8080
case <-ticker.C:

0 commit comments

Comments
 (0)