Skip to content

Commit dd64a01

Browse files
[Bug]: Using "files" directive with an SSH Fleet will have dstack-runner consume all ram and hang (#3263)
Co-authored-by: Dmitry Meyer <[email protected]>
1 parent ea555f3 commit dd64a01

File tree

7 files changed

+92
-24
lines changed

7 files changed

+92
-24
lines changed

runner/internal/common/utils.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ func MkdirAll(ctx context.Context, pth string, uid int, gid int) error {
3131
paths := []string{pth}
3232
for {
3333
pth = path.Dir(pth)
34-
if pth == "/" {
34+
if pth == "/" || pth == "." {
3535
break
3636
}
3737
paths = append(paths, pth)

runner/internal/common/utils_test.go

Lines changed: 54 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
11
package common
22

33
import (
4+
"context"
5+
"os"
6+
"path"
47
"testing"
58

69
"github.com/stretchr/testify/require"
@@ -13,9 +16,12 @@ func TestExpandPath_NoPath_NoBase(t *testing.T) {
1316
}
1417

1518
func TestExpandPath_NoPath_RelBase(t *testing.T) {
16-
path, err := ExpandPath("", "repo", "")
17-
require.NoError(t, err)
18-
require.Equal(t, "repo", path)
19+
testCases := []string{"repo", "./repo"}
20+
for _, base := range testCases {
21+
path, err := ExpandPath("", base, "")
22+
require.NoError(t, err)
23+
require.Equal(t, "repo", path)
24+
}
1925
}
2026

2127
func TestExpandPath_NoPath_AbsBase(t *testing.T) {
@@ -25,9 +31,12 @@ func TestExpandPath_NoPath_AbsBase(t *testing.T) {
2531
}
2632

2733
func TestExpandtPath_RelPath_NoBase(t *testing.T) {
28-
path, err := ExpandPath("repo", "", "")
29-
require.NoError(t, err)
30-
require.Equal(t, "repo", path)
34+
testCases := []string{"repo", "./repo"}
35+
for _, pth := range testCases {
36+
path, err := ExpandPath(pth, "", "")
37+
require.NoError(t, err)
38+
require.Equal(t, "repo", path)
39+
}
3140
}
3241

3342
func TestExpandtPath_RelPath_RelBase(t *testing.T) {
@@ -107,3 +116,42 @@ func TestExpandtPath_ErrorTildeUsernameNotSupported_TildeUsernameWithPath(t *tes
107116
require.ErrorContains(t, err, "~username syntax is not supported")
108117
require.Equal(t, "", path)
109118
}
119+
120+
func TestMkdirAll_AbsPath_NotExists(t *testing.T) {
121+
absPath := path.Join(t.TempDir(), "a/b/c")
122+
require.NoDirExists(t, absPath)
123+
err := MkdirAll(context.Background(), absPath, -1, -1)
124+
require.NoError(t, err)
125+
require.DirExists(t, absPath)
126+
}
127+
128+
func TestMkdirAll_AbsPath_Exists(t *testing.T) {
129+
absPath, err := os.Getwd()
130+
require.NoError(t, err)
131+
err = MkdirAll(context.Background(), absPath, -1, -1)
132+
require.NoError(t, err)
133+
require.DirExists(t, absPath)
134+
}
135+
136+
func TestMkdirAll_RelPath_NotExists(t *testing.T) {
137+
cwd := t.TempDir()
138+
os.Chdir(cwd)
139+
relPath := "a/b/c"
140+
absPath := path.Join(cwd, relPath)
141+
require.NoDirExists(t, absPath)
142+
err := MkdirAll(context.Background(), relPath, -1, -1)
143+
require.NoError(t, err)
144+
require.DirExists(t, absPath)
145+
}
146+
147+
func TestMkdirAll_RelPath_Exists(t *testing.T) {
148+
cwd := t.TempDir()
149+
os.Chdir(cwd)
150+
relPath := "a/b/c"
151+
absPath := path.Join(cwd, relPath)
152+
err := os.MkdirAll(absPath, 0o755)
153+
require.NoError(t, err)
154+
err = MkdirAll(context.Background(), relPath, -1, -1)
155+
require.NoError(t, err)
156+
require.DirExists(t, absPath)
157+
}

runner/internal/executor/executor.go

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,16 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
196196

197197
ex.setJobCredentials(ctx)
198198

199+
if err := ex.setJobWorkingDir(ctx); err != nil {
200+
ex.SetJobStateWithTerminationReason(
201+
ctx,
202+
types.JobStateFailed,
203+
types.TerminationReasonExecutorError,
204+
fmt.Sprintf("Failed to set up the working dir (%s)", err),
205+
)
206+
return fmt.Errorf("prepare job working dir: %w", err)
207+
}
208+
199209
if err := ex.setupRepo(ctx); err != nil {
200210
ex.SetJobStateWithTerminationReason(
201211
ctx,
@@ -216,16 +226,6 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
216226
return fmt.Errorf("setup files: %w", err)
217227
}
218228

219-
if err := ex.prepareJobWorkingDir(ctx); err != nil {
220-
ex.SetJobStateWithTerminationReason(
221-
ctx,
222-
types.JobStateFailed,
223-
types.TerminationReasonExecutorError,
224-
fmt.Sprintf("Failed to set up the working dir (%s)", err),
225-
)
226-
return fmt.Errorf("prepare job working dir: %w", err)
227-
}
228-
229229
cleanupCredentials, err := ex.setupCredentials(ctx)
230230
if err != nil {
231231
ex.SetJobState(ctx, types.JobStateFailed)
@@ -352,7 +352,7 @@ func (ex *RunExecutor) setJobCredentials(ctx context.Context) {
352352
log.Trace(ctx, "Job credentials", "uid", ex.jobUid, "gid", ex.jobGid, "home", ex.jobHomeDir)
353353
}
354354

355-
func (ex *RunExecutor) prepareJobWorkingDir(ctx context.Context) error {
355+
func (ex *RunExecutor) setJobWorkingDir(ctx context.Context) error {
356356
var err error
357357
if ex.jobSpec.WorkingDir == nil {
358358
ex.jobWorkingDir, err = os.Getwd()
@@ -372,9 +372,6 @@ func (ex *RunExecutor) prepareJobWorkingDir(ctx context.Context) error {
372372
}
373373
}
374374
log.Trace(ctx, "Job working dir", "path", ex.jobWorkingDir)
375-
if err := common.MkdirAll(ctx, ex.jobWorkingDir, ex.jobUid, ex.jobGid); err != nil {
376-
return fmt.Errorf("create working directory: %w", err)
377-
}
378375
return nil
379376
}
380377

@@ -429,6 +426,9 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
429426
}
430427
cmd.WaitDelay = ex.killDelay // kills the process if it doesn't exit in time
431428

429+
if err := common.MkdirAll(ctx, ex.jobWorkingDir, ex.jobUid, ex.jobGid); err != nil {
430+
return fmt.Errorf("create working directory: %w", err)
431+
}
432432
cmd.Dir = ex.jobWorkingDir
433433

434434
// User must be already set

runner/internal/executor/executor_test.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ func TestExecutor_WorkingDir_Set(t *testing.T) {
2828

2929
ex.jobSpec.WorkingDir = &workingDir
3030
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "pwd")
31-
err = ex.prepareJobWorkingDir(context.TODO())
31+
err = ex.setJobWorkingDir(context.TODO())
3232
require.NoError(t, err)
3333
require.Equal(t, workingDir, ex.jobWorkingDir)
3434
err = os.MkdirAll(workingDir, 0o755)
@@ -47,7 +47,7 @@ func TestExecutor_WorkingDir_NotSet(t *testing.T) {
4747
require.NoError(t, err)
4848
ex.jobSpec.WorkingDir = nil
4949
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "pwd")
50-
err = ex.prepareJobWorkingDir(context.TODO())
50+
err = ex.setJobWorkingDir(context.TODO())
5151
require.NoError(t, err)
5252
require.Equal(t, cwd, ex.jobWorkingDir)
5353

@@ -158,7 +158,7 @@ func TestExecutor_RemoteRepo(t *testing.T) {
158158
err := os.WriteFile(ex.codePath, []byte{}, 0o600) // empty diff
159159
require.NoError(t, err)
160160

161-
err = ex.prepareJobWorkingDir(context.TODO())
161+
err = ex.setJobWorkingDir(context.TODO())
162162
require.NoError(t, err)
163163
err = ex.setupRepo(context.TODO())
164164
require.NoError(t, err)
@@ -211,6 +211,7 @@ func makeTestExecutor(t *testing.T) *RunExecutor {
211211
ex, _ := NewRunExecutor(temp, home, 10022)
212212
ex.SetJob(body)
213213
ex.SetCodePath(filepath.Join(baseDir, "code")) // note: create file before run
214+
ex.setJobWorkingDir(context.Background())
214215
return ex
215216
}
216217

runner/internal/executor/files.go

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,10 +2,12 @@ package executor
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"os"
89
"path"
10+
"path/filepath"
911
"regexp"
1012

1113
"github.com/codeclysm/extract/v4"
@@ -32,7 +34,14 @@ func (ex *RunExecutor) AddFileArchive(id string, src io.Reader) error {
3234
}
3335

3436
// setupFiles must be called from Run
37+
// ex.jobWorkingDir must be already set
3538
func (ex *RunExecutor) setupFiles(ctx context.Context) error {
39+
if ex.jobWorkingDir == "" {
40+
return errors.New("setup files: working dir is not set")
41+
}
42+
if !filepath.IsAbs(ex.jobWorkingDir) {
43+
return fmt.Errorf("setup files: working dir must be absolute: %s", ex.jobWorkingDir)
44+
}
3645
for _, fa := range ex.jobSpec.FileArchives {
3746
archivePath := path.Join(ex.archiveDir, fa.Id)
3847
if err := extractFileArchive(ctx, archivePath, fa.Path, ex.jobWorkingDir, ex.jobUid, ex.jobGid, ex.jobHomeDir); err != nil {

runner/internal/executor/repo.go

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,15 @@ import (
1515
)
1616

1717
// setupRepo must be called from Run
18+
// ex.jobWorkingDir must be already set
1819
// TODO: change ownership to uid:gid
1920
func (ex *RunExecutor) setupRepo(ctx context.Context) error {
21+
if ex.jobWorkingDir == "" {
22+
return errors.New("setup repo: working dir is not set")
23+
}
24+
if !filepath.IsAbs(ex.jobWorkingDir) {
25+
return fmt.Errorf("setup repo: working dir must be absolute: %s", ex.jobWorkingDir)
26+
}
2027
if ex.jobSpec.RepoDir == nil {
2128
return errors.New("repo_dir is not set")
2229
}

runner/internal/runner/api/http.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ func (s *Server) uploadArchivePostHandler(w http.ResponseWriter, r *http.Request
8686
return nil, &api.Error{Status: http.StatusBadRequest, Msg: "missing boundary"}
8787
}
8888

89+
r.Body = http.MaxBytesReader(w, r.Body, math.MaxInt64)
8990
formReader := multipart.NewReader(r.Body, boundary)
9091
part, err := formReader.NextPart()
9192
if errors.Is(err, io.EOF) {
@@ -94,6 +95,8 @@ func (s *Server) uploadArchivePostHandler(w http.ResponseWriter, r *http.Request
9495
if err != nil {
9596
return nil, fmt.Errorf("read multipart form: %w", err)
9697
}
98+
defer part.Close()
99+
97100
fieldName := part.FormName()
98101
if fieldName == "" {
99102
return nil, &api.Error{Status: http.StatusBadRequest, Msg: "missing field name"}

0 commit comments

Comments
 (0)