diff --git a/server/store/datastore/pipeline.go b/server/store/datastore/pipeline.go index 602ccc78e44..c8fc2cf67ce 100644 --- a/server/store/datastore/pipeline.go +++ b/server/store/datastore/pipeline.go @@ -15,8 +15,11 @@ package datastore import ( + "context" + "strings" "time" + "github.com/cenkalti/backoff/v5" "xorm.io/builder" "xorm.io/xorm" @@ -128,47 +131,80 @@ func (s storage) GetPipelineCount() (int64, error) { return s.engine.Count(new(model.Pipeline)) } +// CreatePipeline creates a new pipeline with retry logic for unique constraint errors. func (s storage) CreatePipeline(pipeline *model.Pipeline, stepList ...*model.Step) error { - sess := s.engine.NewSession() - defer sess.Close() - if err := sess.Begin(); err != nil { - return err - } - - repoExist, err := sess.Where("id = ?", pipeline.RepoID).Exist(&model.Repo{}) - if err != nil { - return err - } + // Maximum number of retries + const maxRetries = 3 + + // Create backoff configuration + exponentialBackoff := backoff.NewExponentialBackOff() + + // Execute with backoff retry + _, err := backoff.Retry(context.Background(), func() (struct{}, error) { + sess := s.engine.NewSession() + defer sess.Close() + if err := sess.Begin(); err != nil { + return struct{}{}, err + } - if !repoExist { - return ErrorRepoNotExist{RepoID: pipeline.RepoID} - } + repoExist, err := sess.Where("id = ?", pipeline.RepoID).Exist(&model.Repo{}) + if err != nil { + return struct{}{}, err + } - // calc pipeline number - var number int64 - if _, err := sess.Select("MAX(number)"). - Table(new(model.Pipeline)). - Where("repo_id = ?", pipeline.RepoID). - Get(&number); err != nil { - return err - } - pipeline.Number = number + 1 + if !repoExist { + return struct{}{}, ErrorRepoNotExist{RepoID: pipeline.RepoID} + } - pipeline.Created = time.Now().UTC().Unix() - // only Insert set auto created ID back to object - if _, err := sess.Insert(pipeline); err != nil { - return err - } + // calc pipeline number + var number int64 + if _, err := sess.Select("MAX(number)"). + Table(new(model.Pipeline)). + Where("repo_id = ?", pipeline.RepoID). + Get(&number); err != nil { + return struct{}{}, err + } + pipeline.Number = number + 1 - for i := range stepList { - stepList[i].PipelineID = pipeline.ID + pipeline.Created = time.Now().UTC().Unix() // only Insert set auto created ID back to object - if _, err := sess.Insert(stepList[i]); err != nil { - return err + if _, err := sess.Insert(pipeline); err != nil { + if isUniqueConstraintError(err) { + return struct{}{}, err + } + return struct{}{}, backoff.Permanent(err) + } + + for i := range stepList { + stepList[i].PipelineID = pipeline.ID + // only Insert set auto created ID back to object + if _, err := sess.Insert(stepList[i]); err != nil { + if isUniqueConstraintError(err) { + return struct{}{}, err + } + return struct{}{}, backoff.Permanent(err) + } } + + return struct{}{}, sess.Commit() + }, backoff.WithBackOff(exponentialBackoff), backoff.WithMaxTries(maxRetries)) + + return err +} + +// isUniqueConstraintError checks if an error is a unique constraint violation error. +func isUniqueConstraintError(err error) bool { + if err == nil { + return false } - return sess.Commit() + errStr := err.Error() + // Check for common unique constraint error patterns across different databases + return strings.Contains(errStr, "duplicate key") || + strings.Contains(errStr, "Duplicate entry") || + strings.Contains(errStr, "UNIQUE constraint failed") || + strings.Contains(errStr, "unique constraint") || + strings.Contains(errStr, "UNIQUE violation") } func (s storage) UpdatePipeline(pipeline *model.Pipeline) error {