-
Notifications
You must be signed in to change notification settings - Fork 9
Database Performance Optimizations #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: db-cleanup-baseline
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ package annotationsimpl | |
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "testing" | ||
| "time" | ||
|
|
||
|
|
@@ -14,31 +15,30 @@ import ( | |
| "github.com/grafana/grafana/pkg/setting" | ||
| ) | ||
|
|
||
| func TestAnnotationCleanUp(t *testing.T) { | ||
| fakeSQL := db.InitTestDB(t) | ||
|
|
||
| t.Cleanup(func() { | ||
| err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error { | ||
| _, err := session.Exec("DELETE FROM annotation") | ||
| return err | ||
| }) | ||
| assert.NoError(t, err) | ||
| }) | ||
| func TestIntegrationAnnotationCleanUp(t *testing.T) { | ||
| if testing.Short() { | ||
| t.Skip("Skipping integration test") | ||
| } | ||
|
|
||
| createTestAnnotations(t, fakeSQL, 21, 6) | ||
| assertAnnotationCount(t, fakeSQL, "", 21) | ||
| assertAnnotationTagCount(t, fakeSQL, 42) | ||
| fakeSQL := db.InitTestDB(t) | ||
|
|
||
| tests := []struct { | ||
| name string | ||
| cfg *setting.Cfg | ||
| alertAnnotationCount int64 | ||
| dashboardAnnotationCount int64 | ||
| APIAnnotationCount int64 | ||
| affectedAnnotations int64 | ||
| name string | ||
| createAnnotationsNum int | ||
| createOldAnnotationsNum int | ||
|
|
||
| cfg *setting.Cfg | ||
| alertAnnotationCount int64 | ||
| annotationCleanupJobBatchSize int | ||
| dashboardAnnotationCount int64 | ||
| APIAnnotationCount int64 | ||
| affectedAnnotations int64 | ||
| }{ | ||
| { | ||
| name: "default settings should not delete any annotations", | ||
| name: "default settings should not delete any annotations", | ||
| createAnnotationsNum: 21, | ||
| createOldAnnotationsNum: 6, | ||
| annotationCleanupJobBatchSize: 1, | ||
| cfg: &setting.Cfg{ | ||
| AlertingAnnotationCleanupSetting: settingsFn(0, 0), | ||
| DashboardAnnotationCleanupSettings: settingsFn(0, 0), | ||
|
|
@@ -50,7 +50,10 @@ func TestAnnotationCleanUp(t *testing.T) { | |
| affectedAnnotations: 0, | ||
| }, | ||
| { | ||
| name: "should remove annotations created before cut off point", | ||
| name: "should remove annotations created before cut off point", | ||
| createAnnotationsNum: 21, | ||
| createOldAnnotationsNum: 6, | ||
| annotationCleanupJobBatchSize: 1, | ||
| cfg: &setting.Cfg{ | ||
| AlertingAnnotationCleanupSetting: settingsFn(time.Hour*48, 0), | ||
| DashboardAnnotationCleanupSettings: settingsFn(time.Hour*48, 0), | ||
|
|
@@ -62,7 +65,10 @@ func TestAnnotationCleanUp(t *testing.T) { | |
| affectedAnnotations: 6, | ||
| }, | ||
| { | ||
| name: "should only keep three annotations", | ||
| name: "should only keep three annotations", | ||
| createAnnotationsNum: 15, | ||
| createOldAnnotationsNum: 6, | ||
| annotationCleanupJobBatchSize: 1, | ||
| cfg: &setting.Cfg{ | ||
| AlertingAnnotationCleanupSetting: settingsFn(0, 3), | ||
| DashboardAnnotationCleanupSettings: settingsFn(0, 3), | ||
|
|
@@ -74,7 +80,10 @@ func TestAnnotationCleanUp(t *testing.T) { | |
| affectedAnnotations: 6, | ||
| }, | ||
| { | ||
| name: "running the max count delete again should not remove any annotations", | ||
| name: "running the max count delete again should not remove any annotations", | ||
| createAnnotationsNum: 9, | ||
| createOldAnnotationsNum: 6, | ||
| annotationCleanupJobBatchSize: 1, | ||
| cfg: &setting.Cfg{ | ||
| AlertingAnnotationCleanupSetting: settingsFn(0, 3), | ||
| DashboardAnnotationCleanupSettings: settingsFn(0, 3), | ||
|
|
@@ -85,12 +94,40 @@ func TestAnnotationCleanUp(t *testing.T) { | |
| APIAnnotationCount: 3, | ||
| affectedAnnotations: 0, | ||
| }, | ||
| { | ||
| name: "should not fail if batch size is larger than SQLITE_MAX_VARIABLE_NUMBER for SQLite >= 3.32.0", | ||
| createAnnotationsNum: 40003, | ||
| createOldAnnotationsNum: 0, | ||
| annotationCleanupJobBatchSize: 32767, | ||
| cfg: &setting.Cfg{ | ||
| AlertingAnnotationCleanupSetting: settingsFn(0, 1), | ||
| DashboardAnnotationCleanupSettings: settingsFn(0, 1), | ||
| APIAnnotationCleanupSettings: settingsFn(0, 1), | ||
| }, | ||
| alertAnnotationCount: 1, | ||
| dashboardAnnotationCount: 1, | ||
| APIAnnotationCount: 1, | ||
| affectedAnnotations: 40000, | ||
| }, | ||
| } | ||
|
|
||
| for _, test := range tests { | ||
| t.Run(test.name, func(t *testing.T) { | ||
| createTestAnnotations(t, fakeSQL, test.createAnnotationsNum, test.createOldAnnotationsNum) | ||
| assertAnnotationCount(t, fakeSQL, "", int64(test.createAnnotationsNum)) | ||
| assertAnnotationTagCount(t, fakeSQL, 2*int64(test.createAnnotationsNum)) | ||
|
|
||
| t.Cleanup(func() { | ||
| err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error { | ||
| _, deleteAnnotationErr := session.Exec("DELETE FROM annotation") | ||
| _, deleteAnnotationTagErr := session.Exec("DELETE FROM annotation_tag") | ||
| return errors.Join(deleteAnnotationErr, deleteAnnotationTagErr) | ||
| }) | ||
| assert.NoError(t, err) | ||
| }) | ||
|
|
||
| cfg := setting.NewCfg() | ||
| cfg.AnnotationCleanupJobBatchSize = 1 | ||
| cfg.AnnotationCleanupJobBatchSize = int64(test.annotationCleanupJobBatchSize) | ||
| cleaner := ProvideCleanupService(fakeSQL, cfg) | ||
| affectedAnnotations, affectedAnnotationTags, err := cleaner.Run(context.Background(), test.cfg) | ||
| require.NoError(t, err) | ||
|
|
@@ -111,7 +148,11 @@ func TestAnnotationCleanUp(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| func TestOldAnnotationsAreDeletedFirst(t *testing.T) { | ||
| func TestIntegrationOldAnnotationsAreDeletedFirst(t *testing.T) { | ||
| if testing.Short() { | ||
| t.Skip("Skipping integration test") | ||
| } | ||
|
|
||
| fakeSQL := db.InitTestDB(t) | ||
|
|
||
| t.Cleanup(func() { | ||
|
|
@@ -193,8 +234,11 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno | |
|
|
||
| cutoffDate := time.Now() | ||
|
|
||
| newAnnotations := make([]*annotations.Item, 0, expectedCount) | ||
| newAnnotationTags := make([]*annotationTag, 0, 2*expectedCount) | ||
| for i := 0; i < expectedCount; i++ { | ||
| a := &annotations.Item{ | ||
| ID: int64(i + 1), | ||
| DashboardID: 1, | ||
| OrgID: 1, | ||
| UserID: 1, | ||
|
|
@@ -222,20 +266,29 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno | |
| a.Created = cutoffDate.AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond) | ||
| } | ||
|
|
||
| err := store.WithDbSession(context.Background(), func(sess *db.Session) error { | ||
| _, err := sess.Insert(a) | ||
| require.NoError(t, err, "should be able to save annotation", err) | ||
|
|
||
| // mimick the SQL annotation Save logic by writing records to the annotation_tag table | ||
| // we need to ensure they get deleted when we clean up annotations | ||
| for tagID := range []int{1, 2} { | ||
| _, err = sess.Exec("INSERT INTO annotation_tag (annotation_id, tag_id) VALUES(?,?)", a.ID, tagID) | ||
| require.NoError(t, err, "should be able to save annotation tag ID", err) | ||
| } | ||
| return err | ||
| }) | ||
| require.NoError(t, err) | ||
| newAnnotations = append(newAnnotations, a) | ||
| newAnnotationTags = append(newAnnotationTags, &annotationTag{AnnotationID: a.ID, TagID: 1}, &annotationTag{AnnotationID: a.ID, TagID: 2}) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logic: Missing definition for |
||
| } | ||
|
|
||
| err := store.WithDbSession(context.Background(), func(sess *db.Session) error { | ||
| batchsize := 500 | ||
| for i := 0; i < len(newAnnotations); i += batchsize { | ||
| _, err := sess.InsertMulti(newAnnotations[i:min(i+batchsize, len(newAnnotations))]) | ||
| require.NoError(t, err) | ||
| } | ||
| return nil | ||
| }) | ||
| require.NoError(t, err) | ||
|
|
||
| err = store.WithDbSession(context.Background(), func(sess *db.Session) error { | ||
| batchsize := 500 | ||
| for i := 0; i < len(newAnnotationTags); i += batchsize { | ||
| _, err := sess.InsertMulti(newAnnotationTags[i:min(i+batchsize, len(newAnnotationTags))]) | ||
| require.NoError(t, err) | ||
| } | ||
| return nil | ||
| }) | ||
| require.NoError(t, err) | ||
| } | ||
|
|
||
| func settingsFn(maxAge time.Duration, maxCount int64) setting.AnnotationCleanupSettings { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ import ( | |
| "time" | ||
|
|
||
| "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" | ||
| "github.com/grafana/grafana/pkg/services/sqlstore/migrator" | ||
|
|
||
| "github.com/grafana/grafana/pkg/infra/db" | ||
| "github.com/grafana/grafana/pkg/infra/log" | ||
|
|
@@ -519,52 +520,135 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A | |
| var totalAffected int64 | ||
| if cfg.MaxAge > 0 { | ||
| cutoffDate := timeNow().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond) | ||
| deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)` | ||
| sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) | ||
| // Single-statement approaches, specifically ones using batched sub-queries, seem to deadlock with concurrent inserts on MySQL. | ||
| // We have a bounded batch size, so work around this by first loading the IDs into memory and allowing any locks to flush inside each batch. | ||
| // This may under-delete when concurrent inserts happen, but any such annotations will simply be cleaned on the next cycle. | ||
| // | ||
| // We execute the following batched operation repeatedly until either we run out of objects, the context is cancelled, or there is an error. | ||
| affected, err := untilDoneOrCancelled(ctx, func() (int64, error) { | ||
| cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) | ||
| ids, err := r.fetchIDs(ctx, "annotation", cond) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| r.log.Error("Annotations to clean by time", "count", len(ids), "ids", ids, "cond", cond, "err", err) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Using Error level for normal operations will spam logs. Consider using Debug level for routine cleanup activities There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Using Error level for normal operations will spam logs. Consider using Debug level for routine cleanup activities There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: Using Error level for normal operations will spam logs. Consider using Debug level for routine cleanup activities |
||
|
|
||
| affected, err := r.executeUntilDoneOrCancelled(ctx, sql) | ||
| x, y := r.deleteByIDs(ctx, "annotation", ids) | ||
| r.log.Error("cleaned annotations by time", "count", len(ids), "affected", x, "err", y) | ||
| return x, y | ||
| }) | ||
| totalAffected += affected | ||
| if err != nil { | ||
| return totalAffected, err | ||
| } | ||
| } | ||
|
|
||
| if cfg.MaxCount > 0 { | ||
| deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)` | ||
| sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount)) | ||
| affected, err := r.executeUntilDoneOrCancelled(ctx, sql) | ||
| // Similar strategy as the above cleanup process, to avoid deadlocks. | ||
| affected, err := untilDoneOrCancelled(ctx, func() (int64, error) { | ||
| cond := fmt.Sprintf(`%s ORDER BY id DESC %s`, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount)) | ||
| ids, err := r.fetchIDs(ctx, "annotation", cond) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| r.log.Error("Annotations to clean by count", "count", len(ids), "ids", ids, "cond", cond, "err", err) | ||
|
|
||
| x, y := r.deleteByIDs(ctx, "annotation", ids) | ||
| r.log.Error("cleaned annotations by count", "count", len(ids), "affected", x, "err", y) | ||
| return x, y | ||
| }) | ||
| totalAffected += affected | ||
| return totalAffected, err | ||
| if err != nil { | ||
| return totalAffected, err | ||
| } | ||
| } | ||
|
|
||
| return totalAffected, nil | ||
| } | ||
|
|
||
| func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) { | ||
| deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)` | ||
| sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) | ||
| return r.executeUntilDoneOrCancelled(ctx, sql) | ||
| return untilDoneOrCancelled(ctx, func() (int64, error) { | ||
| cond := fmt.Sprintf(`NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s`, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) | ||
| ids, err := r.fetchIDs(ctx, "annotation_tag", cond) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| r.log.Error("Tags to clean", "count", len(ids), "ids", ids, "cond", cond, "err", err) | ||
|
|
||
| x, y := r.deleteByIDs(ctx, "annotation_tag", ids) | ||
| r.log.Error("cleaned tags", "count", len(ids), "affected", x, "err", y) | ||
| return x, y | ||
| }) | ||
| } | ||
|
|
||
| func (r *xormRepositoryImpl) fetchIDs(ctx context.Context, table, condition string) ([]int64, error) { | ||
| sql := fmt.Sprintf(`SELECT id FROM %s`, table) | ||
| if condition == "" { | ||
| return nil, fmt.Errorf("condition must be supplied; cannot fetch IDs from entire table") | ||
| } | ||
| sql += fmt.Sprintf(` WHERE %s`, condition) | ||
| ids := make([]int64, 0) | ||
| err := r.db.WithDbSession(ctx, func(session *db.Session) error { | ||
| return session.SQL(sql).Find(&ids) | ||
| }) | ||
| return ids, err | ||
| } | ||
|
|
||
| func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) { | ||
| func (r *xormRepositoryImpl) deleteByIDs(ctx context.Context, table string, ids []int64) (int64, error) { | ||
| if len(ids) == 0 { | ||
| return 0, nil | ||
| } | ||
|
|
||
| sql := "" | ||
| args := make([]any, 0) | ||
|
|
||
| // SQLite has a parameter limit of 999. | ||
| // If the batch size is bigger than that, and we're on SQLite, we have to put the IDs directly into the statement. | ||
| const sqliteParameterLimit = 999 | ||
| if r.db.GetDBType() == migrator.SQLite && r.cfg.AnnotationCleanupJobBatchSize > sqliteParameterLimit { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: The batch size check should also validate against the actual number of IDs to prevent unnecessary string building when len(ids) <= sqliteParameterLimit There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: The batch size check should also validate against the actual number of IDs to prevent unnecessary string building when len(ids) <= sqliteParameterLimit |
||
| values := fmt.Sprint(ids[0]) | ||
| for _, v := range ids[1:] { | ||
| values = fmt.Sprintf("%s, %d", values, v) | ||
| } | ||
| sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, values) | ||
| } else { | ||
| placeholders := "?" + strings.Repeat(",?", len(ids)-1) | ||
| sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, placeholders) | ||
| args = asAny(ids) | ||
| } | ||
|
|
||
| var affected int64 | ||
| err := r.db.WithDbSession(ctx, func(session *db.Session) error { | ||
| res, err := session.Exec(append([]any{sql}, args...)...) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| affected, err = res.RowsAffected() | ||
| return err | ||
| }) | ||
| return affected, err | ||
| } | ||
|
|
||
| func asAny(vs []int64) []any { | ||
| r := make([]any, len(vs)) | ||
| for i, v := range vs { | ||
| r[i] = v | ||
| } | ||
| return r | ||
| } | ||
|
|
||
| // untilDoneOrCancelled repeatedly executes batched work until that work is either done (i.e., returns zero affected objects), | ||
| // a batch produces an error, or the provided context is cancelled. | ||
| // The work to be done is given as a callback that returns the number of affected objects for each batch, plus that batch's errors. | ||
| func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) { | ||
| var totalAffected int64 | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return totalAffected, ctx.Err() | ||
| default: | ||
| var affected int64 | ||
| err := r.db.WithDbSession(ctx, func(session *db.Session) error { | ||
| res, err := session.Exec(sql) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| affected, err = res.RowsAffected() | ||
| totalAffected += affected | ||
|
|
||
| return err | ||
| }) | ||
| affected, err := batchWork() | ||
| totalAffected += affected | ||
| if err != nil { | ||
| return totalAffected, err | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Missing definition for
annotationTagstruct - ensure this type is properly imported or defined