Skip to content

Commit be26c58

Browse files
authored
ah2 (#3191)
* feat(tasks): memory storage impl * feat: redis impl * feat(tasks): redis impl * feat: add ha config * feat(ha): redis pub sub * feat: store to redis * fix(ha): restore runner id * fix(tasks): task not found * refactor(ha): move impl to pro dir * test(unit): fix tests
1 parent 3dfa389 commit be26c58

File tree

13 files changed

+1075
-66
lines changed

13 files changed

+1075
-66
lines changed

api/tasks/tasks.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package tasks
22

33
import (
4-
"github.com/semaphoreui/semaphore/pkg/task_logger"
54
"net/http"
65

6+
"github.com/semaphoreui/semaphore/pkg/task_logger"
7+
78
"github.com/semaphoreui/semaphore/api/helpers"
89
"github.com/semaphoreui/semaphore/db"
910
task2 "github.com/semaphoreui/semaphore/services/tasks"
@@ -44,7 +45,7 @@ func GetTasks(w http.ResponseWriter, r *http.Request) {
4445

4546
res := []taskRes{}
4647

47-
for _, task := range pool.Queue {
48+
for _, task := range pool.GetQueuedTasks() {
4849
res = append(res, taskRes{
4950
TaskID: task.Task.ID,
5051
ProjectID: task.Task.ProjectID,
@@ -55,7 +56,7 @@ func GetTasks(w http.ResponseWriter, r *http.Request) {
5556
})
5657
}
5758

58-
for _, task := range pool.RunningTasks {
59+
for _, task := range pool.GetRunningTasks() {
5960
res = append(res, taskRes{
6061
TaskID: task.Task.ID,
6162
ProjectID: task.Task.ProjectID,
@@ -77,15 +78,15 @@ func DeleteTask(w http.ResponseWriter, r *http.Request) {
7778

7879
var task *db.Task
7980

80-
for _, t := range pool.Queue {
81+
for _, t := range pool.GetQueuedTasks() {
8182
if t.Task.ID == taskID {
8283
task = &t.Task
8384
break
8485
}
8586
}
8687

8788
if task == nil {
88-
for _, t := range pool.RunningTasks {
89+
for _, t := range pool.GetRunningTasks() {
8990
if t.Task.ID == taskID {
9091
task = &t.Task
9192
break

cli/cmd/root.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/semaphoreui/semaphore/db/factory"
1818
proFactory "github.com/semaphoreui/semaphore/pro/db/factory"
1919
proServer "github.com/semaphoreui/semaphore/pro/services/server"
20+
proTasks "github.com/semaphoreui/semaphore/pro/services/tasks"
2021
"github.com/semaphoreui/semaphore/services/schedules"
2122
"github.com/semaphoreui/semaphore/services/tasks"
2223
"github.com/semaphoreui/semaphore/util"
@@ -72,6 +73,7 @@ func Execute() {
7273

7374
func runService() {
7475
store := createStore("root")
76+
state := proTasks.NewTaskStateStore()
7577
terraformStore := proFactory.NewTerraformStore(store)
7678
ansibleTaskRepo := proFactory.NewAnsibleTaskRepository(store)
7779

@@ -93,6 +95,7 @@ func runService() {
9395

9496
taskPool := tasks.CreateTaskPool(
9597
store,
98+
state,
9699
ansibleTaskRepo,
97100
inventoryService,
98101
encryptionService,

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/lib/pq v1.10.9
1919
github.com/mdp/qrterminal/v3 v3.2.1
2020
github.com/pquerna/otp v1.4.0
21+
github.com/redis/go-redis/v9 v9.7.0
2122
github.com/robfig/cron/v3 v3.0.1
2223
github.com/semaphoreui/semaphore/pro v0.0.0
2324
github.com/sirupsen/logrus v1.9.3
@@ -41,9 +42,11 @@ require (
4142
github.com/Microsoft/go-winio v0.6.2 // indirect
4243
github.com/ProtonMail/go-crypto v1.1.6 // indirect
4344
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
45+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
4446
github.com/cloudflare/circl v1.6.1 // indirect
4547
github.com/cyphar/filepath-securejoin v0.4.1 // indirect
4648
github.com/davecgh/go-spew v1.1.1 // indirect
49+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
4750
github.com/dustin/go-humanize v1.0.1 // indirect
4851
github.com/emirpasic/gods v1.18.1 // indirect
4952
github.com/felixge/httpsnoop v1.0.4 // indirect

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
1919
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
2020
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI=
2121
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
22+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
23+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
24+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
25+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
26+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
27+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
2228
github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0=
2329
github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs=
2430
github.com/coreos/go-oidc/v3 v3.14.1 h1:9ePWwfdwC4QKRlCXsJGou56adA/owXczOzwKdOumLqk=
@@ -31,6 +37,8 @@ github.com/cyphar/filepath-securejoin v0.4.1/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGL
3137
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3238
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3339
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
40+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
41+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
3442
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
3543
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
3644
github.com/elazarl/goproxy v1.7.2 h1:Y2o6urb7Eule09PjlhQRGNsqRfPmYI3KKQLFpCAV3+o=
@@ -135,6 +143,8 @@ github.com/poy/onpar v1.1.2 h1:QaNrNiZx0+Nar5dLgTVp5mXkyoVFIbepjyEoGSnhbAY=
135143
github.com/poy/onpar v1.1.2/go.mod h1:6X8FLNoxyr9kkmnlqpK6LSoiOtrO6MICtWwEuWkLjzg=
136144
github.com/pquerna/otp v1.4.0 h1:wZvl1TIVxKRThZIBiwOOHOGP/1+nZyWBil9Y2XNEDzg=
137145
github.com/pquerna/otp v1.4.0/go.mod h1:dkJfzwRKNiegxyNb54X/3fLwhCynbMspSyWKnvi1AEg=
146+
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
147+
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
138148
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
139149
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
140150
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package tasks
2+
3+
import (
4+
"github.com/semaphoreui/semaphore/services/tasks"
5+
)
6+
7+
func NewTaskStateStore() tasks.TaskStateStore {
8+
return tasks.NewMemoryTaskStateStore()
9+
}

services/tasks/RemoteJob.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"bytes"
55
"encoding/json"
66
"fmt"
7-
"github.com/semaphoreui/semaphore/pkg/tz"
8-
log "github.com/sirupsen/logrus"
97
"net/http"
108
"time"
119

10+
"github.com/semaphoreui/semaphore/pkg/tz"
11+
log "github.com/sirupsen/logrus"
12+
1213
"github.com/semaphoreui/semaphore/db"
1314
"github.com/semaphoreui/semaphore/pkg/task_logger"
1415
"github.com/semaphoreui/semaphore/util"
@@ -93,6 +94,9 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string)
9394
tsk.IncomingVersion = incomingVersion
9495
tsk.Username = username
9596
tsk.Alias = alias
97+
if t.taskPool != nil && t.taskPool.state != nil {
98+
t.taskPool.state.UpdateRuntimeFields(tsk)
99+
}
96100

97101
var runners []db.Runner
98102
db.StoreSession(t.taskPool.store, "run remote job", func() {
@@ -141,6 +145,9 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string)
141145
}
142146

143147
tsk.RunnerID = runner.ID
148+
if t.taskPool != nil && t.taskPool.state != nil {
149+
t.taskPool.state.UpdateRuntimeFields(tsk)
150+
}
144151

145152
startTime := tz.Now()
146153

@@ -154,6 +161,12 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string)
154161

155162
time.Sleep(1_000_000_000)
156163
tsk = t.taskPool.GetTask(t.Task.ID)
164+
165+
if tsk == nil {
166+
err = fmt.Errorf("task %d not found", t.Task.ID)
167+
return
168+
}
169+
157170
if tsk.Task.Status == task_logger.TaskSuccessStatus ||
158171
tsk.Task.Status == task_logger.TaskStoppedStatus ||
159172
tsk.Task.Status == task_logger.TaskFailStatus {

0 commit comments

Comments
 (0)