Skip to content

Commit 929e115

Browse files
fiftinCopilotCopilot
authored
feat(runners): flag all runners busy (#3519)
* feat(runners): flag all runners busy * Initial plan * Update services/tasks/TaskRunner.go Co-authored-by: Copilot <[email protected]> * refactor: use sentinel error for all runners busy check Co-authored-by: fiftin <[email protected]> * Use sentinel error for all runners busy condition Co-authored-by: fiftin <[email protected]> * feat(runners): requene flag * chore: remove extra check --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: fiftin <[email protected]>
1 parent 0f191db commit 929e115

File tree

6 files changed

+164
-11
lines changed

6 files changed

+164
-11
lines changed

pro/go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ require (
3838
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
3939
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
4040
github.com/sirupsen/logrus v1.9.3 // indirect
41+
github.com/skeema/knownhosts v1.3.1 // indirect
42+
github.com/xanzy/ssh-agent v0.3.3 // indirect
4143
golang.org/x/crypto v0.45.0 // indirect
44+
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
45+
golang.org/x/net v0.47.0 // indirect
4246
golang.org/x/sys v0.38.0 // indirect
4347
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
4448
gopkg.in/warnings.v0 v0.1.2 // indirect

pro/go.sum

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,29 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
111111
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
112112
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
113113
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
114-
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
115-
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
116-
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
117-
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
118-
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
114+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
115+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
116+
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
117+
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
118+
go.etcd.io/bbolt v1.4.1 h1:5mOV+HWjIPLEAlUGMsveaUvK2+byZMFOzojoi7bh7uI=
119+
go.etcd.io/bbolt v1.4.1/go.mod h1:c8zu2BnXWTu2XM4XcICtbGSl9cFwsXtcf9zLt2OncM8=
120+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
121+
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
119122
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
123+
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
124+
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
125+
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
126+
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
127+
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
128+
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
129+
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
130+
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
131+
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
132+
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
133+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
134+
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
135+
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
136+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
120137
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
121138
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
122139
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=

services/tasks/RemoteJob.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tasks
33
import (
44
"bytes"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"net/http"
89
"time"
@@ -15,6 +16,9 @@ import (
1516
"github.com/semaphoreui/semaphore/util"
1617
)
1718

19+
// ErrAllRunnersBusy is returned when all available runners are busy
20+
var ErrAllRunnersBusy = errors.New("all runners busy")
21+
1822
type RemoteJob struct {
1923
RunnerTag *string
2024
Task db.Task
@@ -72,7 +76,7 @@ func callRunnerWebhook(runner *db.Runner, tsk *TaskRunner, action string) (err e
7276
if resp != nil {
7377
defer resp.Body.Close() //nolint:errcheck
7478
}
75-
79+
7680
if resp.StatusCode != 200 && resp.StatusCode != 204 {
7781
err = fmt.Errorf("webhook returned incorrect status")
7882
return
@@ -136,7 +140,7 @@ func (t *RemoteJob) Run(username string, incomingVersion *string, alias string)
136140
}
137141

138142
if runner == nil {
139-
err = fmt.Errorf("no runners available")
143+
err = ErrAllRunnersBusy
140144
return
141145
}
142146

services/tasks/TaskPool.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ type logRecord struct {
3131
type EventType uint
3232

3333
const (
34-
EventTypeNew EventType = 0
35-
EventTypeFinished EventType = 1
36-
EventTypeFailed EventType = 2
37-
EventTypeEmpty EventType = 3
34+
EventTypeNew EventType = 0 // EventTypeNew represents an event when a new task is created, typically sent during a periodic check or timer.
35+
EventTypeFinished EventType = 1 // EventTypeFinished represents an event when a task finishes, typically sent during a periodic check or timer.
36+
EventTypeFailed EventType = 2 // EventTypeFailed represents an event when a task fails, typically sent during a periodic check or timer.
37+
EventTypeEmpty EventType = 3 // EventTypeEmpty represents an event when the queue is empty, typically sent during a periodic check or timer.
38+
EventTypeRequeued EventType = 4 // EventTypeRequeued represents an event when a task is moved back to the waiting state for reprocessing.
3839
)
3940

4041
const (
@@ -190,7 +191,21 @@ func getTaskName(t *TaskRunner) string {
190191

191192
func (p *TaskPool) handleQueue() {
192193
for t := range p.queueEvents {
194+
// When a task is re-queued (e.g., no remote runner available), we should
195+
// clean up its "running" bookkeeping but avoid immediately retrying it in
196+
// the same queue pass to prevent hot retry loops.
197+
skipTaskID := 0
198+
193199
switch t.eventType {
200+
case EventTypeRequeued:
201+
// Task was started but moved back to waiting. It must not remain in
202+
// running/active sets and must release its claim so it can be picked
203+
// up again later.
204+
p.onTaskStop(t.task)
205+
// Avoid immediate retry in this same event handling iteration; it
206+
// will be retried on the next periodic tick or when another event
207+
// triggers queue processing.
208+
skipTaskID = t.task.Task.ID
194209
case EventTypeNew:
195210
p.state.Enqueue(t.task)
196211
case EventTypeFinished:
@@ -209,6 +224,12 @@ func (p *TaskPool) handleQueue() {
209224
continue
210225
}
211226

227+
// When handling a requeue event, don't immediately start the same task again.
228+
if skipTaskID != 0 && curr.Task.ID == skipTaskID {
229+
i = i + 1
230+
continue
231+
}
232+
212233
if curr.Task.Status == task_logger.TaskFailStatus {
213234
//delete failed TaskRunner from queue
214235
_ = p.state.DequeueAt(i)

services/tasks/TaskPool_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package tasks
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/semaphoreui/semaphore/db"
8+
"github.com/semaphoreui/semaphore/db/bolt"
9+
"github.com/semaphoreui/semaphore/pkg/task_logger"
10+
"github.com/semaphoreui/semaphore/util"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
type spyTaskStateStore struct {
15+
*MemoryTaskStateStore
16+
tryClaimCalls int
17+
}
18+
19+
func newSpyTaskStateStore() *spyTaskStateStore {
20+
return &spyTaskStateStore{
21+
MemoryTaskStateStore: NewMemoryTaskStateStore(),
22+
}
23+
}
24+
25+
// TryClaim returns false to ensure tests don't actually start tasks; we only want to
26+
// observe whether the queue loop attempted to claim a task.
27+
func (s *spyTaskStateStore) TryClaim(_ int) bool {
28+
s.tryClaimCalls++
29+
return false
30+
}
31+
32+
func TestTaskPool_RequeuedEventCleansRunningStateAndSkipsImmediateRetry(t *testing.T) {
33+
// Ensure util.Config is non-nil for p.blocks() checks.
34+
prevCfg := util.Config
35+
t.Cleanup(func() { util.Config = prevCfg })
36+
util.Config = &util.ConfigType{MaxParallelTasks: 0}
37+
38+
store := bolt.CreateTestStore()
39+
proj, err := store.CreateProject(db.Project{})
40+
assert.NoError(t, err)
41+
42+
state := newSpyTaskStateStore()
43+
44+
pool := TaskPool{
45+
queueEvents: make(chan PoolEvent),
46+
state: state,
47+
store: store,
48+
}
49+
50+
tr := &TaskRunner{
51+
Task: db.Task{
52+
ID: 42,
53+
ProjectID: proj.ID,
54+
TemplateID: 7,
55+
Status: task_logger.TaskWaitingStatus,
56+
},
57+
Template: db.Template{
58+
ID: 7,
59+
Name: "Test Template",
60+
},
61+
Alias: "alias-42",
62+
}
63+
64+
// Simulate a task that was marked as running and then re-queued (the state that
65+
// exists right before EventTypeRequeued is handled).
66+
state.SetRunning(tr)
67+
state.AddActive(tr.Task.ProjectID, tr)
68+
state.SetAlias(tr.Alias, tr)
69+
state.Enqueue(tr)
70+
71+
var wg sync.WaitGroup
72+
wg.Add(1)
73+
go func() {
74+
defer wg.Done()
75+
pool.handleQueue()
76+
}()
77+
78+
pool.queueEvents <- PoolEvent{EventTypeRequeued, tr}
79+
close(pool.queueEvents)
80+
wg.Wait()
81+
82+
assert.Equal(t, 0, state.RunningCount(), "requeued task must be removed from running set")
83+
assert.Equal(t, 0, state.ActiveCount(tr.Task.ProjectID), "requeued task must be removed from active-by-project set")
84+
assert.Nil(t, state.GetByAlias(tr.Alias), "requeued task alias mapping must be cleared")
85+
assert.Equal(t, 1, state.QueueLen(), "requeued task must remain queued")
86+
assert.Equal(t, 0, state.tryClaimCalls, "requeued task should not be immediately retried in the same queue pass")
87+
}
88+
89+

services/tasks/TaskRunner.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,17 @@ func (t *TaskRunner) run() {
171171
defer t.pool.store.Close("run task " + strconv.Itoa(t.Task.ID))
172172
}
173173

174+
// requeued indicates task should go back to waiting state (e.g., all runners busy)
175+
requeued := false
176+
174177
defer func() {
178+
if requeued {
179+
// Task is being re-queued, don't mark as finished
180+
log.Info("Task " + strconv.Itoa(t.Task.ID) + " re-queued (waiting for available runner)")
181+
t.pool.queueEvents <- PoolEvent{EventTypeRequeued, t}
182+
return
183+
}
184+
175185
log.Info("Stopped running TaskRunner " + strconv.Itoa(t.Task.ID))
176186
log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.Task.ID))
177187

@@ -214,6 +224,14 @@ func (t *TaskRunner) run() {
214224
err = t.job.Run(username, incomingVersion, t.Alias)
215225

216226
if err != nil {
227+
if errors.Is(err, ErrAllRunnersBusy) {
228+
// No runners available right now, put task back in waiting state
229+
t.SetStatus(task_logger.TaskWaitingStatus)
230+
t.pool.state.Enqueue(t)
231+
requeued = true
232+
return
233+
}
234+
217235
if t.job.IsKilled() {
218236
t.SetStatus(task_logger.TaskStoppedStatus)
219237
} else {

0 commit comments

Comments
 (0)