Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 68 additions & 32 deletions server/store/datastore/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package datastore

import (
"context"
"strings"
"time"

"github.com/cenkalti/backoff/v5"
"xorm.io/builder"
"xorm.io/xorm"

Expand Down Expand Up @@ -128,47 +131,80 @@ func (s storage) GetPipelineCount() (int64, error) {
return s.engine.Count(new(model.Pipeline))
}

// CreatePipeline creates a new pipeline with retry logic for unique constraint errors.
func (s storage) CreatePipeline(pipeline *model.Pipeline, stepList ...*model.Step) error {
sess := s.engine.NewSession()
defer sess.Close()
if err := sess.Begin(); err != nil {
return err
}

repoExist, err := sess.Where("id = ?", pipeline.RepoID).Exist(&model.Repo{})
if err != nil {
return err
}
// Maximum number of retries
const maxRetries = 3

// Create backoff configuration
exponentialBackoff := backoff.NewExponentialBackOff()

// Execute with backoff retry
_, err := backoff.Retry(context.Background(), func() (struct{}, error) {
sess := s.engine.NewSession()
defer sess.Close()
if err := sess.Begin(); err != nil {
return struct{}{}, err
}

if !repoExist {
return ErrorRepoNotExist{RepoID: pipeline.RepoID}
}
repoExist, err := sess.Where("id = ?", pipeline.RepoID).Exist(&model.Repo{})
if err != nil {
return struct{}{}, err
}

// calc pipeline number
var number int64
if _, err := sess.Select("MAX(number)").
Table(new(model.Pipeline)).
Where("repo_id = ?", pipeline.RepoID).
Get(&number); err != nil {
return err
}
pipeline.Number = number + 1
if !repoExist {
return struct{}{}, ErrorRepoNotExist{RepoID: pipeline.RepoID}
}

pipeline.Created = time.Now().UTC().Unix()
// only Insert set auto created ID back to object
if _, err := sess.Insert(pipeline); err != nil {
return err
}
// calc pipeline number
var number int64
if _, err := sess.Select("MAX(number)").
Table(new(model.Pipeline)).
Where("repo_id = ?", pipeline.RepoID).
Get(&number); err != nil {
return struct{}{}, err
}
pipeline.Number = number + 1

for i := range stepList {
stepList[i].PipelineID = pipeline.ID
pipeline.Created = time.Now().UTC().Unix()
// only Insert set auto created ID back to object
if _, err := sess.Insert(stepList[i]); err != nil {
return err
if _, err := sess.Insert(pipeline); err != nil {
if isUniqueConstraintError(err) {
return struct{}{}, err
}
return struct{}{}, backoff.Permanent(err)
}

for i := range stepList {
stepList[i].PipelineID = pipeline.ID
// only Insert set auto created ID back to object
if _, err := sess.Insert(stepList[i]); err != nil {
if isUniqueConstraintError(err) {
return struct{}{}, err
}
return struct{}{}, backoff.Permanent(err)
}
}

return struct{}{}, sess.Commit()
}, backoff.WithBackOff(exponentialBackoff), backoff.WithMaxTries(maxRetries))

return err
}

// isUniqueConstraintError checks if an error is a unique constraint violation error.
func isUniqueConstraintError(err error) bool {
if err == nil {
return false
}

return sess.Commit()
errStr := err.Error()
// Check for common unique constraint error patterns across different databases
return strings.Contains(errStr, "duplicate key") ||
strings.Contains(errStr, "Duplicate entry") ||
strings.Contains(errStr, "UNIQUE constraint failed") ||
strings.Contains(errStr, "unique constraint") ||
strings.Contains(errStr, "UNIQUE violation")
}

func (s storage) UpdatePipeline(pipeline *model.Pipeline) error {
Expand Down