Skip to content

Commit 0ecefb0

Browse files
committed
feat: Add max-parallel implementation inside the GItea server
1 parent f9d3983 commit 0ecefb0

File tree

18 files changed

+939
-3
lines changed

18 files changed

+939
-3
lines changed

models/actions/run_job.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ type ActionRunJob struct {
5151
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
5252
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
5353

54+
// Matrix job support
55+
MatrixID string `xorm:"VARCHAR(255) index"` // Unique identifier for matrix combination (e.g., "os:ubuntu,node:16")
56+
MaxParallel int // Max parallel jobs from strategy.max-parallel (0 = unlimited)
57+
5458
Started timeutil.TimeStamp
5559
Stopped timeutil.TimeStamp
5660
Created timeutil.TimeStamp `xorm:"created"`
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright 2026 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"testing"
8+
9+
"code.gitea.io/gitea/models/db"
10+
"code.gitea.io/gitea/models/unittest"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestActionRunJob_MaxParallel(t *testing.T) {
16+
assert.NoError(t, unittest.PrepareTestDatabase())
17+
ctx := db.DefaultContext
18+
19+
t.Run("NoMaxParallel", func(t *testing.T) {
20+
job := &ActionRunJob{
21+
RunID: 1,
22+
RepoID: 1,
23+
OwnerID: 1,
24+
JobID: "test-job-1",
25+
Name: "Test Job",
26+
Status: StatusWaiting,
27+
MaxParallel: 0, // No limit
28+
}
29+
assert.NoError(t, db.Insert(ctx, job))
30+
31+
retrieved, err := GetRunJobByID(ctx, job.ID)
32+
assert.NoError(t, err)
33+
assert.Equal(t, 0, retrieved.MaxParallel)
34+
})
35+
36+
t.Run("WithMaxParallel", func(t *testing.T) {
37+
job := &ActionRunJob{
38+
RunID: 1,
39+
RepoID: 1,
40+
OwnerID: 1,
41+
JobID: "test-job-2",
42+
Name: "Matrix Job",
43+
Status: StatusWaiting,
44+
MaxParallel: 3,
45+
}
46+
assert.NoError(t, db.Insert(ctx, job))
47+
48+
retrieved, err := GetRunJobByID(ctx, job.ID)
49+
assert.NoError(t, err)
50+
assert.Equal(t, 3, retrieved.MaxParallel)
51+
})
52+
53+
t.Run("MatrixID", func(t *testing.T) {
54+
job := &ActionRunJob{
55+
RunID: 1,
56+
RepoID: 1,
57+
OwnerID: 1,
58+
JobID: "test-job-3",
59+
Name: "Matrix Job with ID",
60+
Status: StatusWaiting,
61+
MaxParallel: 2,
62+
MatrixID: "os:ubuntu,node:16",
63+
}
64+
assert.NoError(t, db.Insert(ctx, job))
65+
66+
retrieved, err := GetRunJobByID(ctx, job.ID)
67+
assert.NoError(t, err)
68+
assert.Equal(t, 2, retrieved.MaxParallel)
69+
assert.Equal(t, "os:ubuntu,node:16", retrieved.MatrixID)
70+
})
71+
72+
t.Run("UpdateMaxParallel", func(t *testing.T) {
73+
job := &ActionRunJob{
74+
RunID: 1,
75+
RepoID: 1,
76+
OwnerID: 1,
77+
JobID: "test-job-4",
78+
Name: "Updatable Job",
79+
Status: StatusWaiting,
80+
MaxParallel: 5,
81+
}
82+
assert.NoError(t, db.Insert(ctx, job))
83+
84+
// Update max parallel
85+
job.MaxParallel = 10
86+
_, err := UpdateRunJob(ctx, job, nil, "max_parallel")
87+
assert.NoError(t, err)
88+
89+
retrieved, err := GetRunJobByID(ctx, job.ID)
90+
assert.NoError(t, err)
91+
assert.Equal(t, 10, retrieved.MaxParallel)
92+
})
93+
}
94+
95+
func TestActionRunJob_MaxParallelEnforcement(t *testing.T) {
96+
assert.NoError(t, unittest.PrepareTestDatabase())
97+
ctx := db.DefaultContext
98+
99+
t.Run("EnforceMaxParallel", func(t *testing.T) {
100+
runID := int64(5000)
101+
jobID := "parallel-enforced-job"
102+
maxParallel := 2
103+
104+
// Create jobs simulating matrix execution
105+
jobs := []*ActionRunJob{
106+
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel, MatrixID: "version:1"},
107+
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel, MatrixID: "version:2"},
108+
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel, MatrixID: "version:3"},
109+
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel, MatrixID: "version:4"},
110+
}
111+
112+
for _, job := range jobs {
113+
assert.NoError(t, db.Insert(ctx, job))
114+
}
115+
116+
// Verify running count
117+
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
118+
assert.NoError(t, err)
119+
assert.Equal(t, maxParallel, runningCount, "Should have exactly max-parallel jobs running")
120+
121+
// Simulate job completion
122+
jobs[0].Status = StatusSuccess
123+
_, err = UpdateRunJob(ctx, jobs[0], nil, "status")
124+
assert.NoError(t, err)
125+
126+
// Now running count should be 1
127+
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
128+
assert.NoError(t, err)
129+
assert.Equal(t, 1, runningCount)
130+
131+
// Simulate next job starting
132+
jobs[2].Status = StatusRunning
133+
_, err = UpdateRunJob(ctx, jobs[2], nil, "status")
134+
assert.NoError(t, err)
135+
136+
// Back to max-parallel
137+
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
138+
assert.NoError(t, err)
139+
assert.Equal(t, maxParallel, runningCount)
140+
})
141+
}

models/actions/runner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ type ActionRunner struct {
6262
AgentLabels []string `xorm:"TEXT"`
6363
// Store if this is a runner that only ever get one single job assigned
6464
Ephemeral bool `xorm:"ephemeral NOT NULL DEFAULT false"`
65+
// Maximum number of parallel tasks this runner can execute (0 = unlimited, defaults to 1)
66+
Capacity int `xorm:"NOT NULL DEFAULT 1"`
6567

6668
Created timeutil.TimeStamp `xorm:"created"`
6769
Updated timeutil.TimeStamp `xorm:"updated"`
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2026 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"testing"
8+
9+
"code.gitea.io/gitea/models/db"
10+
"code.gitea.io/gitea/models/unittest"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestActionRunner_Capacity(t *testing.T) {
16+
assert.NoError(t, unittest.PrepareTestDatabase())
17+
ctx := db.DefaultContext
18+
19+
t.Run("DefaultCapacity", func(t *testing.T) {
20+
runner := &ActionRunner{
21+
UUID: "test-uuid-1",
22+
Name: "test-runner",
23+
OwnerID: 0,
24+
RepoID: 0,
25+
}
26+
assert.NoError(t, db.Insert(ctx, runner))
27+
28+
// Default capacity should be 1
29+
assert.Equal(t, 1, runner.Capacity)
30+
31+
// Verify in database
32+
retrieved, err := GetRunnerByID(ctx, runner.ID)
33+
assert.NoError(t, err)
34+
assert.Equal(t, 1, retrieved.Capacity)
35+
})
36+
37+
t.Run("CustomCapacity", func(t *testing.T) {
38+
runner := &ActionRunner{
39+
UUID: "test-uuid-2",
40+
Name: "test-runner-2",
41+
OwnerID: 0,
42+
RepoID: 0,
43+
Capacity: 5,
44+
}
45+
assert.NoError(t, db.Insert(ctx, runner))
46+
47+
assert.Equal(t, 5, runner.Capacity)
48+
49+
// Verify in database
50+
retrieved, err := GetRunnerByID(ctx, runner.ID)
51+
assert.NoError(t, err)
52+
assert.Equal(t, 5, retrieved.Capacity)
53+
})
54+
55+
t.Run("UpdateCapacity", func(t *testing.T) {
56+
runner := &ActionRunner{
57+
UUID: "test-uuid-3",
58+
Name: "test-runner-3",
59+
OwnerID: 0,
60+
RepoID: 0,
61+
Capacity: 1,
62+
}
63+
assert.NoError(t, db.Insert(ctx, runner))
64+
65+
// Update capacity
66+
runner.Capacity = 10
67+
assert.NoError(t, UpdateRunner(ctx, runner, "capacity"))
68+
69+
// Verify update
70+
retrieved, err := GetRunnerByID(ctx, runner.ID)
71+
assert.NoError(t, err)
72+
assert.Equal(t, 10, retrieved.Capacity)
73+
})
74+
75+
t.Run("ZeroCapacity", func(t *testing.T) {
76+
runner := &ActionRunner{
77+
UUID: "test-uuid-4",
78+
Name: "test-runner-4",
79+
OwnerID: 0,
80+
RepoID: 0,
81+
Capacity: 0, // Unlimited
82+
}
83+
assert.NoError(t, db.Insert(ctx, runner))
84+
85+
assert.Equal(t, 0, runner.Capacity)
86+
87+
retrieved, err := GetRunnerByID(ctx, runner.ID)
88+
assert.NoError(t, err)
89+
assert.Equal(t, 0, retrieved.Capacity)
90+
})
91+
}

models/actions/task.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,26 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
244244
var job *ActionRunJob
245245
log.Trace("runner labels: %v", runner.AgentLabels)
246246
for _, v := range jobs {
247-
if runner.CanMatchLabels(v.RunsOn) {
248-
job = v
249-
break
247+
if !runner.CanMatchLabels(v.RunsOn) {
248+
continue
250249
}
250+
251+
// Check max-parallel constraint for matrix jobs
252+
if v.MaxParallel > 0 {
253+
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, v.RunID, v.JobID)
254+
if err != nil {
255+
log.Error("Failed to count running jobs for max-parallel check: %v", err)
256+
continue
257+
}
258+
if runningCount >= v.MaxParallel {
259+
log.Debug("Job %s (run %d) skipped: %d/%d jobs already running (max-parallel)",
260+
v.JobID, v.RunID, runningCount, v.MaxParallel)
261+
continue
262+
}
263+
}
264+
265+
job = v
266+
break
251267
}
252268
if job == nil {
253269
return nil, false, nil
@@ -505,3 +521,23 @@ func getTaskIDFromCache(token string) int64 {
505521
}
506522
return t
507523
}
524+
525+
// CountRunningTasksByRunner counts the number of running tasks assigned to a specific runner
526+
func CountRunningTasksByRunner(ctx context.Context, runnerID int64) (int, error) {
527+
count, err := db.GetEngine(ctx).
528+
Where("runner_id = ?", runnerID).
529+
And("status = ?", StatusRunning).
530+
Count(new(ActionTask))
531+
return int(count), err
532+
}
533+
534+
// CountRunningJobsByWorkflowAndRun counts running jobs for a specific workflow/run combo
535+
// Used to enforce max-parallel limits on matrix jobs
536+
func CountRunningJobsByWorkflowAndRun(ctx context.Context, runID int64, jobID string) (int, error) {
537+
count, err := db.GetEngine(ctx).
538+
Where("run_id = ?", runID).
539+
And("job_id = ?", jobID).
540+
And("status = ?", StatusRunning).
541+
Count(new(ActionRunJob))
542+
return int(count), err
543+
}

0 commit comments

Comments
 (0)