Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions assets/go-licenses.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2713,3 +2713,9 @@ LEVEL = Info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; storage type
;STORAGE_TYPE = local

;[global_lock]
;; Lock service type, could be memory or redis
;SERVICE_TYPE = memory
;; Ignored for the "memory" type. For "redis" use something like `redis://127.0.0.1:6379/0`
;SERVICE_CONN_STR =
12 changes: 10 additions & 2 deletions modules/globallock/globallock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ package globallock
import (
"context"
"sync"

"code.gitea.io/gitea/modules/setting"
)

var (
defaultLocker Locker
initOnce sync.Once
initFunc = func() {
// TODO: read the setting and initialize the default locker.
// Before implementing this, don't use it.
switch setting.GlobalLock.ServiceType {
case "redis":
defaultLocker = NewRedisLocker(setting.GlobalLock.ServiceConnStr)
case "memory":
fallthrough
default:
defaultLocker = NewMemoryLocker()
}
} // define initFunc as a variable to make it possible to change it in tests
)

Expand Down
37 changes: 37 additions & 0 deletions modules/setting/gloabl_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package setting

import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/nosql"
)

// GlobalLock represents configuration of global lock
var GlobalLock = struct {
ServiceType string
ServiceConnStr string
}{
ServiceType: "memory",
}

func loadGlobalLockFrom(rootCfg ConfigProvider) {
sec := rootCfg.Section("global_lock")
GlobalLock.ServiceType = sec.Key("SERVICE_TYPE").MustString("memory")
switch GlobalLock.ServiceType {
case "memory":
case "redis":
connStr := sec.Key("SERVICE_CONN_STR").String()
if connStr == "" {
log.Fatal("SERVICE_CONN_STR is empty for redis")
}
u := nosql.ToRedisURI(connStr)
if u == nil {
log.Fatal("SERVICE_CONN_STR %s is not a valid redis connection string", connStr)
}
GlobalLock.ServiceConnStr = connStr
default:
log.Fatal("Unknown sync lock service type: %s", GlobalLock.ServiceType)
}
}
35 changes: 35 additions & 0 deletions modules/setting/global_lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package setting

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLoadGlobalLockConfig(t *testing.T) {
t.Run("DefaultGlobalLockConfig", func(t *testing.T) {
iniStr := ``
cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)

loadGlobalLockFrom(cfg)
assert.EqualValues(t, "memory", GlobalLock.ServiceType)
})

t.Run("RedisGlobalLockConfig", func(t *testing.T) {
iniStr := `
[global_lock]
SERVICE_TYPE = redis
SERVICE_CONN_STR = addrs=127.0.0.1:6379 db=0
`
cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)

loadGlobalLockFrom(cfg)
assert.EqualValues(t, "redis", GlobalLock.ServiceType)
assert.EqualValues(t, "addrs=127.0.0.1:6379 db=0", GlobalLock.ServiceConnStr)
})
}
1 change: 1 addition & 0 deletions modules/setting/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func loadCommonSettingsFrom(cfg ConfigProvider) error {
loadGitFrom(cfg)
loadMirrorFrom(cfg)
loadMarkupFrom(cfg)
loadGlobalLockFrom(cfg)
loadOtherFrom(cfg)
return nil
}
Expand Down
69 changes: 0 additions & 69 deletions modules/sync/exclusive_pool.go

This file was deleted.

12 changes: 9 additions & 3 deletions services/pull/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
Expand Down Expand Up @@ -334,9 +335,14 @@ func handler(items ...string) []string {
}

func testPR(id int64) {
pullWorkingPool.CheckIn(fmt.Sprint(id))
defer pullWorkingPool.CheckOut(fmt.Sprint(id))
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
ctx, releaser, err := globallock.Lock(graceful.GetManager().HammerContext(), getPullWorkingLockKey(id))
if err != nil {
log.Error("lock.Lock(): %v", err)
return
}
defer releaser()

ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Test PR[%d] from patch checking queue", id))
defer finished()

pr, err := issues_model.GetPullRequestByID(ctx, id)
Expand Down
17 changes: 13 additions & 4 deletions services/pull/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/references"
Expand Down Expand Up @@ -169,8 +170,12 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return fmt.Errorf("unable to load head repo: %w", err)
}

pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
ctx, releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()

prUnit, err := pr.BaseRepo.GetUnit(ctx, unit.TypePullRequests)
if err != nil {
Expand Down Expand Up @@ -487,8 +492,12 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques

// MergedManually mark pr as merged manually
func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
ctx, releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()

if err := db.WithTx(ctx, func(ctx context.Context) error {
if err := pr.LoadBaseRepo(ctx); err != nil {
Expand Down
15 changes: 10 additions & 5 deletions services/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ import (
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
gitea_context "code.gitea.io/gitea/services/context"
issue_service "code.gitea.io/gitea/services/issue"
notify_service "code.gitea.io/gitea/services/notify"
)

// TODO: use clustered lock (unique queue? or *abuse* cache)
var pullWorkingPool = sync.NewExclusivePool()
func getPullWorkingLockKey(prID int64) string {
return fmt.Sprintf("pull_working_%d", prID)
}

// NewPullRequest creates new pull request with labels for repository.
func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *issues_model.Issue, labelIDs []int64, uuids []string, pr *issues_model.PullRequest, assigneeIDs []int64) error {
Expand Down Expand Up @@ -202,8 +203,12 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss

// ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
ctx, releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()

// Current target branch is already the same
if pr.BaseBranch == targetBranch {
Expand Down
9 changes: 7 additions & 2 deletions services/pull/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models/unit"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/repository"
)
Expand All @@ -25,8 +26,12 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
return fmt.Errorf("update of agit flow pull request's head branch is unsupported")
}

pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
ctx, releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()

diffCount, err := GetDiverging(ctx, pr)
if err != nil {
Expand Down
32 changes: 21 additions & 11 deletions services/repository/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (
project_model "code.gitea.io/gitea/models/project"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
notify_service "code.gitea.io/gitea/services/notify"
)

// repoWorkingPool represents a working pool to order the parallel changes to the same repository
// TODO: use clustered lock (unique queue? or *abuse* cache)
var repoWorkingPool = sync.NewExclusivePool()
func getRepoWorkingLockKey(repoID int64) string {
return fmt.Sprintf("repo_working_%d", repoID)
}

// TransferOwnership transfers all corresponding setting from old user to new one.
func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository, teams []*organization.Team) error {
Expand All @@ -42,12 +42,17 @@ func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, rep

oldOwner := repo.Owner

repoWorkingPool.CheckIn(fmt.Sprint(repo.ID))
ctx, releaser, err := globallock.Lock(ctx, getRepoWorkingLockKey(repo.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}

if err := transferOwnership(ctx, doer, newOwner.Name, repo); err != nil {
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser()
return err
}
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser()

newRepo, err := repo_model.GetRepositoryByID(ctx, repo.ID)
if err != nil {
Expand Down Expand Up @@ -360,15 +365,20 @@ func ChangeRepositoryName(ctx context.Context, doer *user_model.User, repo *repo
oldRepoName := repo.Name

// Change repository directory name. We must lock the local copy of the
// repo so that we can atomically rename the repo path and updates the
// repo so that we can automatically rename the repo path and updates the
// local copy's origin accordingly.

repoWorkingPool.CheckIn(fmt.Sprint(repo.ID))
ctx, releaser, err := globallock.Lock(ctx, getRepoWorkingLockKey(repo.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}

if err := changeRepositoryName(ctx, repo, newRepoName); err != nil {
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser()
return err
}
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser()

repo.Name = newRepoName
notify_service.RenameRepository(ctx, doer, repo, oldRepoName)
Expand Down
Loading