Skip to content

Commit 709f513

Browse files
committed
Test parallel downloads
Add subtests for parallel downloads for "with cache" and "caching-only mode". The tests run 20 concurrent downloads against the current test HTTP server and ensure that all downloads succeed. To make it possible to test using the same process, we now include a counter in perProcessTempfile. The file name is now: {filename}.tmp.{pid}.{count} The tests revealed an issue with clonefile, not seen when testing 3 parallel `limactl create`. When the data file is replaced during a clonefile syscall, it may fail with: clonefile failed: no such file or directory This smells like a darwin bug, but we can avoid this issue by using the first download result. When a download finishes, we check if the data file exists, and return success if it does. We take a lock for the very short time needed to check and rename the temporary data file to the target file. The parallel tests failed on windows - it seems that os.Rename() does not work on windows if the target exists. Replacing atomicWrite with a simpler version that takes a lock and writes the file fixes the issue. Tested using: % go test ./pkg/downloader -run 'TestDownloadRemote/with_cache/parallel' -count=1000 ok github.com/lima-vm/lima/pkg/downloader 116.025s % go test ./pkg/downloader -run 'TestDownloadRemote/caching-only_mode/parallel' -count=1000 ok github.com/lima-vm/lima/pkg/downloader 98.535s Signed-off-by: Nir Soffer <[email protected]>
1 parent 713e1cd commit 709f513

File tree

2 files changed

+129
-61
lines changed

2 files changed

+129
-61
lines changed

pkg/downloader/downloader.go

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ import (
1212
"os/exec"
1313
"path"
1414
"path/filepath"
15-
"strconv"
1615
"strings"
16+
"sync/atomic"
1717
"time"
1818

1919
"github.com/cheggaaa/pb/v3"
2020
"github.com/containerd/continuity/fs"
2121
"github.com/lima-vm/lima/pkg/httpclientutil"
2222
"github.com/lima-vm/lima/pkg/localpathutil"
23+
"github.com/lima-vm/lima/pkg/lockutil"
2324
"github.com/lima-vm/lima/pkg/progressbar"
2425
"github.com/opencontainers/go-digest"
2526
"github.com/sirupsen/logrus"
@@ -267,21 +268,21 @@ func Download(ctx context.Context, local, remote string, opts ...Opt) (*Result,
267268
return nil, err
268269
}
269270
shadURL := filepath.Join(shad, "url")
270-
if err := atomicWrite(shadURL, []byte(remote), 0o644); err != nil {
271+
if err := writeFirst(shadURL, []byte(remote), 0o644); err != nil {
271272
return nil, err
272273
}
273274
if err := downloadHTTP(ctx, shadData, shadTime, shadType, remote, o.description, o.expectedDigest); err != nil {
274275
return nil, err
275276
}
276-
// no need to pass the digest to copyLocal(), as we already verified the digest
277-
if err := copyLocal(ctx, localPath, shadData, ext, o.decompress, "", ""); err != nil {
278-
return nil, err
279-
}
280277
if shadDigest != "" && o.expectedDigest != "" {
281-
if err := atomicWrite(shadDigest, []byte(o.expectedDigest.String()), 0o644); err != nil {
278+
if err := writeFirst(shadDigest, []byte(o.expectedDigest.String()), 0o644); err != nil {
282279
return nil, err
283280
}
284281
}
282+
// no need to pass the digest to copyLocal(), as we already verified the digest
283+
if err := copyLocal(ctx, localPath, shadData, ext, o.decompress, "", ""); err != nil {
284+
return nil, err
285+
}
285286
res := &Result{
286287
Status: StatusDownloaded,
287288
CachePath: shadData,
@@ -605,13 +606,13 @@ func downloadHTTP(ctx context.Context, localPath, lastModified, contentType, url
605606
}
606607
if lastModified != "" {
607608
lm := resp.Header.Get("Last-Modified")
608-
if err := atomicWrite(lastModified, []byte(lm), 0o644); err != nil {
609+
if err := writeFirst(lastModified, []byte(lm), 0o644); err != nil {
609610
return err
610611
}
611612
}
612613
if contentType != "" {
613614
ct := resp.Header.Get("Content-Type")
614-
if err := atomicWrite(contentType, []byte(ct), 0o644); err != nil {
615+
if err := writeFirst(contentType, []byte(ct), 0o644); err != nil {
615616
return err
616617
}
617618
}
@@ -672,43 +673,43 @@ func downloadHTTP(ctx context.Context, localPath, lastModified, contentType, url
672673
return err
673674
}
674675

675-
return os.Rename(localPathTmp, localPath)
676+
// If localPath was created by a parallel download keep it. Replacing it
677+
// while another process is copying it to the destination may fail the
678+
// clonefile syscall. We use a lock to ensure that only one process updates
679+
// data, and when we return data file exists.
680+
681+
return lockutil.WithDirLock(filepath.Dir(localPath), func() error {
682+
if _, err := os.Stat(localPath); err == nil {
683+
return nil
684+
} else if !errors.Is(err, os.ErrNotExist) {
685+
return err
686+
}
687+
return os.Rename(localPathTmp, localPath)
688+
})
676689
}
677690

691+
var tempfileCount atomic.Uint64
692+
678693
// To allow parallel download we use a per-process unique suffix for temporary
679694
// files. Renaming the temporary file to the final file is safe without
680695
// synchronization on posix.
696+
// To make it easy to test we also include a counter ensuring that each
697+
// temporary file is unique in the same process.
681698
// https://github.com/lima-vm/lima/issues/2722
682699
func perProcessTempfile(path string) string {
683-
return path + ".tmp." + strconv.FormatInt(int64(os.Getpid()), 10)
700+
return fmt.Sprintf("%s.tmp.%d.%d", path, os.Getpid(), tempfileCount.Add(1))
684701
}
685702

686-
// atomicWrite writes data to path, creating a new file or replacing existing
687-
// one. Multiple processess can write to the same path safely. Safe on posix and
688-
// likely safe on windows when using NTFS.
689-
func atomicWrite(path string, data []byte, perm os.FileMode) error {
690-
tmpPath := perProcessTempfile(path)
691-
tmp, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
692-
if err != nil {
693-
return err
694-
}
695-
defer func() {
696-
if err != nil {
697-
tmp.Close()
698-
os.RemoveAll(tmpPath)
703+
// writeFirst writes data to path unless path already exists.
704+
func writeFirst(path string, data []byte, perm os.FileMode) error {
705+
return lockutil.WithDirLock(filepath.Dir(path), func() error {
706+
if _, err := os.Stat(path); err == nil {
707+
return nil
708+
} else if !errors.Is(err, os.ErrNotExist) {
709+
return err
699710
}
700-
}()
701-
if _, err = tmp.Write(data); err != nil {
702-
return err
703-
}
704-
if err = tmp.Sync(); err != nil {
705-
return err
706-
}
707-
if err = tmp.Close(); err != nil {
708-
return err
709-
}
710-
err = os.Rename(tmpPath, path)
711-
return err
711+
return os.WriteFile(path, data, perm)
712+
})
712713
}
713714

714715
// CacheEntries returns a map of cache entries.

pkg/downloader/downloader_test.go

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os/exec"
99
"path/filepath"
1010
"runtime"
11+
"slices"
1112
"strings"
1213
"testing"
1314
"time"
@@ -21,6 +22,20 @@ func TestMain(m *testing.M) {
2122
os.Exit(m.Run())
2223
}
2324

25+
type downloadResult struct {
26+
r *Result
27+
err error
28+
}
29+
30+
// We expect only a few parallel downloads. Testing with a larger number to find
31+
// races quicker. 20 parallel downloads take about 120 milliseconds on M1 Pro.
32+
const parallelDownloads = 20
33+
34+
// When downloading in parallel usually all downloads complete with
35+
// StatusDownloaded, but some may be delayed and find the data file when they
36+
// start. Can be reproduced locally using 100 parallel downloads.
37+
var parallelStatus = []Status{StatusDownloaded, StatusUsedCache}
38+
2439
func TestDownloadRemote(t *testing.T) {
2540
ts := httptest.NewServer(http.FileServer(http.Dir("testdata")))
2641
t.Cleanup(ts.Close)
@@ -57,38 +72,90 @@ func TestDownloadRemote(t *testing.T) {
5772
})
5873
})
5974
t.Run("with cache", func(t *testing.T) {
60-
cacheDir := filepath.Join(t.TempDir(), "cache")
61-
localPath := filepath.Join(t.TempDir(), t.Name())
62-
r, err := Download(context.Background(), localPath, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
63-
assert.NilError(t, err)
64-
assert.Equal(t, StatusDownloaded, r.Status)
75+
t.Run("serial", func(t *testing.T) {
76+
cacheDir := filepath.Join(t.TempDir(), "cache")
77+
localPath := filepath.Join(t.TempDir(), t.Name())
78+
r, err := Download(context.Background(), localPath, dummyRemoteFileURL,
79+
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
80+
assert.NilError(t, err)
81+
assert.Equal(t, StatusDownloaded, r.Status)
6582

66-
r, err = Download(context.Background(), localPath, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
67-
assert.NilError(t, err)
68-
assert.Equal(t, StatusSkipped, r.Status)
83+
r, err = Download(context.Background(), localPath, dummyRemoteFileURL,
84+
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
85+
assert.NilError(t, err)
86+
assert.Equal(t, StatusSkipped, r.Status)
6987

70-
localPath2 := localPath + "-2"
71-
r, err = Download(context.Background(), localPath2, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
72-
assert.NilError(t, err)
73-
assert.Equal(t, StatusUsedCache, r.Status)
88+
localPath2 := localPath + "-2"
89+
r, err = Download(context.Background(), localPath2, dummyRemoteFileURL,
90+
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
91+
assert.NilError(t, err)
92+
assert.Equal(t, StatusUsedCache, r.Status)
93+
})
94+
t.Run("parallel", func(t *testing.T) {
95+
cacheDir := filepath.Join(t.TempDir(), "cache")
96+
results := make(chan downloadResult, parallelDownloads)
97+
for i := 0; i < parallelDownloads; i++ {
98+
go func() {
99+
// Parallel download is supported only for different instances with unique localPath.
100+
localPath := filepath.Join(t.TempDir(), t.Name())
101+
r, err := Download(context.Background(), localPath, dummyRemoteFileURL,
102+
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
103+
results <- downloadResult{r, err}
104+
}()
105+
}
106+
// We must process all results before cleanup.
107+
for i := 0; i < parallelDownloads; i++ {
108+
result := <-results
109+
if result.err != nil {
110+
t.Errorf("Download failed: %s", result.err)
111+
} else if !slices.Contains(parallelStatus, result.r.Status) {
112+
t.Errorf("Expected download status %s, got %s", parallelStatus, result.r.Status)
113+
}
114+
}
115+
})
74116
})
75117
t.Run("caching-only mode", func(t *testing.T) {
76-
_, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest))
77-
assert.ErrorContains(t, err, "cache directory to be specified")
118+
t.Run("serial", func(t *testing.T) {
119+
_, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest))
120+
assert.ErrorContains(t, err, "cache directory to be specified")
78121

79-
cacheDir := filepath.Join(t.TempDir(), "cache")
80-
r, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
81-
assert.NilError(t, err)
82-
assert.Equal(t, StatusDownloaded, r.Status)
122+
cacheDir := filepath.Join(t.TempDir(), "cache")
123+
r, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest),
124+
WithCacheDir(cacheDir))
125+
assert.NilError(t, err)
126+
assert.Equal(t, StatusDownloaded, r.Status)
83127

84-
r, err = Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
85-
assert.NilError(t, err)
86-
assert.Equal(t, StatusUsedCache, r.Status)
128+
r, err = Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest),
129+
WithCacheDir(cacheDir))
130+
assert.NilError(t, err)
131+
assert.Equal(t, StatusUsedCache, r.Status)
87132

88-
localPath := filepath.Join(t.TempDir(), t.Name())
89-
r, err = Download(context.Background(), localPath, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
90-
assert.NilError(t, err)
91-
assert.Equal(t, StatusUsedCache, r.Status)
133+
localPath := filepath.Join(t.TempDir(), t.Name())
134+
r, err = Download(context.Background(), localPath, dummyRemoteFileURL,
135+
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
136+
assert.NilError(t, err)
137+
assert.Equal(t, StatusUsedCache, r.Status)
138+
})
139+
t.Run("parallel", func(t *testing.T) {
140+
cacheDir := filepath.Join(t.TempDir(), "cache")
141+
results := make(chan downloadResult, parallelDownloads)
142+
for i := 0; i < parallelDownloads; i++ {
143+
go func() {
144+
r, err := Download(context.Background(), "", dummyRemoteFileURL,
145+
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
146+
results <- downloadResult{r, err}
147+
}()
148+
}
149+
// We must process all results before cleanup.
150+
for i := 0; i < parallelDownloads; i++ {
151+
result := <-results
152+
if result.err != nil {
153+
t.Errorf("Download failed: %s", result.err)
154+
} else if !slices.Contains(parallelStatus, result.r.Status) {
155+
t.Errorf("Expected download status %s, got %s", parallelStatus, result.r.Status)
156+
}
157+
}
158+
})
92159
})
93160
t.Run("cached", func(t *testing.T) {
94161
_, err := Cached(dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest))

0 commit comments

Comments
 (0)