Skip to content

Commit 0343e3c

Browse files
authored
feat(process/scheduler): add exclusive scheduler, support "bootstrap" session message (#635)
- **feat(process/scheduler): add exclusive scheduler** - **feat(pkg/session): add "bootstrap" endpoint** Signed-off-by: Gyuho Lee <[email protected]>
1 parent 80a83ac commit 0343e3c

File tree

5 files changed

+387
-2
lines changed

5 files changed

+387
-2
lines changed

pkg/process/runner.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package process
2+
3+
import (
4+
"context"
5+
"errors"
6+
)
7+
8+
// ErrProcessAlreadyRunning is the sentinel error for a run request that is
// rejected because a process is already running. Compare with errors.Is.
var ErrProcessAlreadyRunning = errors.New("process already running")
11+
12+
// Runner defines the interface for a process runner.
// It facilitates scheduling and running arbitrary bash scripts.
type Runner interface {
	// RunUntilCompletion starts a bash script, blocks until it finishes,
	// and returns the output and the exit code.
	//
	// The returned values are, in order: the script output, the exit code,
	// and an error describing why the run did not complete successfully.
	//
	// Whether to return an error when there is already a process running
	// is up to the implementation.
	RunUntilCompletion(ctx context.Context, script string) ([]byte, int32, error)
}

pkg/process/runner_exclusive.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package process
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"path/filepath"
8+
"sync"
9+
10+
"github.com/leptonai/gpud/pkg/log"
11+
)
12+
13+
var _ Runner = &exclusiveRunner{}
14+
15+
func NewExclusiveRunner() Runner {
16+
return &exclusiveRunner{}
17+
}
18+
19+
// exclusiveRunner is a scheduler that runs a single process at a time.
20+
// Does not support concurrent execution of multiple processes.
21+
type exclusiveRunner struct {
22+
mu sync.RWMutex
23+
running Process
24+
}
25+
26+
var defaultScriptsDir = filepath.Join(os.TempDir(), "gpud-scripts-runner")
27+
28+
// RunUntilCompletion starts a bash script, blocks until it finishes,
29+
// and returns the output and the exit code.
30+
// If there is already a process running, it returns an error.
31+
func (er *exclusiveRunner) RunUntilCompletion(ctx context.Context, script string) ([]byte, int32, error) {
32+
if er.alreadyRunning() {
33+
return nil, 0, ErrProcessAlreadyRunning
34+
}
35+
36+
// write all stderr + stdout to a temporary file
37+
if err := os.MkdirAll(defaultScriptsDir, 0755); err != nil {
38+
return nil, 0, fmt.Errorf("failed to create temp dir %s: %w", defaultScriptsDir, err)
39+
}
40+
tmpFile, err := os.CreateTemp(defaultScriptsDir, "gpud-process-output-*.txt")
41+
if err != nil {
42+
return nil, 0, err
43+
}
44+
defer os.Remove(tmpFile.Name())
45+
defer tmpFile.Close()
46+
47+
p, err := New(
48+
WithBashScriptContentsToRun(script),
49+
WithOutputFile(tmpFile),
50+
)
51+
if err != nil {
52+
return nil, 0, err
53+
}
54+
55+
if err := p.Start(ctx); err != nil {
56+
return nil, 0, err
57+
}
58+
defer func() {
59+
if err := p.Close(ctx); err != nil {
60+
log.Logger.Errorw("failed to close process", "pid", p.PID(), "error", err)
61+
}
62+
log.Logger.Infow("closed running script", "pid", p.PID())
63+
er.mu.Lock()
64+
er.running = nil
65+
er.mu.Unlock()
66+
}()
67+
log.Logger.Infow("started running script", "pid", p.PID())
68+
69+
er.mu.Lock()
70+
er.running = p
71+
er.mu.Unlock()
72+
73+
select {
74+
case <-ctx.Done():
75+
log.Logger.Warnw("process aborted before completion", "pid", p.PID())
76+
return nil, p.ExitCode(), ctx.Err()
77+
78+
case err := <-p.Wait():
79+
if err != nil {
80+
return nil, p.ExitCode(), err
81+
}
82+
log.Logger.Infow("process exited", "pid", p.PID(), "exitCode", p.ExitCode())
83+
}
84+
85+
output, err := os.ReadFile(tmpFile.Name())
86+
return output, p.ExitCode(), err
87+
}
88+
89+
func (er *exclusiveRunner) alreadyRunning() bool {
90+
er.mu.RLock()
91+
defer er.mu.RUnlock()
92+
93+
return er.running != nil
94+
}
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package process
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestExclusiveRunnerSimple(t *testing.T) {
13+
runner := NewExclusiveRunner()
14+
15+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
16+
defer cancel()
17+
18+
script := `#!/bin/bash
19+
20+
# do not mask errors in a pipeline
21+
set -o pipefail
22+
23+
echo hello
24+
`
25+
26+
out, exitCode, err := runner.RunUntilCompletion(ctx, script)
27+
assert.NoError(t, err)
28+
assert.Equal(t, "hello\n", string(out))
29+
assert.Equal(t, int32(0), exitCode)
30+
}
31+
32+
func TestExclusiveRunnerAbortByProcessExit(t *testing.T) {
33+
runner := NewExclusiveRunner()
34+
35+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
36+
defer cancel()
37+
38+
out, exitCode, err := runner.RunUntilCompletion(ctx, "exit 1")
39+
assert.Error(t, err)
40+
assert.Nil(t, out)
41+
assert.Equal(t, int32(1), exitCode)
42+
}
43+
44+
func TestExclusiveRunnerAbortByContextCancellation(t *testing.T) {
45+
runner := NewExclusiveRunner()
46+
47+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
48+
49+
done := make(chan struct{})
50+
go func() {
51+
defer close(done)
52+
53+
out, _, err := runner.RunUntilCompletion(ctx, "sleep 10")
54+
assert.Error(t, err)
55+
assert.Nil(t, out)
56+
}()
57+
58+
time.Sleep(time.Second)
59+
cancel()
60+
61+
<-done
62+
}
63+
64+
func TestExclusiveRunnerCannotAbortExistingProcess(t *testing.T) {
65+
runner := NewExclusiveRunner()
66+
67+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
68+
defer cancel()
69+
70+
go func() {
71+
_, _, err := runner.RunUntilCompletion(ctx, "sleep 10")
72+
assert.Error(t, err)
73+
}()
74+
75+
// wait for the first process to start
76+
time.Sleep(time.Second)
77+
78+
done := make(chan struct{})
79+
go func() {
80+
_, _, err := runner.RunUntilCompletion(ctx, "echo hello")
81+
assert.True(t, errors.Is(err, ErrProcessAlreadyRunning))
82+
done <- struct{}{}
83+
}()
84+
85+
<-done
86+
}
87+
88+
func TestExclusiveRunnerExecuteComplexScript(t *testing.T) {
89+
runner := NewExclusiveRunner()
90+
91+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
92+
defer cancel()
93+
94+
script := `#!/bin/bash
95+
set -e
96+
set -o pipefail
97+
98+
# Create a temporary file
99+
TEMP_FILE=$(mktemp)
100+
echo "Hello World" > $TEMP_FILE
101+
102+
# Read from the file
103+
CONTENT=$(cat $TEMP_FILE)
104+
105+
# Check content
106+
if [[ "$CONTENT" != "Hello World" ]]; then
107+
exit 1
108+
fi
109+
110+
# Clean up
111+
rm $TEMP_FILE
112+
113+
# Return success
114+
echo "Script executed successfully"
115+
exit 0
116+
`
117+
118+
out, exitCode, err := runner.RunUntilCompletion(ctx, script)
119+
assert.NoError(t, err)
120+
assert.Equal(t, int32(0), exitCode)
121+
assert.Contains(t, string(out), "Script executed successfully")
122+
}
123+
124+
func TestExclusiveRunnerWithScriptErrors(t *testing.T) {
125+
runner := NewExclusiveRunner()
126+
127+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
128+
defer cancel()
129+
130+
// Test various error scenarios
131+
testCases := []struct {
132+
name string
133+
script string
134+
exitCode int32
135+
shouldFail bool
136+
}{
137+
{
138+
name: "Command not found",
139+
script: "nonexistentcommand",
140+
exitCode: 127,
141+
shouldFail: true,
142+
},
143+
{
144+
name: "Syntax error",
145+
script: "if then fi",
146+
exitCode: 2,
147+
shouldFail: true,
148+
},
149+
{
150+
name: "Exit with error code",
151+
script: "exit 42",
152+
exitCode: 42,
153+
shouldFail: true,
154+
},
155+
}
156+
157+
for _, tc := range testCases {
158+
t.Run(tc.name, func(t *testing.T) {
159+
out, exitCode, err := runner.RunUntilCompletion(ctx, tc.script)
160+
if tc.shouldFail {
161+
assert.Error(t, err)
162+
} else {
163+
assert.NoError(t, err)
164+
}
165+
assert.Equal(t, tc.exitCode, exitCode)
166+
// Check that out is either nil or contains something
167+
if tc.shouldFail {
168+
// For some errors, out might be nil
169+
// For others, it might contain error messages
170+
t.Logf("Script output: %s", string(out))
171+
}
172+
})
173+
}
174+
}
175+
176+
func TestExclusiveRunnerMultipleScripts(t *testing.T) {
177+
runner := NewExclusiveRunner()
178+
179+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
180+
defer cancel()
181+
182+
// Run multiple scripts sequentially
183+
scripts := []struct {
184+
script string
185+
expected string
186+
}{
187+
{
188+
script: "echo 'First script'",
189+
expected: "First script\n",
190+
},
191+
{
192+
script: "echo 'Second script'",
193+
expected: "Second script\n",
194+
},
195+
{
196+
script: "echo 'Third script'",
197+
expected: "Third script\n",
198+
},
199+
}
200+
201+
for _, s := range scripts {
202+
out, exitCode, err := runner.RunUntilCompletion(ctx, s.script)
203+
assert.NoError(t, err)
204+
assert.Equal(t, int32(0), exitCode)
205+
assert.Equal(t, s.expected, string(out))
206+
}
207+
}
208+
209+
func TestCountProcessesWithRunningProcess(t *testing.T) {
210+
runner := NewExclusiveRunner()
211+
212+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
213+
defer cancel()
214+
215+
// Start a process in a goroutine
216+
done := make(chan struct{})
217+
go func() {
218+
defer close(done)
219+
// Run a long sleep command to keep the process running
220+
_, _, _ = runner.RunUntilCompletion(ctx, "sleep 5")
221+
}()
222+
223+
// Cancel the context to stop the process
224+
cancel()
225+
<-done
226+
}

0 commit comments

Comments
 (0)