
Commit 46f6233

junoberryferry committed

use archiveasync to prevent memory exhaustion

1 parent 6257543 · commit 46f6233

File tree: 1 file changed (+95 −62 lines)

modules/dump/dumper.go

Lines changed: 95 additions & 62 deletions
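The diff below replaces the Dumper's accumulated files slice with a jobs channel consumed by a background ArchiveAsync goroutine, so each file is streamed into the archive as it is added rather than held in memory until Close. A minimal, self-contained sketch of that producer/consumer pattern (assuming the archives package used in this file is github.com/mholt/archives; the output path and the input file are made up for illustration):

package main

import (
    "context"
    "io/fs"
    "log"
    "os"

    "github.com/mholt/archives"
)

func main() {
    out, err := os.Create("example.tar") // hypothetical output path
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    jobs := make(chan archives.ArchiveAsyncJob)
    done := make(chan error, 1)

    // Consumer: ArchiveAsync drains the jobs channel and writes each file
    // into the archive as it arrives, so nothing accumulates in memory.
    go func() {
        done <- archives.Tar{}.ArchiveAsync(context.Background(), out, jobs)
    }()

    // Producer: build one FileInfo per file and send it as a job, waiting
    // on the per-file result channel before moving on.
    path := "go.mod" // hypothetical file to archive
    stat, err := os.Stat(path)
    if err != nil {
        log.Fatal(err)
    }
    res := make(chan error, 1)
    jobs <- archives.ArchiveAsyncJob{
        File: archives.FileInfo{
            FileInfo:      stat,
            NameInArchive: path,
            Open:          func() (fs.File, error) { return os.Open(path) },
        },
        Result: res,
    }
    if err := <-res; err != nil {
        log.Fatal(err)
    }

    close(jobs) // no more jobs; ArchiveAsync finalizes the archive
    if err := <-done; err != nil {
        log.Fatal(err)
    }
}

Each job carries its own Result channel, so the producer learns about per-file errors right away and only one file needs to be open at a time.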
@@ -64,18 +64,76 @@ func IsSubdir(upper, lower string) (bool, error) {
 type Dumper struct {
     format string
     output io.Writer
-    files  []archives.FileInfo
+    jobs   chan archives.ArchiveAsyncJob
+    done   chan error
     Verbose bool

     globalExcludeAbsPaths []string
 }

 func NewDumper(format string, output io.Writer) *Dumper {
-    return &Dumper{
+    d := &Dumper{
         format: format,
         output: output,
-        files:  make([]archives.FileInfo, 0),
+        jobs:   make(chan archives.ArchiveAsyncJob, 100),
+        done:   make(chan error, 1),
     }
+    d.startArchiver()
+    return d
+}
+
+func (dumper *Dumper) startArchiver() {
+    go func() {
+        ctx := context.Background()
+        var err error
+
+        switch dumper.format {
+        case "zip":
+            err = archives.Zip{}.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar":
+            err = archives.Tar{}.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar.gz":
+            comp := archives.CompressedArchive{
+                Compression: archives.Gz{},
+                Archival:    archives.Tar{},
+            }
+            err = comp.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar.xz":
+            comp := archives.CompressedArchive{
+                Compression: archives.Xz{},
+                Archival:    archives.Tar{},
+            }
+            err = comp.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar.bz2":
+            comp := archives.CompressedArchive{
+                Compression: archives.Bz2{},
+                Archival:    archives.Tar{},
+            }
+            err = comp.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar.br":
+            comp := archives.CompressedArchive{
+                Compression: archives.Brotli{},
+                Archival:    archives.Tar{},
+            }
+            err = comp.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar.lz4":
+            comp := archives.CompressedArchive{
+                Compression: archives.Lz4{},
+                Archival:    archives.Tar{},
+            }
+            err = comp.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        case "tar.zst":
+            comp := archives.CompressedArchive{
+                Compression: archives.Zstd{},
+                Archival:    archives.Tar{},
+            }
+            err = comp.ArchiveAsync(ctx, dumper.output, dumper.jobs)
+        default:
+            err = fmt.Errorf("unsupported format: %s", dumper.format)
+        }
+
+        dumper.done <- err
+    }()
 }

 // AddFilePath adds a file by its filesystem path
@@ -89,28 +147,37 @@ func (dumper *Dumper) AddFilePath(filePath, absPath string) error {
         return err
     }

+    var archiveFileInfo archives.FileInfo
     if fileInfo.IsDir() {
-        archiveFileInfo := archives.FileInfo{
+        archiveFileInfo = archives.FileInfo{
             FileInfo:      fileInfo,
             NameInArchive: filePath,
             Open: func() (fs.File, error) {
                 return &emptyDirFile{info: fileInfo}, nil
             },
         }
-        dumper.files = append(dumper.files, archiveFileInfo)
-        return nil
+    } else {
+        archiveFileInfo = archives.FileInfo{
+            FileInfo:      fileInfo,
+            NameInArchive: filePath,
+            Open: func() (fs.File, error) {
+                return os.Open(absPath)
+            },
+        }
     }

-    archiveFileInfo := archives.FileInfo{
-        FileInfo:      fileInfo,
-        NameInArchive: filePath,
-        Open: func() (fs.File, error) {
-            return os.Open(absPath)
-        },
+    resultChan := make(chan error, 1)
+    job := archives.ArchiveAsyncJob{
+        File:   archiveFileInfo,
+        Result: resultChan,
     }

-    dumper.files = append(dumper.files, archiveFileInfo)
-    return nil
+    select {
+    case dumper.jobs <- job:
+        return <-resultChan
+    case err := <-dumper.done:
+        return err
+    }
 }

 // AddReader adds a file's contents from a Reader, this uses a pipe to stream files from object store to prevent them from filling up disk
@@ -138,8 +205,18 @@ func (dumper *Dumper) AddReader(r io.ReadCloser, info os.FileInfo, customName st
         },
     }

-    dumper.files = append(dumper.files, fileInfo)
-    return nil
+    resultChan := make(chan error, 1)
+    job := archives.ArchiveAsyncJob{
+        File:   fileInfo,
+        Result: resultChan,
+    }
+
+    select {
+    case dumper.jobs <- job:
+        return <-resultChan
+    case err := <-dumper.done:
+        return err
+    }
 }

 // pipeFile makes io.PipeReader compatible with fs.File interface
@@ -159,52 +236,8 @@ func (f *emptyDirFile) Close() error { return nil }
 func (f *emptyDirFile) Stat() (fs.FileInfo, error) { return f.info, nil }

 func (dumper *Dumper) Close() error {
-    ctx := context.Background()
-
-    switch dumper.format {
-    case "zip":
-        return archives.Zip{}.Archive(ctx, dumper.output, dumper.files)
-    case "tar":
-        return archives.Tar{}.Archive(ctx, dumper.output, dumper.files)
-    case "tar.gz":
-        comp := archives.CompressedArchive{
-            Compression: archives.Gz{},
-            Archival:    archives.Tar{},
-        }
-        return comp.Archive(ctx, dumper.output, dumper.files)
-    case "tar.xz":
-        comp := archives.CompressedArchive{
-            Compression: archives.Xz{},
-            Archival:    archives.Tar{},
-        }
-        return comp.Archive(ctx, dumper.output, dumper.files)
-    case "tar.bz2":
-        comp := archives.CompressedArchive{
-            Compression: archives.Bz2{},
-            Archival:    archives.Tar{},
-        }
-        return comp.Archive(ctx, dumper.output, dumper.files)
-    case "tar.br":
-        comp := archives.CompressedArchive{
-            Compression: archives.Brotli{},
-            Archival:    archives.Tar{},
-        }
-        return comp.Archive(ctx, dumper.output, dumper.files)
-    case "tar.lz4":
-        comp := archives.CompressedArchive{
-            Compression: archives.Lz4{},
-            Archival:    archives.Tar{},
-        }
-        return comp.Archive(ctx, dumper.output, dumper.files)
-    case "tar.zst":
-        comp := archives.CompressedArchive{
-            Compression: archives.Zstd{},
-            Archival:    archives.Tar{},
-        }
-        return comp.Archive(ctx, dumper.output, dumper.files)
-    default:
-        return fmt.Errorf("unsupported format: %s", dumper.format)
-    }
+    close(dumper.jobs)
+    return <-dumper.done
 }

 // AddFile kept for backwards compatibility since streaming is more efficient
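
For reference, here is a hypothetical caller-side sketch of the reworked Dumper, written as if it lived alongside dumper.go in the same package (the module's import path is not shown in this diff); the format, output path, and file paths are invented for illustration:

package dump

import (
    "log"
    "os"
)

// exampleUsage is a hypothetical caller showing the intended flow after this
// change: each Add call blocks until the archiver goroutine has consumed the
// file, and Close waits for the archive to be finalized.
func exampleUsage() {
    out, err := os.Create("dump.tar.gz") // hypothetical output path
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    d := NewDumper("tar.gz", out)

    // Hypothetical file: stored as "repo/README.md", read from the absolute path.
    if err := d.AddFilePath("repo/README.md", "/data/repo/README.md"); err != nil {
        log.Fatal(err)
    }

    // Closing the jobs channel lets ArchiveAsync finish writing the archive;
    // Close returns whatever error the archiver goroutine reported.
    if err := d.Close(); err != nil {
        log.Fatal(err)
    }
}

Because AddFilePath blocks on the job's Result channel, a caller sees per-file errors at the point of the call, and Close surfaces any error from the archiver goroutine once the jobs channel has been closed.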
