Commit a68fb3f

fix: process pool concurrency (#1358)

* remove telemetry from bindings
* optimise process pool
* optimise test cases
* fix test cases
* revert telemetry related changes

1 parent ec87b76 · commit a68fb3f

File tree: 7 files changed, +121 −90 lines changed


foundation/application.go
Lines changed: 2 additions & 0 deletions

@@ -128,6 +128,8 @@ func (r *Application) Context() context.Context {
 	return r.ctx
 }
 
+// GetJson get the JSON implementation.
+// DEPRECATED, use Json instead.
 func (r *Application) GetJson() foundation.Json {
 	return r.json
 }
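
For downstream callers, this change is a rename only. A minimal sketch of the migration, assuming the replacement Json method has the signature func() foundation.Json that the deprecation notice implies; the anonymous interface stands in for the app container and is illustrative, not part of this diff:

package example

import "github.com/goravel/framework/contracts/foundation"

// jsonOf shows the intended migration: prefer Json over the deprecated
// GetJson. The parameter type is a hypothetical stand-in for the app.
func jsonOf(app interface{ Json() foundation.Json }) foundation.Json {
	// before: return app.GetJson()
	return app.Json()
}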

process/pool.go
Lines changed: 41 additions & 51 deletions

@@ -79,28 +79,26 @@ func (r *PoolBuilder) WithSpinner(message ...string) contractsprocess.PoolBuilder {
 	return r
 }
 
-type job struct {
-	id      int
-	command *PoolCommand
-}
-
 type result struct {
 	key string
 	res contractsprocess.Result
 }
 
-// start initiates the execution of all configured commands concurrently but does not wait for them to complete.
-// It orchestrates a pool of worker goroutines to process commands up to the specified concurrency limit.
+// start initiates the execution of all configured commands concurrently based on the
+// concurrency limit, but does not wait for them to complete.
 //
-// This method is non-blocking. It returns a RunningPool instance immediately, which can be used to
-// wait for the completion of all processes and retrieve their results.
+// This method is non-blocking. It returns a RunningPool instance immediately, which
+// can be used to wait for the completion of all processes and retrieve their results.
 //
 // The core concurrency pattern is as follows:
 //  1. A job channel (`jobCh`) distributes commands to a pool of worker goroutines.
-//  2. A result channel (`resultCh`) collects the outcome of each command from a dedicated waiter goroutine.
-//  3. A separate "collector" goroutine safely populates the final results map from the result channel.
-//  4. WaitGroups synchronize the completion of all workers and the collection of all results
-//     before the entire operation is marked as "done".
+//  2. Workers pick up a job, start the process, and wait synchronously for it to finish.
+//     This synchronous wait ensures the concurrency limit is strictly respected.
+//  3. A result channel (`resultCh`) collects the outcome (success/failure) of each command.
+//  4. A separate "collector" goroutine safely populates the RunningPool's internal map
+//     from the result channel to avoid concurrent map write panics.
+//  5. A background orchestrator waits for all workers and results to finish, then
+//     cleanly closes resources and signals the `done` channel.
 func (r *PoolBuilder) start(configurer func(contractsprocess.Pool)) (contractsprocess.RunningPool, error) {
 	if configurer == nil {
 		return nil, errors.ProcessPoolNilConfigurer

@@ -118,25 +116,28 @@ func (r *PoolBuilder) start(configurer func(contractsprocess.Pool)) (contractsprocess.RunningPool, error) {
 	var cancel context.CancelFunc
 	if r.timeout > 0 {
 		ctx, cancel = context.WithTimeout(ctx, r.timeout)
+	} else {
+		ctx, cancel = context.WithCancel(ctx)
 	}
 
-	concurrency := r.concurrency
-	if concurrency <= 0 || concurrency > len(commands) {
-		concurrency = len(commands)
-	}
-
-	jobCh := make(chan job, len(commands))
+	jobCh := make(chan *PoolCommand, len(commands))
 	resultCh := make(chan result, len(commands))
 	done := make(chan struct{})
 
-	results := make(map[string]contractsprocess.Result, len(commands))
-	runningProcesses := make([]contractsprocess.Running, len(commands))
 	keys := make([]string, len(commands))
+	for i, cmd := range commands {
+		keys[i] = cmd.key
+	}
+
+	runningPool := NewRunningPool(ctx, cancel, keys, done, r.loading, r.loadingMessage)
+
+	concurrency := r.concurrency
+	if concurrency <= 0 || concurrency > len(commands) {
+		concurrency = len(commands)
+	}
 
 	var resultsWg sync.WaitGroup
 	var workersWg sync.WaitGroup
-	var startsWg sync.WaitGroup
-	var mu sync.Mutex
 
 	// The results collector goroutine centralizes writing to the results map
 	// to avoid race conditions, as map writes are not concurrent-safe.

@@ -145,9 +146,7 @@ func (r *PoolBuilder) start(configurer func(contractsprocess.Pool)) (contractsprocess.RunningPool, error) {
 	go func() {
 		for i := 0; i < len(commands); i++ {
 			rc := <-resultCh
-			mu.Lock()
-			results[rc.key] = rc.res
-			mu.Unlock()
+			runningPool.setResult(rc.key, rc.res)
 			resultsWg.Done()
 		}
 	}()

@@ -156,8 +155,16 @@ func (r *PoolBuilder) start(configurer func(contractsprocess.Pool)) (contractsprocess.RunningPool, error) {
 		workersWg.Add(1)
 		go func() {
 			defer workersWg.Done()
-			for currentJob := range jobCh {
-				command := currentJob.command
+			for command := range jobCh {
+				if ctx.Err() != nil {
+					// If the pool was stopped (Stop() called or timeout reached), we skip execution.
+					// We must still send a result to ensure resultsWg decrements correctly.
+					resultCh <- result{
+						key: command.key,
+						res: NewResult(ctx.Err(), -1, "", "", ""),
+					}
+					continue
+				}
 				cmdCtx := command.ctx
 				if cmdCtx == nil {
 					cmdCtx = ctx

@@ -184,36 +191,19 @@ func (r *PoolBuilder) start(configurer func(contractsprocess.Pool)) (contractsprocess.RunningPool, error) {
 				if err != nil {
 					resultCh <- result{key: command.key, res: NewResult(err, -1, command.name, "", "")}
 				} else {
-					mu.Lock()
-					runningProcesses[currentJob.id] = run
-					mu.Unlock()
-
-					// Launch a dedicated goroutine to wait for the process to finish.
-					// This prevents the worker from being blocked by a long-running process
-					// and allows it to immediately pick up the next job from jobCh.
-					go func(p contractsprocess.Running, k string) {
-						res := p.Wait()
-						resultCh <- result{key: k, res: res}
-					}(run, command.key)
+					runningPool.setProcess(command.key, run)
+					res := run.Wait()
+					resultCh <- result{key: command.key, res: res}
 				}
-
-				// Signal that this process has completed its start attempt
-				startsWg.Done()
 			}
 		}()
 	}
 
-	startsWg.Add(len(commands))
-	for i, command := range commands {
-		keys[i] = command.key
-		jobCh <- job{id: i, command: command}
+	for _, command := range commands {
+		jobCh <- command
 	}
 	close(jobCh)
 
-	// Wait for all processes to complete their start attempts before returning.
-	// This ensures the runningProcesses slice is fully populated and safe to access.
-	startsWg.Wait()
-
 	// This goroutine orchestrates the clean shutdown. It waits for all workers
 	// to finish processing jobs, then waits for all results to be collected.
 	// Finally, it cancels the context (if a timeout was set) and signals

@@ -227,7 +217,7 @@ func (r *PoolBuilder) start(configurer func(contractsprocess.Pool)) (contractsprocess.RunningPool, error) {
 		close(done)
 	}()
 
-	return NewRunningPool(ctx, runningProcesses, keys, cancel, results, done, r.loading, r.loadingMessage), nil
+	return runningPool, nil
 }
 
 type Pool struct {
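
For readers following the new doc comment, here is a minimal, self-contained sketch of the same pattern outside the framework: a fixed number of workers drain a job channel, each worker waits for its process synchronously (which is what bounds concurrency), and a single collector goroutine owns the results map. It uses plain os/exec instead of Goravel's process contracts, and all names (result, jobCh, resultCh) are illustrative:

package main

import (
	"fmt"
	"os/exec"
	"sync"
)

type result struct {
	key string
	out string
	err error
}

func main() {
	// Illustrative jobs; echo keeps the example Unix-runnable.
	commands := map[string]*exec.Cmd{
		"first":  exec.Command("echo", "hello"),
		"second": exec.Command("echo", "world"),
		"third":  exec.Command("echo", "!"),
	}
	keys := make([]string, 0, len(commands))
	for k := range commands {
		keys = append(keys, k)
	}

	concurrency := 2
	jobCh := make(chan string, len(commands))
	resultCh := make(chan result, len(commands))
	results := make(map[string]result, len(commands))
	done := make(chan struct{})

	// Collector: the only goroutine that writes to the results map,
	// so concurrent map write panics are impossible by construction.
	go func() {
		for i := 0; i < len(commands); i++ {
			rc := <-resultCh
			results[rc.key] = rc
		}
		close(done)
	}()

	// Workers: each runs a process and waits for it synchronously,
	// which is what enforces the concurrency limit.
	var workersWg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		workersWg.Add(1)
		go func() {
			defer workersWg.Done()
			for key := range jobCh {
				out, err := commands[key].Output()
				resultCh <- result{key: key, out: string(out), err: err}
			}
		}()
	}

	for _, key := range keys {
		jobCh <- key
	}
	close(jobCh)

	workersWg.Wait()
	<-done // channel close gives a happens-before edge; reading results is safe
	for _, key := range keys {
		fmt.Printf("%s: %q err=%v\n", key, results[key].out, results[key].err)
	}
}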

process/pool_unix_test.go
Lines changed: 6 additions & 3 deletions

@@ -259,12 +259,12 @@ func TestPool_Concurrency_Unix(t *testing.T) {
 
 func TestPool_OnOutput_Unix(t *testing.T) {
 	t.Run("captures output via callback", func(t *testing.T) {
-		mx := sync.Mutex{}
+		var mu sync.Mutex
 		outputs := make(map[string][]string)
 		builder := NewPool().OnOutput(func(typ contractsprocess.OutputType, line []byte, key string) {
-			mx.Lock()
+			mu.Lock()
 			outputs[key] = append(outputs[key], string(line))
-			mx.Unlock()
+			mu.Unlock()
 		})
 
 		rp, err := builder.Pool(func(p contractsprocess.Pool) {

@@ -285,10 +285,13 @@ func TestPool_OnOutput_Unix(t *testing.T) {
 	})
 
 	t.Run("distinguishes stdout and stderr", func(t *testing.T) {
+		var mu sync.Mutex
 		stdoutLines := make(map[string][]string)
 		stderrLines := make(map[string][]string)
 
 		builder := NewPool().OnOutput(func(typ contractsprocess.OutputType, line []byte, key string) {
+			mu.Lock()
+			defer mu.Unlock()
 			if typ == contractsprocess.OutputTypeStdout {
 				stdoutLines[key] = append(stdoutLines[key], string(line))
 			} else {
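
The mutex added in these tests matters because OnOutput callbacks can fire concurrently from the readers of several processes, and Go map writes are not concurrent-safe. A minimal sketch of the failure mode and fix, independent of the Goravel API (the keys and line counts are made up):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	outputs := make(map[string][]string)

	// Simulate many callback invocations racing on the shared map.
	var wg sync.WaitGroup
	for _, key := range []string{"first", "second"} {
		for i := 0; i < 100; i++ {
			wg.Add(1)
			go func(key string, i int) {
				defer wg.Done()
				mu.Lock() // without this lock, `go test -race` flags a data race
				outputs[key] = append(outputs[key], fmt.Sprintf("line %d", i))
				mu.Unlock()
			}(key, i)
		}
	}
	wg.Wait()

	fmt.Println(len(outputs["first"]), len(outputs["second"])) // 100 100
}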

process/pool_windows_test.go
Lines changed: 13 additions & 9 deletions

@@ -7,6 +7,7 @@ import (
 	"context"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 

@@ -257,11 +258,13 @@ func TestPool_Concurrency_Windows(t *testing.T) {
 }
 
 func TestPool_OnOutput_Windows(t *testing.T) {
-	// TODO: fix sync issue when writing concurrently
 	t.Run("captures output via callback", func(t *testing.T) {
+		var mu sync.Mutex
 		outputs := make(map[string][]string)
 		builder := NewPool().OnOutput(func(typ contractsprocess.OutputType, line []byte, key string) {
+			mu.Lock()
 			outputs[key] = append(outputs[key], string(line))
+			mu.Unlock()
 		})
 
 		rp, err := builder.Pool(func(p contractsprocess.Pool) {

@@ -282,10 +285,13 @@ func TestPool_OnOutput_Windows(t *testing.T) {
 	})
 
 	t.Run("distinguishes stdout and stderr", func(t *testing.T) {
+		var mu sync.Mutex
 		stdoutLines := make(map[string][]string)
 		stderrLines := make(map[string][]string)
 
 		builder := NewPool().OnOutput(func(typ contractsprocess.OutputType, line []byte, key string) {
+			mu.Lock()
+			defer mu.Unlock()
 			if typ == contractsprocess.OutputTypeStdout {
 				stdoutLines[key] = append(stdoutLines[key], string(line))
 			} else {

@@ -414,19 +420,17 @@ func TestPool_SignalHandling_Windows(t *testing.T) {
 	t.Run("forwards signals to child processes", func(t *testing.T) {
 		builder := NewPool()
 		rp, err := builder.Pool(func(p contractsprocess.Pool) {
-			// Set up a PowerShell script that can handle CTRL_BREAK_EVENT (mapped from os.Interrupt)
-			p.Command("powershell", "-Command", "$global:interrupted = $false; [console]::TreatControlCAsInput = $true; $handler = [Console]::CancelKeyPress; [Console]::CancelKeyPress = { $global:interrupted = $true; Write-Output 'caught'; $_.Cancel = $true }; Start-Sleep -Seconds 5; if ($global:interrupted) { exit 0 } else { exit 1 }").As("trap")
+			// Start a long sleep that would normally take 10 seconds
+			p.Command("powershell", "-Command", "Start-Sleep -Seconds 10").As("long-running")
 		}).Start()
 		assert.NoError(t, err)
 
-		time.Sleep(100 * time.Millisecond)
-		// Send os.Interrupt which should be mapped to CTRL_BREAK_EVENT on Windows
-		err = rp.Signal(os.Interrupt)
-		// We don't assert on the error because Windows process signal behavior can be inconsistent
+		time.Sleep(1 * time.Second)
+		err = rp.Signal(os.Kill)
+		assert.NoError(t, err)
 
 		results := rp.Wait()
-		// The important thing is that the process completes
-		assert.Contains(t, results, "trap")
+		assert.True(t, results["long-running"].Failed())
 	})
 }
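
The rewritten Windows test asserts a simpler invariant: a killed long-running process reports a failed result. A rough equivalent with plain os/exec rather than the framework's pool; the sleep command is an illustrative Unix stand-in for the test's powershell Start-Sleep:

package main

import (
	"fmt"
	"os"
	"os/exec"
	"time"
)

func main() {
	cmd := exec.Command("sleep", "10")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	time.Sleep(1 * time.Second)
	// os.Kill cannot be trapped by the child, so it dies immediately.
	if err := cmd.Process.Signal(os.Kill); err != nil {
		panic(err)
	}

	err := cmd.Wait()
	fmt.Println("failed:", err != nil) // failed: true
}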

process/running_pool.go
Lines changed: 49 additions & 21 deletions

@@ -3,6 +3,7 @@ package process
 import (
 	"context"
 	"os"
+	"sync"
 	"time"
 
 	contractsprocess "github.com/goravel/framework/contracts/process"

@@ -11,46 +12,50 @@ import (
 var _ contractsprocess.RunningPool = (*RunningPool)(nil)
 
 type RunningPool struct {
+	mu             sync.RWMutex
 	ctx            context.Context
-	running        []contractsprocess.Running
-	keys           []string
 	cancel         context.CancelFunc
+	done           chan struct{}
+	processes      map[string]contractsprocess.Running
+	results        map[string]contractsprocess.Result
+	keys           []string
 	loading        bool
 	loadingMessage string
-	results        map[string]contractsprocess.Result
-	done           chan struct{}
 }
 
 func NewRunningPool(
 	ctx context.Context,
-	running []contractsprocess.Running,
-	keys []string,
 	cancel context.CancelFunc,
-	results map[string]contractsprocess.Result,
+	keys []string,
 	done chan struct{},
 	loading bool,
 	loadingMessage string,
 ) *RunningPool {
+	processes := make(map[string]contractsprocess.Running, len(keys))
+	results := make(map[string]contractsprocess.Result, len(keys))
+
 	return &RunningPool{
 		ctx:            ctx,
-		running:        running,
-		keys:           keys,
 		cancel:         cancel,
+		keys:           keys,
+		done:           done,
+		processes:      processes,
+		results:        results,
 		loading:        loading,
 		loadingMessage: loadingMessage,
-		results:        results,
-		done:           done,
 	}
 }
 
 func (r *RunningPool) PIDs() map[string]int {
-	m := make(map[string]int, len(r.running))
-	for i, proc := range r.running {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	m := make(map[string]int, len(r.keys))
+	for _, key := range r.keys {
 		pid := 0
-		if proc != nil {
+		if proc, ok := r.processes[key]; ok {
 			pid = proc.PID()
 		}
-		m[r.keys[i]] = pid
+		m[key] = pid
 	}
 	return m
 }

@@ -69,18 +74,27 @@ func (r *RunningPool) Done() <-chan struct{} {
 }
 
 func (r *RunningPool) Wait() map[string]contractsprocess.Result {
-	if err := r.spinner(func() error {
+	_ = r.spinner(func() error {
 		<-r.Done()
 		return nil
-	}); err != nil {
-		return r.results
-	}
+	})
+
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
 	return r.results
 }
 
 func (r *RunningPool) Stop(timeout time.Duration, sig ...os.Signal) error {
+	if r.cancel != nil {
+		r.cancel()
+	}
+
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
 	var firstErr error
-	for _, proc := range r.running {
+	for _, proc := range r.processes {
 		if proc == nil {
 			continue
 		}

@@ -92,8 +106,10 @@ func (r *RunningPool) Stop(timeout time.Duration, sig ...os.Signal) error {
 }
 
 func (r *RunningPool) Signal(sig os.Signal) error {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
 	var firstErr error
-	for _, proc := range r.running {
+	for _, proc := range r.processes {
 		if proc == nil {
 			continue
 		}

@@ -104,6 +120,18 @@ func (r *RunningPool) Signal(sig os.Signal) error {
 	return firstErr
 }
 
+func (r *RunningPool) setProcess(key string, proc contractsprocess.Running) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.processes[key] = proc
+}
+
+func (r *RunningPool) setResult(key string, res contractsprocess.Result) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.results[key] = res
+}
+
 func (r *RunningPool) spinner(fn func() error) error {
 	loadingMessage := r.loadingMessage
 	if loadingMessage == "" {
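
The new setProcess/setResult accessors plus RLock-guarded readers are the standard RWMutex-guarded-map pattern: writers take the exclusive lock, readers the shared lock, and the maps never escape without a lock held. A compact sketch of the same shape (registry, set, and snapshot are illustrative names, not Goravel API); unlike Wait above, the sketch returns a copy so callers can never touch the map unlocked:

package main

import (
	"fmt"
	"sync"
)

// registry mirrors the shape of RunningPool's guarded state.
type registry struct {
	mu      sync.RWMutex
	results map[string]error
}

func (r *registry) set(key string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.results[key] = err
}

func (r *registry) snapshot() map[string]error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[string]error, len(r.results))
	for k, v := range r.results {
		out[k] = v
	}
	return out
}

func main() {
	reg := &registry{results: make(map[string]error)}

	var wg sync.WaitGroup
	for _, key := range []string{"first", "second", "third"} {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			reg.set(key, nil) // concurrent writers are safe behind mu
		}(key)
	}
	wg.Wait()

	fmt.Println(len(reg.snapshot())) // 3
}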
