diff --git a/api/tasks/tasks.go b/api/tasks/tasks.go index 0d7f68fb4..9d8125c75 100644 --- a/api/tasks/tasks.go +++ b/api/tasks/tasks.go @@ -1,9 +1,10 @@ package tasks import ( - "github.com/semaphoreui/semaphore/pkg/task_logger" "net/http" + "github.com/semaphoreui/semaphore/pkg/task_logger" + "github.com/semaphoreui/semaphore/api/helpers" "github.com/semaphoreui/semaphore/db" task2 "github.com/semaphoreui/semaphore/services/tasks" @@ -44,7 +45,7 @@ func GetTasks(w http.ResponseWriter, r *http.Request) { res := []taskRes{} - for _, task := range pool.Queue { + for _, task := range pool.GetQueuedTasks() { res = append(res, taskRes{ TaskID: task.Task.ID, ProjectID: task.Task.ProjectID, @@ -55,7 +56,7 @@ func GetTasks(w http.ResponseWriter, r *http.Request) { }) } - for _, task := range pool.RunningTasks { + for _, task := range pool.GetRunningTasks() { res = append(res, taskRes{ TaskID: task.Task.ID, ProjectID: task.Task.ProjectID, @@ -77,7 +78,7 @@ func DeleteTask(w http.ResponseWriter, r *http.Request) { var task *db.Task - for _, t := range pool.Queue { + for _, t := range pool.GetQueuedTasks() { if t.Task.ID == taskID { task = &t.Task break @@ -85,7 +86,7 @@ func DeleteTask(w http.ResponseWriter, r *http.Request) { } if task == nil { - for _, t := range pool.RunningTasks { + for _, t := range pool.GetRunningTasks() { if t.Task.ID == taskID { task = &t.Task break diff --git a/cli/cmd/root.go b/cli/cmd/root.go index fdcf5c920..0d5e5d23c 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -17,6 +17,7 @@ import ( "github.com/semaphoreui/semaphore/db/factory" proFactory "github.com/semaphoreui/semaphore/pro/db/factory" proServer "github.com/semaphoreui/semaphore/pro/services/server" + proTasks "github.com/semaphoreui/semaphore/pro/services/tasks" "github.com/semaphoreui/semaphore/services/schedules" "github.com/semaphoreui/semaphore/services/tasks" "github.com/semaphoreui/semaphore/util" @@ -72,6 +73,7 @@ func Execute() { func runService() { store := createStore("root") + state := proTasks.NewTaskStateStore() terraformStore := proFactory.NewTerraformStore(store) ansibleTaskRepo := proFactory.NewAnsibleTaskRepository(store) @@ -93,6 +95,7 @@ func runService() { taskPool := tasks.CreateTaskPool( store, + state, ansibleTaskRepo, inventoryService, encryptionService, diff --git a/go.mod b/go.mod index 984237ce2..2ff61dce8 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/lib/pq v1.10.9 github.com/mdp/qrterminal/v3 v3.2.1 github.com/pquerna/otp v1.4.0 + github.com/redis/go-redis/v9 v9.7.0 github.com/robfig/cron/v3 v3.0.1 github.com/semaphoreui/semaphore/pro v0.0.0 github.com/sirupsen/logrus v1.9.3 @@ -41,9 +42,11 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v1.1.6 // indirect github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudflare/circl v1.6.1 // indirect github.com/cyphar/filepath-securejoin v0.4.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect diff --git a/go.sum b/go.sum index cdde3b46c..008b98fcf 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,12 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod 
h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0= github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs= github.com/coreos/go-oidc/v3 v3.14.1 h1:9ePWwfdwC4QKRlCXsJGou56adA/owXczOzwKdOumLqk= @@ -31,6 +37,8 @@ github.com/cyphar/filepath-securejoin v0.4.1/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elazarl/goproxy v1.7.2 h1:Y2o6urb7Eule09PjlhQRGNsqRfPmYI3KKQLFpCAV3+o= @@ -135,6 +143,8 @@ github.com/poy/onpar v1.1.2 h1:QaNrNiZx0+Nar5dLgTVp5mXkyoVFIbepjyEoGSnhbAY= github.com/poy/onpar v1.1.2/go.mod h1:6X8FLNoxyr9kkmnlqpK6LSoiOtrO6MICtWwEuWkLjzg= github.com/pquerna/otp v1.4.0 h1:wZvl1TIVxKRThZIBiwOOHOGP/1+nZyWBil9Y2XNEDzg= github.com/pquerna/otp v1.4.0/go.mod h1:dkJfzwRKNiegxyNb54X/3fLwhCynbMspSyWKnvi1AEg= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= diff --git a/pro/services/tasks/task_state_store_factory.go b/pro/services/tasks/task_state_store_factory.go new file mode 100644 index 000000000..19e8fdcb5 --- /dev/null +++ b/pro/services/tasks/task_state_store_factory.go @@ -0,0 +1,9 @@ +package tasks + +import ( + "github.com/semaphoreui/semaphore/services/tasks" +) + +func NewTaskStateStore() tasks.TaskStateStore { + return tasks.NewMemoryTaskStateStore() +} diff --git a/services/tasks/RemoteJob.go b/services/tasks/RemoteJob.go index c02a22823..c78e19654 100644 --- a/services/tasks/RemoteJob.go +++ b/services/tasks/RemoteJob.go @@ -4,11 +4,12 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/semaphoreui/semaphore/pkg/tz" - log "github.com/sirupsen/logrus" "net/http" "time" + "github.com/semaphoreui/semaphore/pkg/tz" + log 
"github.com/sirupsen/logrus" + "github.com/semaphoreui/semaphore/db" "github.com/semaphoreui/semaphore/pkg/task_logger" "github.com/semaphoreui/semaphore/util" @@ -93,6 +94,9 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string) tsk.IncomingVersion = incomingVersion tsk.Username = username tsk.Alias = alias + if t.taskPool != nil && t.taskPool.state != nil { + t.taskPool.state.UpdateRuntimeFields(tsk) + } var runners []db.Runner db.StoreSession(t.taskPool.store, "run remote job", func() { @@ -141,6 +145,9 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string) } tsk.RunnerID = runner.ID + if t.taskPool != nil && t.taskPool.state != nil { + t.taskPool.state.UpdateRuntimeFields(tsk) + } startTime := tz.Now() @@ -154,6 +161,12 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string) time.Sleep(1_000_000_000) tsk = t.taskPool.GetTask(t.Task.ID) + + if tsk == nil { + err = fmt.Errorf("task %d not found", t.Task.ID) + return + } + if tsk.Task.Status == task_logger.TaskSuccessStatus || tsk.Task.Status == task_logger.TaskStoppedStatus || tsk.Task.Status == task_logger.TaskFailStatus { diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index d95d22563..bad4d309b 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -2,16 +2,16 @@ package tasks import ( "fmt" + "regexp" + "strconv" + "strings" + "time" + "github.com/semaphoreui/semaphore/pkg/random" "github.com/semaphoreui/semaphore/pkg/tz" "github.com/semaphoreui/semaphore/pro/pkg/stage_parsers" "github.com/semaphoreui/semaphore/pro_interfaces" "github.com/semaphoreui/semaphore/services/server" - "regexp" - "slices" - "strconv" - "strings" - "time" "github.com/semaphoreui/semaphore/db" "github.com/semaphoreui/semaphore/db_lib" @@ -42,18 +42,9 @@ type PoolEvent struct { } type TaskPool struct { - // Queue contains list of tasks in status TaskWaitingStatus. - Queue []*TaskRunner - // register channel used to put tasks to queue. register chan *TaskRunner - // activeProj ??? - activeProj map[int]map[int]*TaskRunner - - // RunningTasks contains tasks with status TaskRunningStatus. Map key is a task ID. - RunningTasks map[int]*TaskRunner - // logger channel used to putting log records to database. 
logger chan logRecord @@ -66,10 +57,39 @@ type TaskPool struct { queueEvents chan PoolEvent - aliases map[string]*TaskRunner + // state provides pluggable storage for Queue, active projects, running tasks and aliases + state TaskStateStore } func CreateTaskPool( + store db.Store, + state TaskStateStore, + ansibleTaskRepo db.AnsibleTaskRepository, + inventoryService server.InventoryService, + encryptionService server.AccessKeyEncryptionService, + keyInstallationService server.AccessKeyInstallationService, + logWriteService pro_interfaces.LogWriteService, +) TaskPool { + p := TaskPool{ + register: make(chan *TaskRunner), // add TaskRunner to queue + logger: make(chan logRecord, 10000), // store log records to database + store: store, + state: state, + queueEvents: make(chan PoolEvent), + inventoryService: inventoryService, + ansibleTaskRepo: ansibleTaskRepo, + encryptionService: encryptionService, + logWriteService: logWriteService, + keyInstallationService: keyInstallationService, + } + // attempt to start HA state store (no-op for memory) + _ = p.state.Start(p.hydrateTaskRunner) + return p +} + +// CreateTaskPoolWithState allows passing a custom TaskStateStore (e.g., Redis-backed) +func CreateTaskPoolWithState( + stateStore TaskStateStore, store db.Store, ansibleTaskRepo db.AnsibleTaskRepository, inventoryService server.InventoryService, @@ -77,24 +97,23 @@ func CreateTaskPool( keyInstallationService server.AccessKeyInstallationService, logWriteService pro_interfaces.LogWriteService, ) TaskPool { - return TaskPool{ - Queue: make([]*TaskRunner, 0), // queue of waiting tasks - register: make(chan *TaskRunner), // add TaskRunner to queue - activeProj: make(map[int]map[int]*TaskRunner), - RunningTasks: make(map[int]*TaskRunner), // working tasks + p := TaskPool{ + register: make(chan *TaskRunner), // add TaskRunner to queue logger: make(chan logRecord, 10000), // store log records to database store: store, queueEvents: make(chan PoolEvent), - aliases: make(map[string]*TaskRunner), + state: stateStore, inventoryService: inventoryService, ansibleTaskRepo: ansibleTaskRepo, encryptionService: encryptionService, logWriteService: logWriteService, keyInstallationService: keyInstallationService, } + _ = p.state.Start(p.hydrateTaskRunner) + return p } func (p *TaskPool) GetNumberOfRunningTasksOfRunner(runnerID int) (res int) { - for _, task := range p.RunningTasks { + for _, task := range p.state.RunningRange() { if task.RunnerID == runnerID { res++ } @@ -103,15 +122,11 @@ func (p *TaskPool) GetNumberOfRunningTasksOfRunner(runnerID int) (res int) { } func (p *TaskPool) GetRunningTasks() (res []*TaskRunner) { - for _, task := range p.RunningTasks { - res = append(res, task) - } - return + return p.state.RunningRange() } func (p *TaskPool) GetTask(id int) (task *TaskRunner) { - - for _, t := range p.Queue { + for _, t := range p.state.QueueRange() { if t.Task.ID == id { task = t break @@ -119,7 +134,7 @@ func (p *TaskPool) GetTask(id int) (task *TaskRunner) { } if task == nil { - for _, t := range p.RunningTasks { + for _, t := range p.state.RunningRange() { if t.Task.ID == id { task = t break @@ -131,7 +146,7 @@ func (p *TaskPool) GetTask(id int) (task *TaskRunner) { } func (p *TaskPool) GetTaskByAlias(alias string) (task *TaskRunner) { - return p.aliases[alias] + return p.state.GetByAlias(alias) } // nolint: gocyclo @@ -169,22 +184,26 @@ func (p *TaskPool) handleQueue() { for t := range p.queueEvents { switch t.eventType { case EventTypeNew: - p.Queue = append(p.Queue, t.task) + p.state.Enqueue(t.task) 
case EventTypeFinished: p.onTaskStop(t.task) } - if len(p.Queue) == 0 { + if p.state.QueueLen() == 0 { continue } var i = 0 - for i < len(p.Queue) { - curr := p.Queue[i] + for i < p.state.QueueLen() { + curr := p.state.QueueGet(i) + if curr == nil { // item may no longer be local, move ahead + i = i + 1 + continue + } if curr.Task.Status == task_logger.TaskFailStatus { //delete failed TaskRunner from queue - p.Queue = slices.Delete(p.Queue, i, i+1) + _ = p.state.DequeueAt(i) log.Info("Task " + strconv.Itoa(curr.Task.ID) + " removed from queue") continue } @@ -194,7 +213,13 @@ func (p *TaskPool) handleQueue() { continue } - p.Queue = slices.Delete(p.Queue, i, i+1) + // ensure only one instance claims the task before dequeue + if !p.state.TryClaim(curr.Task.ID) { + i = i + 1 + continue + } + + _ = p.state.DequeueAt(i) runTask(curr, p) } } @@ -246,7 +271,6 @@ func (p *TaskPool) handleLogs() { func runTask(task *TaskRunner, p *TaskPool) { log.Info("Set resource locker with TaskRunner " + strconv.Itoa(task.Task.ID)) - p.onTaskRun(task) log.Info("Task " + strconv.Itoa(task.Task.ID) + " started") @@ -254,39 +278,73 @@ func runTask(task *TaskRunner, p *TaskPool) { } func (p *TaskPool) onTaskRun(t *TaskRunner) { - projTasks, ok := p.activeProj[t.Task.ProjectID] - if !ok { - projTasks = make(map[int]*TaskRunner) - p.activeProj[t.Task.ProjectID] = projTasks - } - projTasks[t.Task.ID] = t - p.RunningTasks[t.Task.ID] = t - p.aliases[t.Alias] = t + p.state.AddActive(t.Task.ProjectID, t) + p.state.SetRunning(t) + if t.Alias != "" { + p.state.SetAlias(t.Alias, t) + } } func (p *TaskPool) onTaskStop(t *TaskRunner) { - if p.activeProj[t.Task.ProjectID] != nil && p.activeProj[t.Task.ProjectID][t.Task.ID] != nil { - delete(p.activeProj[t.Task.ProjectID], t.Task.ID) - if len(p.activeProj[t.Task.ProjectID]) == 0 { - delete(p.activeProj, t.Task.ProjectID) - } + p.state.RemoveActive(t.Task.ProjectID, t.Task.ID) + p.state.DeleteRunning(t.Task.ID) + p.state.DeleteClaim(t.Task.ID) + if t.Alias != "" { + p.state.DeleteAlias(t.Alias) } +} - delete(p.RunningTasks, t.Task.ID) - delete(p.aliases, t.Alias) +// hydrateTaskRunner builds a TaskRunner for an existing task from DB without starting it +func (p *TaskPool) hydrateTaskRunner(taskID int, projectID int) (*TaskRunner, error) { + task, err := p.store.GetTask(projectID, taskID) + if err != nil { + return nil, err + } + tr := NewTaskRunner(task, p, "", p.keyInstallationService) + if err := tr.populateDetails(); err != nil { + return nil, err + } + // load runtime fields from HA store (e.g., Redis) + if p.state != nil { + p.state.LoadRuntimeFields(tr) + } + // set appropriate job handler for consistency (not run) + var job Job + if util.Config.UseRemoteRunner || tr.Template.RunnerTag != nil || tr.Inventory.RunnerTag != nil { + tag := tr.Template.RunnerTag + if tag == nil { + tag = tr.Inventory.RunnerTag + } + job = &RemoteJob{RunnerTag: tag, Task: tr.Task, taskPool: p} + } else { + app := db_lib.CreateApp(tr.Template, tr.Repository, tr.Inventory, tr) + job = &LocalJob{ + Task: tr.Task, + Template: tr.Template, + Inventory: tr.Inventory, + Repository: tr.Repository, + Environment: tr.Environment, + Secret: "{}", + Logger: app.SetLogger(tr), + App: app, + KeyInstaller: p.keyInstallationService, + } + } + tr.job = job + return tr, nil } func (p *TaskPool) blocks(t *TaskRunner) bool { - if util.Config.MaxParallelTasks > 0 && len(p.RunningTasks) >= util.Config.MaxParallelTasks { + if util.Config.MaxParallelTasks > 0 && p.state.RunningCount() >= util.Config.MaxParallelTasks { 
return true } - if p.activeProj[t.Task.ProjectID] == nil || len(p.activeProj[t.Task.ProjectID]) == 0 { + if p.state.ActiveCount(t.Task.ProjectID) == 0 { return false } - for _, r := range p.activeProj[t.Task.ProjectID] { + for _, r := range p.state.GetActive(t.Task.ProjectID) { if r.Task.Status.IsFinished() { continue } @@ -302,7 +360,7 @@ func (p *TaskPool) blocks(t *TaskRunner) bool { return false } - res := proj.MaxParallelTasks > 0 && len(p.activeProj[t.Task.ProjectID]) >= proj.MaxParallelTasks + res := proj.MaxParallelTasks > 0 && p.state.ActiveCount(t.Task.ProjectID) >= proj.MaxParallelTasks if res { return true @@ -364,6 +422,11 @@ func (p *TaskPool) StopTask(targetTask db.Task, forceStop bool) error { return nil } +// GetQueuedTasks returns a snapshot of tasks currently queued +func (p *TaskPool) GetQueuedTasks() []*TaskRunner { + return p.state.QueueRange() +} + func getNextBuildVersion(startVersion string, currentVersion string) string { re := regexp.MustCompile(`^(.*[^\d])?(\d+)([^\d].*)?$`) m := re.FindStringSubmatch(startVersion) diff --git a/services/tasks/TaskRunner.go b/services/tasks/TaskRunner.go index ba97d343a..257118c61 100644 --- a/services/tasks/TaskRunner.go +++ b/services/tasks/TaskRunner.go @@ -103,6 +103,10 @@ func (t *TaskRunner) saveStatus() { if err := t.pool.store.UpdateTask(t.Task); err != nil { t.panicOnError(err, "Failed to update TaskRunner status") } + // persist runtime fields in HA store + if t.pool != nil && t.pool.state != nil { + t.pool.state.UpdateRuntimeFields(t) + } } func (t *TaskRunner) kill() { diff --git a/services/tasks/TaskRunner_logging.go b/services/tasks/TaskRunner_logging.go index 464800e30..0dca84e7d 100644 --- a/services/tasks/TaskRunner_logging.go +++ b/services/tasks/TaskRunner_logging.go @@ -4,11 +4,12 @@ import ( "bufio" "encoding/json" "fmt" - "github.com/semaphoreui/semaphore/pkg/tz" "io" "os/exec" "time" + "github.com/semaphoreui/semaphore/pkg/tz" + "github.com/semaphoreui/semaphore/api/sockets" "github.com/semaphoreui/semaphore/pkg/task_logger" "github.com/semaphoreui/semaphore/util" @@ -180,7 +181,6 @@ func (t *TaskRunner) logPipe(reader io.Reader) { return // it is ok case "bufio.Scanner: token too long": msg = "TaskRunner output exceeds the maximum allowed size of 10MB" - break } t.kill() // kill the job because stdout cannot be read. diff --git a/services/tasks/TaskRunner_test.go b/services/tasks/TaskRunner_test.go index 1cc2ab3e9..9f35b0c6c 100644 --- a/services/tasks/TaskRunner_test.go +++ b/services/tasks/TaskRunner_test.go @@ -1,13 +1,14 @@ package tasks import ( - "github.com/semaphoreui/semaphore/pkg/ssh" "math/rand" "os" "path" "strings" "testing" + "github.com/semaphoreui/semaphore/pkg/ssh" + "github.com/semaphoreui/semaphore/pkg/task_logger" "github.com/semaphoreui/semaphore/pro_interfaces" "github.com/stretchr/testify/assert" @@ -73,6 +74,7 @@ func TestTaskRunnerRun(t *testing.T) { pool := CreateTaskPool( store, + &MemoryTaskStateStore{}, nil, &InventoryServiceMock{}, nil, diff --git a/services/tasks/redis_task_state_store.go b/services/tasks/redis_task_state_store.go new file mode 100644 index 000000000..72a46da76 --- /dev/null +++ b/services/tasks/redis_task_state_store.go @@ -0,0 +1,671 @@ +package tasks + +import ( + "context" + "crypto/tls" + "encoding/json" + "strconv" + "strings" + "sync" + + "github.com/redis/go-redis/v9" + "github.com/semaphoreui/semaphore/util" + log "github.com/sirupsen/logrus" +) + +// RedisTaskStateStore is a Redis-backed implementation of TaskStateStore. 
+// Notes: +// - It stores only task identifiers in Redis and keeps an in-process pointer cache +// to resolve TaskRunner instances. This is sufficient for single-process +// deployments and basic multi-process visibility. For true cross-process +// pointer resolution, a separate hydration mechanism would be required. +type RedisTaskStateStore struct { + client *redis.Client + keyPrefix string + + mu sync.RWMutex + byID map[int]*TaskRunner + byAlias map[string]*TaskRunner + + // pub/sub + pubsub *redis.PubSub + cancelListen context.CancelFunc +} + +func NewRedisTaskStateStore() *RedisTaskStateStore { + keyPrefix := "tasks:" + + p := keyPrefix + if p != "" && !strings.HasSuffix(p, ":") { + p += ":" + } + + var redisTLS *tls.Config + var addr string + var dbNum int + var pass string + var user string + var skipVerify bool + var enableTLS bool + + if util.Config.HA != nil && util.Config.HA.Redis != nil { + addr = util.Config.HA.Redis.Addr + dbNum = util.Config.HA.Redis.DB + pass = util.Config.HA.Redis.Pass + user = util.Config.HA.Redis.User + enableTLS = util.Config.HA.Redis.TLS + skipVerify = util.Config.HA.Redis.TLSSkipVerify + } + if enableTLS { + redisTLS = &tls.Config{InsecureSkipVerify: skipVerify} + } + + if addr == "" { + addr = "127.0.0.1:6379" + } + + client := redis.NewClient(&redis.Options{ + Addr: addr, + DB: dbNum, + Password: pass, + Username: user, + TLSConfig: redisTLS, + }) + + return &RedisTaskStateStore{ + client: client, + keyPrefix: p, + byID: make(map[int]*TaskRunner), + byAlias: make(map[string]*TaskRunner), + } +} + +func (s *RedisTaskStateStore) key(parts ...string) string { + return s.keyPrefix + strings.Join(parts, ":") +} + +// redis message envelope +type redisEvent struct { + Type string `json:"type"` + Data json.RawMessage `json:"data"` +} + +type taskRef struct { + TaskID int `json:"task_id"` + ProjectID int `json:"project_id"` + Alias string `json:"alias,omitempty"` + RunnerID int `json:"runner_id,omitempty"` + Username string `json:"username,omitempty"` + Incoming string `json:"incoming_version,omitempty"` +} + +func (s *RedisTaskStateStore) publish(ctx context.Context, ev redisEvent) { + b, _ := json.Marshal(ev) + if err := s.client.Publish(ctx, s.key("events"), string(b)).Err(); err != nil { + log.WithError(err).Error("redis publish failed") + } +} + +// Start restores state from Redis and begins listening to Pub/Sub events +func (s *RedisTaskStateStore) Start(hydrator TaskRunnerHydrator) error { + ctx, cancel := context.WithCancel(context.Background()) + s.cancelListen = cancel + + // Restore queued tasks + ids, err := s.client.LRange(ctx, s.key("queue"), 0, -1).Result() + if err != nil { + log.WithError(err).Error("redis restore queue failed") + } else { + for _, idStr := range ids { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + // We need project id; store it next to task id in a hash + projStr, herr := s.client.HGet(ctx, s.key("task_project"), idStr).Result() + if herr != nil { + continue + } + projID, _ := strconv.Atoi(projStr) + if hydrator != nil { + if tr, hErr := hydrator(id, projID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[id] = tr + if tr.Alias != "" { + s.byAlias[tr.Alias] = tr + } + s.mu.Unlock() + } + } + } + } + + // Restore active tasks by project + // Find all keys tasks:active:* + var cursor uint64 + for { + keys, cur, err := s.client.Scan(ctx, cursor, s.key("active", "*"), 100).Result() + if err != nil { + log.WithError(err).Error("redis scan active keys failed") + break + } + for _, k := range 
keys { + // extract project id from key suffix + parts := strings.Split(k, ":") + if len(parts) == 0 { + continue + } + projStr := parts[len(parts)-1] + projectID, _ := strconv.Atoi(projStr) + ids, gerr := s.client.SMembers(ctx, k).Result() + if gerr != nil { + continue + } + for _, idStr := range ids { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + if hydrator != nil { + if tr, hErr := hydrator(id, projectID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[id] = tr + if tr.Alias != "" { + s.byAlias[tr.Alias] = tr + } + s.mu.Unlock() + } + } + } + } + cursor = cur + if cursor == 0 { + break + } + } + + // Restore running tasks set + runIDs, err := s.client.SMembers(ctx, s.key("running")).Result() + if err != nil { + log.WithError(err).Error("redis restore running failed") + } else { + for _, idStr := range runIDs { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + projStr, herr := s.client.HGet(ctx, s.key("task_project"), idStr).Result() + if herr != nil { + continue + } + projID, _ := strconv.Atoi(projStr) + if hydrator != nil { + if tr, hErr := hydrator(id, projID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[id] = tr + if tr.Alias != "" { + s.byAlias[tr.Alias] = tr + } + s.mu.Unlock() + } + } + } + } + + // Restore aliases pointers from Redis hash where value is task id + aliasMap, err := s.client.HGetAll(ctx, s.key("aliases")).Result() + if err == nil { + for alias, idStr := range aliasMap { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + projStr, herr := s.client.HGet(ctx, s.key("task_project"), idStr).Result() + if herr != nil { + continue + } + projID, _ := strconv.Atoi(projStr) + if hydrator != nil { + if tr, hErr := hydrator(id, projID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[id] = tr + s.byAlias[alias] = tr + s.mu.Unlock() + } + } + } + } + + // Start Pub/Sub listener + s.pubsub = s.client.Subscribe(ctx, s.key("events")) + go func() { + for { + msg, rerr := s.pubsub.ReceiveMessage(ctx) + if rerr != nil { + return + } + var ev redisEvent + if err := json.Unmarshal([]byte(msg.Payload), &ev); err != nil { + continue + } + switch ev.Type { + case "enqueue": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil && hydrator != nil { + if tr, hErr := hydrator(ref.TaskID, ref.ProjectID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[ref.TaskID] = tr + if ref.Alias != "" { + s.byAlias[ref.Alias] = tr + } + // restore runtime fields + if ref.RunnerID != 0 { + tr.RunnerID = ref.RunnerID + } + if ref.Username != "" { + tr.Username = ref.Username + } + if ref.Incoming != "" { + v := ref.Incoming + tr.IncomingVersion = &v + } + s.mu.Unlock() + } + } + case "dequeue": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil { + s.mu.Lock() + delete(s.byID, ref.TaskID) + s.mu.Unlock() + } + case "set_running": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil && hydrator != nil { + if tr, hErr := hydrator(ref.TaskID, ref.ProjectID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[ref.TaskID] = tr + if ref.Alias != "" { + s.byAlias[ref.Alias] = tr + } + if ref.RunnerID != 0 { + tr.RunnerID = ref.RunnerID + } + if ref.Username != "" { + tr.Username = ref.Username + } + if ref.Incoming != "" { + v := ref.Incoming + tr.IncomingVersion = &v + } + s.mu.Unlock() + } + } + case "delete_running": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil { + s.mu.Lock() + delete(s.byID, ref.TaskID) + s.mu.Unlock() + } + case "active_add": + var ref taskRef + if 
json.Unmarshal(ev.Data, &ref) == nil && hydrator != nil { + if tr, hErr := hydrator(ref.TaskID, ref.ProjectID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[ref.TaskID] = tr + if ref.RunnerID != 0 { + tr.RunnerID = ref.RunnerID + } + if ref.Username != "" { + tr.Username = ref.Username + } + if ref.Incoming != "" { + v := ref.Incoming + tr.IncomingVersion = &v + } + s.mu.Unlock() + } + } + case "active_remove": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil { + s.mu.Lock() + delete(s.byID, ref.TaskID) + s.mu.Unlock() + } + case "alias_set": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil && hydrator != nil { + if tr, hErr := hydrator(ref.TaskID, ref.ProjectID); hErr == nil && tr != nil { + s.mu.Lock() + s.byID[ref.TaskID] = tr + if ref.Alias != "" { + s.byAlias[ref.Alias] = tr + } + if ref.RunnerID != 0 { + tr.RunnerID = ref.RunnerID + } + if ref.Username != "" { + tr.Username = ref.Username + } + if ref.Incoming != "" { + v := ref.Incoming + tr.IncomingVersion = &v + } + s.mu.Unlock() + } + } + case "alias_delete": + var ref taskRef + if json.Unmarshal(ev.Data, &ref) == nil { + s.mu.Lock() + delete(s.byAlias, ref.Alias) + s.mu.Unlock() + } + } + } + }() + + return nil +} + +// Queue operations +func (s *RedisTaskStateStore) Enqueue(task *TaskRunner) { + s.mu.Lock() + s.byID[task.Task.ID] = task + s.mu.Unlock() + ctx := context.Background() + if err := s.client.RPush(ctx, s.key("queue"), strconv.Itoa(task.Task.ID)).Err(); err != nil { + log.WithError(err).Error("redis enqueue failed") + } + // store project for hydrator + _ = s.client.HSet(ctx, s.key("task_project"), strconv.Itoa(task.Task.ID), strconv.Itoa(task.Task.ProjectID)).Err() + // notify others with runtime fields + s.publish(ctx, redisEvent{Type: "enqueue", Data: mustJSON(taskRef{TaskID: task.Task.ID, ProjectID: task.Task.ProjectID, Alias: task.Alias, RunnerID: task.RunnerID, Username: task.Username, Incoming: derefStr(task.IncomingVersion)})}) +} + +func (s *RedisTaskStateStore) DequeueAt(index int) error { + ctx := context.Background() + idStr, err := s.client.LIndex(ctx, s.key("queue"), int64(index)).Result() + if err != nil { + return nil + } + if err := s.client.LRem(ctx, s.key("queue"), 1, idStr).Err(); err != nil { + log.WithError(err).Error("redis dequeue failed") + } + if id, convErr := strconv.Atoi(idStr); convErr == nil { + s.publish(ctx, redisEvent{Type: "dequeue", Data: mustJSON(taskRef{TaskID: id})}) + } + return nil +} + +func (s *RedisTaskStateStore) QueueRange() []*TaskRunner { + ctx := context.Background() + ids, err := s.client.LRange(ctx, s.key("queue"), 0, -1).Result() + if err != nil { + log.WithError(err).Error("redis queue range failed") + return nil + } + s.mu.RLock() + res := make([]*TaskRunner, 0, len(ids)) + for _, idStr := range ids { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + if t := s.byID[id]; t != nil { + res = append(res, t) + } + } + s.mu.RUnlock() + return res +} + +func (s *RedisTaskStateStore) QueueGet(index int) *TaskRunner { + ctx := context.Background() + idStr, err := s.client.LIndex(ctx, s.key("queue"), int64(index)).Result() + if err != nil { + return nil + } + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + return nil + } + s.mu.RLock() + t := s.byID[id] + s.mu.RUnlock() + return t +} + +func (s *RedisTaskStateStore) QueueLen() int { + ctx := context.Background() + n, err := s.client.LLen(ctx, s.key("queue")).Result() + if err != nil { + log.WithError(err).Error("redis queue len failed") + return 0 + } + return 
int(n) +} + +// Running operations +func (s *RedisTaskStateStore) SetRunning(task *TaskRunner) { + s.mu.Lock() + s.byID[task.Task.ID] = task + s.mu.Unlock() + ctx := context.Background() + if err := s.client.SAdd(ctx, s.key("running"), task.Task.ID).Err(); err != nil { + log.WithError(err).Error("redis set running failed") + } + s.publish(ctx, redisEvent{Type: "set_running", Data: mustJSON(taskRef{TaskID: task.Task.ID, ProjectID: task.Task.ProjectID, Alias: task.Alias, RunnerID: task.RunnerID, Username: task.Username, Incoming: derefStr(task.IncomingVersion)})}) +} + +func (s *RedisTaskStateStore) DeleteRunning(taskID int) { + ctx := context.Background() + if err := s.client.SRem(ctx, s.key("running"), taskID).Err(); err != nil { + log.WithError(err).Error("redis delete running failed") + } + s.publish(ctx, redisEvent{Type: "delete_running", Data: mustJSON(taskRef{TaskID: taskID})}) +} + +func (s *RedisTaskStateStore) RunningRange() []*TaskRunner { + ctx := context.Background() + ids, err := s.client.SMembers(ctx, s.key("running")).Result() + if err != nil { + log.WithError(err).Error("redis running range failed") + return nil + } + s.mu.RLock() + res := make([]*TaskRunner, 0, len(ids)) + for _, idStr := range ids { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + if t := s.byID[id]; t != nil { + res = append(res, t) + } + } + s.mu.RUnlock() + return res +} + +func (s *RedisTaskStateStore) RunningCount() int { + ctx := context.Background() + n, err := s.client.SCard(ctx, s.key("running")).Result() + if err != nil { + log.WithError(err).Error("redis running count failed") + return 0 + } + return int(n) +} + +// Active-by-project operations +func (s *RedisTaskStateStore) AddActive(projectID int, task *TaskRunner) { + s.mu.Lock() + s.byID[task.Task.ID] = task + s.mu.Unlock() + ctx := context.Background() + if err := s.client.SAdd(ctx, s.key("active", strconv.Itoa(projectID)), task.Task.ID).Err(); err != nil { + log.WithError(err).Error("redis add active failed") + } + _ = s.client.HSet(ctx, s.key("task_project"), strconv.Itoa(task.Task.ID), strconv.Itoa(projectID)).Err() + s.publish(ctx, redisEvent{Type: "active_add", Data: mustJSON(taskRef{TaskID: task.Task.ID, ProjectID: projectID, RunnerID: task.RunnerID, Username: task.Username, Incoming: derefStr(task.IncomingVersion)})}) +} + +func (s *RedisTaskStateStore) RemoveActive(projectID int, taskID int) { + ctx := context.Background() + if err := s.client.SRem(ctx, s.key("active", strconv.Itoa(projectID)), taskID).Err(); err != nil { + log.WithError(err).Error("redis remove active failed") + } + s.publish(ctx, redisEvent{Type: "active_remove", Data: mustJSON(taskRef{TaskID: taskID, ProjectID: projectID})}) +} + +func (s *RedisTaskStateStore) GetActive(projectID int) []*TaskRunner { + ctx := context.Background() + ids, err := s.client.SMembers(ctx, s.key("active", strconv.Itoa(projectID))).Result() + if err != nil { + log.WithError(err).Error("redis get active failed") + return nil + } + s.mu.RLock() + res := make([]*TaskRunner, 0, len(ids)) + for _, idStr := range ids { + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + continue + } + if t := s.byID[id]; t != nil { + res = append(res, t) + } + } + s.mu.RUnlock() + return res +} + +func (s *RedisTaskStateStore) ActiveCount(projectID int) int { + ctx := context.Background() + n, err := s.client.SCard(ctx, s.key("active", strconv.Itoa(projectID))).Result() + if err != nil { + log.WithError(err).Error("redis active count failed") + return 0 + } + return int(n) 
+} + +// Alias operations +func (s *RedisTaskStateStore) SetAlias(alias string, task *TaskRunner) { + s.mu.Lock() + s.byAlias[alias] = task + s.byID[task.Task.ID] = task + s.mu.Unlock() + ctx := context.Background() + if err := s.client.HSet(ctx, s.key("aliases"), alias, task.Task.ID).Err(); err != nil { + log.WithError(err).Error("redis set alias failed") + } + s.publish(ctx, redisEvent{Type: "alias_set", Data: mustJSON(taskRef{TaskID: task.Task.ID, ProjectID: task.Task.ProjectID, Alias: alias, RunnerID: task.RunnerID, Username: task.Username, Incoming: derefStr(task.IncomingVersion)})}) +} + +func (s *RedisTaskStateStore) GetByAlias(alias string) *TaskRunner { + s.mu.RLock() + if t := s.byAlias[alias]; t != nil { + s.mu.RUnlock() + return t + } + s.mu.RUnlock() + ctx := context.Background() + idStr, err := s.client.HGet(ctx, s.key("aliases"), alias).Result() + if err != nil { + return nil + } + id, convErr := strconv.Atoi(idStr) + if convErr != nil { + return nil + } + s.mu.RLock() + t := s.byID[id] + s.mu.RUnlock() + return t +} + +func (s *RedisTaskStateStore) DeleteAlias(alias string) { + ctx := context.Background() + if err := s.client.HDel(ctx, s.key("aliases"), alias).Err(); err != nil { + log.WithError(err).Error("redis delete alias failed") + } + s.mu.Lock() + delete(s.byAlias, alias) + s.mu.Unlock() + s.publish(ctx, redisEvent{Type: "alias_delete", Data: mustJSON(taskRef{Alias: alias})}) +} + +func mustJSON(v any) json.RawMessage { + b, _ := json.Marshal(v) + return b +} + +func derefStr(p *string) string { + if p == nil { + return "" + } + return *p +} + +// UpdateRuntimeFields persists transient TaskRunner fields in Redis +func (s *RedisTaskStateStore) UpdateRuntimeFields(task *TaskRunner) { + ctx := context.Background() + // We store them in a hash keyed by task id for easy update + fields := map[string]interface{}{ + "runner_id": strconv.Itoa(task.RunnerID), + "username": task.Username, + "incoming_version": derefStr(task.IncomingVersion), + "alias": task.Alias, + "project_id": strconv.Itoa(task.Task.ProjectID), + } + if err := s.client.HSet(ctx, s.key("runtime", strconv.Itoa(task.Task.ID)), fields).Err(); err != nil { + log.WithError(err).Error("redis update runtime failed") + } +} + +// LoadRuntimeFields restores transient fields from Redis for a task +func (s *RedisTaskStateStore) LoadRuntimeFields(task *TaskRunner) { + ctx := context.Background() + m, err := s.client.HGetAll(ctx, s.key("runtime", strconv.Itoa(task.Task.ID))).Result() + if err != nil || len(m) == 0 { + return + } + if v := m["runner_id"]; v != "" { + if id, err := strconv.Atoi(v); err == nil { + task.RunnerID = id + } + } + if v := m["username"]; v != "" { + task.Username = v + } + if v := m["incoming_version"]; v != "" { + task.IncomingVersion = &v + } + if v := m["alias"]; v != "" { + task.Alias = v + } +} + +// TryClaim atomically tries to claim a task for execution using SET NX +func (s *RedisTaskStateStore) TryClaim(taskID int) bool { + ctx := context.Background() + key := s.key("claim", strconv.Itoa(taskID)) + ok, err := s.client.SetNX(ctx, key, "1", 0).Result() + if err != nil { + log.WithError(err).Error("redis try claim failed") + return false + } + return ok +} + +// DeleteClaim releases the execution claim for a task +func (s *RedisTaskStateStore) DeleteClaim(taskID int) { + ctx := context.Background() + if err := s.client.Del(ctx, s.key("claim", strconv.Itoa(taskID))).Err(); err != nil { + log.WithError(err).Error("redis delete claim failed") + } +} diff --git 
a/services/tasks/task_state_store.go b/services/tasks/task_state_store.go new file mode 100644 index 000000000..63aaf8e0f --- /dev/null +++ b/services/tasks/task_state_store.go @@ -0,0 +1,214 @@ +package tasks + +import "sync" + +// TaskRunnerHydrator constructs a TaskRunner for an existing task +// identified by taskID and projectID without starting it. +type TaskRunnerHydrator func(taskID int, projectID int) (*TaskRunner, error) + +// TaskStateStore defines pluggable storage for task pool state +type TaskStateStore interface { + // Start allows the store to initialize, restore its in-memory + // pointers from the underlying backend and start background + // sync listeners (e.g., Redis Pub/Sub). Implementations may no-op. + Start(hydrator TaskRunnerHydrator) error + + // Queue operations + Enqueue(task *TaskRunner) + DequeueAt(index int) error + QueueRange() []*TaskRunner + QueueGet(index int) *TaskRunner + QueueLen() int + + // Running tasks map operations + SetRunning(task *TaskRunner) + DeleteRunning(taskID int) + RunningRange() []*TaskRunner + RunningCount() int + + // Active-by-project operations + AddActive(projectID int, task *TaskRunner) + RemoveActive(projectID int, taskID int) + GetActive(projectID int) []*TaskRunner + ActiveCount(projectID int) int + + // Aliases operations + SetAlias(alias string, task *TaskRunner) + GetByAlias(alias string) *TaskRunner + DeleteAlias(alias string) + + // Distributed claim to ensure single runner starts a task + TryClaim(taskID int) bool + DeleteClaim(taskID int) + + // UpdateRuntimeFields persists transient fields of TaskRunner so + // they can be restored after restart in HA mode. + UpdateRuntimeFields(task *TaskRunner) + // LoadRuntimeFields fills runtime fields (RunnerID, Username, IncomingVersion, Alias) + // from the backend into the provided task. No-op if not supported. + LoadRuntimeFields(task *TaskRunner) +} + +// MemoryTaskStateStore is an in-memory implementation of TaskStateStore +type MemoryTaskStateStore struct { + mu sync.RWMutex + queue []*TaskRunner + running map[int]*TaskRunner + activeProj map[int]map[int]*TaskRunner // projectID -> taskID -> task + aliases map[string]*TaskRunner +} + +func NewMemoryTaskStateStore() *MemoryTaskStateStore { + return &MemoryTaskStateStore{ + queue: make([]*TaskRunner, 0), + running: make(map[int]*TaskRunner), + activeProj: make(map[int]map[int]*TaskRunner), + aliases: make(map[string]*TaskRunner), + } +} + +// Start is a no-op for the in-memory store +func (s *MemoryTaskStateStore) Start(_ TaskRunnerHydrator) error { return nil } + +// Claims always succeed in memory single-process mode +func (s *MemoryTaskStateStore) TryClaim(_ int) bool { return true } +func (s *MemoryTaskStateStore) DeleteClaim(_ int) {} +func (s *MemoryTaskStateStore) UpdateRuntimeFields(_ *TaskRunner) {} +func (s *MemoryTaskStateStore) LoadRuntimeFields(_ *TaskRunner) {} + +// Queue +func (s *MemoryTaskStateStore) Enqueue(task *TaskRunner) { + s.mu.Lock() + s.queue = append(s.queue, task) + s.mu.Unlock() +} + +func (s *MemoryTaskStateStore) DequeueAt(index int) error { + s.mu.Lock() + if index < 0 || index >= len(s.queue) { + s.mu.Unlock() + return nil + } + s.queue = append(s.queue[:index], s.queue[index+1:]...) 
+ s.mu.Unlock() + return nil +} + +func (s *MemoryTaskStateStore) QueueRange() []*TaskRunner { + s.mu.RLock() + out := make([]*TaskRunner, len(s.queue)) + copy(out, s.queue) + s.mu.RUnlock() + return out +} + +func (s *MemoryTaskStateStore) QueueGet(index int) *TaskRunner { + s.mu.RLock() + defer s.mu.RUnlock() + if index < 0 || index >= len(s.queue) { + return nil + } + return s.queue[index] +} + +func (s *MemoryTaskStateStore) QueueLen() int { + s.mu.RLock() + l := len(s.queue) + s.mu.RUnlock() + return l +} + +// Running +func (s *MemoryTaskStateStore) SetRunning(task *TaskRunner) { + s.mu.Lock() + s.running[task.Task.ID] = task + s.mu.Unlock() +} + +func (s *MemoryTaskStateStore) DeleteRunning(taskID int) { + s.mu.Lock() + delete(s.running, taskID) + s.mu.Unlock() +} + +func (s *MemoryTaskStateStore) RunningRange() []*TaskRunner { + s.mu.RLock() + res := make([]*TaskRunner, 0, len(s.running)) + for _, t := range s.running { + res = append(res, t) + } + s.mu.RUnlock() + return res +} + +func (s *MemoryTaskStateStore) RunningCount() int { + s.mu.RLock() + l := len(s.running) + s.mu.RUnlock() + return l +} + +// Active by project +func (s *MemoryTaskStateStore) AddActive(projectID int, task *TaskRunner) { + s.mu.Lock() + m, ok := s.activeProj[projectID] + if !ok { + m = make(map[int]*TaskRunner) + s.activeProj[projectID] = m + } + m[task.Task.ID] = task + s.mu.Unlock() +} + +func (s *MemoryTaskStateStore) RemoveActive(projectID int, taskID int) { + s.mu.Lock() + if s.activeProj[projectID] != nil { + delete(s.activeProj[projectID], taskID) + if len(s.activeProj[projectID]) == 0 { + delete(s.activeProj, projectID) + } + } + s.mu.Unlock() +} + +func (s *MemoryTaskStateStore) GetActive(projectID int) []*TaskRunner { + s.mu.RLock() + res := make([]*TaskRunner, 0) + if s.activeProj[projectID] != nil { + for _, t := range s.activeProj[projectID] { + res = append(res, t) + } + } + s.mu.RUnlock() + return res +} + +func (s *MemoryTaskStateStore) ActiveCount(projectID int) int { + s.mu.RLock() + l := 0 + if s.activeProj[projectID] != nil { + l = len(s.activeProj[projectID]) + } + s.mu.RUnlock() + return l +} + +// Aliases +func (s *MemoryTaskStateStore) SetAlias(alias string, task *TaskRunner) { + s.mu.Lock() + s.aliases[alias] = task + s.mu.Unlock() +} + +func (s *MemoryTaskStateStore) GetByAlias(alias string) *TaskRunner { + s.mu.RLock() + t := s.aliases[alias] + s.mu.RUnlock() + return t +} + +func (s *MemoryTaskStateStore) DeleteAlias(alias string) { + s.mu.Lock() + delete(s.aliases, alias) + s.mu.Unlock() +} diff --git a/util/config.go b/util/config.go index 4df6764d5..d12774a63 100644 --- a/util/config.go +++ b/util/config.go @@ -178,6 +178,20 @@ type DebuggingConfig struct { PprofDumpDir string `json:"pprof_dump_dir,omitempty" env:"SEMAPHORE_PPROF_DUMP_DIR"` } +type HARedisConfig struct { + Addr string `json:"addr,omitempty" env:"SEMAPHORE_HA_REDIS_ADDR"` + DB int `json:"db,omitempty" env:"SEMAPHORE_HA_REDIS_DB"` + Pass string `json:"pass,omitempty" env:"SEMAPHORE_HA_REDIS_PASS"` + User string `json:"user,omitempty" env:"SEMAPHORE_HA_REDIS_USER"` + TLS bool `json:"tls,omitempty" env:"SEMAPHORE_HA_REDIS_TLS"` + TLSSkipVerify bool `json:"tls_skip_verify,omitempty" env:"SEMAPHORE_HA_REDIS_TLS_SKIP_VERIFY"` +} + +type HAConfig struct { + Enabled bool `json:"enabled" env:"SEMAPHORE_HA_ENABLED"` + Redis *HARedisConfig `json:"redis,omitempty"` +} + type TeamInviteType string const ( @@ -304,6 +318,8 @@ type ConfigType struct { Schedule *ScheduleConfig `json:"schedule,omitempty"` Debugging 
*DebuggingConfig `json:"debugging,omitempty"`
+
+	HA *HAConfig `json:"ha,omitempty"`
 }
 
 func NewConfigType() *ConfigType {
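
Note on configuration: the new "ha" section added to util/config.go is optional. Below is a minimal config.json sketch using only the json tags introduced above; the addr value mirrors the 127.0.0.1:6379 fallback in NewRedisTaskStateStore, everything else is a placeholder:

{
  "ha": {
    "enabled": true,
    "redis": {
      "addr": "127.0.0.1:6379",
      "db": 0,
      "user": "",
      "pass": "",
      "tls": false,
      "tls_skip_verify": false
    }
  }
}

The same values can also be supplied through the SEMAPHORE_HA_ENABLED and SEMAPHORE_HA_REDIS_* environment variables declared in the env tags.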
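
The pro factory added above (pro/services/tasks/task_state_store_factory.go) returns the in-memory store unconditionally, so the Redis-backed store is not yet reachable from runService. A minimal sketch of how the selection could look once tied to the HA config, using only APIs introduced in this diff (illustrative, not the committed factory):

package tasks

import (
	"github.com/semaphoreui/semaphore/services/tasks"
	"github.com/semaphoreui/semaphore/util"
)

// NewTaskStateStore picks the task state backend: the Redis-backed store when
// HA is enabled and a Redis endpoint is configured, otherwise the
// single-process in-memory store.
func NewTaskStateStore() tasks.TaskStateStore {
	if util.Config.HA != nil && util.Config.HA.Enabled && util.Config.HA.Redis != nil {
		return tasks.NewRedisTaskStateStore() // reads util.Config.HA.Redis for addr, credentials and TLS
	}
	return tasks.NewMemoryTaskStateStore()
}

Both constructors satisfy TaskStateStore, so CreateTaskPool and the call site in cli/cmd/root.go are unchanged either way; only the value passed as the state argument differs.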