@@ -5,6 +5,7 @@ package dump
55
66import (
77 "context"
8+ "errors"
89 "fmt"
910 "io"
1011 "io/fs"
@@ -62,78 +63,48 @@ func IsSubdir(upper, lower string) (bool, error) {
6263}
6364
6465type Dumper struct {
65- format string
66- output io.Writer
67- jobs chan archives.ArchiveAsyncJob
68- done chan error
6966 Verbose bool
7067
68+ jobs chan archives.ArchiveAsyncJob
69+ errArchiveAsync chan error
70+ errArchiveJob chan error
71+
7172 globalExcludeAbsPaths []string
7273}
7374
74- func NewDumper (format string , output io.Writer ) * Dumper {
75+ func NewDumper (ctx context. Context , format string , output io.Writer ) ( * Dumper , error ) {
7576 d := & Dumper {
76- format : format ,
77- output : output ,
78- jobs : make (chan archives.ArchiveAsyncJob , 100 ),
79- done : make (chan error , 1 ),
77+ jobs : make (chan archives.ArchiveAsyncJob , 1 ),
78+ errArchiveAsync : make (chan error , 1 ),
79+ errArchiveJob : make (chan error , 1 ),
8080 }
81- d .startArchiver ()
82- return d
83- }
8481
85- func (dumper * Dumper ) startArchiver () {
82+ var comp archives.ArchiverAsync
83+ switch format {
84+ case "zip" :
85+ comp = archives.Zip {}
86+ case "tar" :
87+ comp = archives.Tar {}
88+ case "tar.gz" :
89+ comp = archives.CompressedArchive {Compression : archives.Gz {}, Archival : archives.Tar {}}
90+ case "tar.xz" :
91+ comp = archives.CompressedArchive {Compression : archives.Xz {}, Archival : archives.Tar {}}
92+ case "tar.bz2" :
93+ comp = archives.CompressedArchive {Compression : archives.Bz2 {}, Archival : archives.Tar {}}
94+ case "tar.br" :
95+ comp = archives.CompressedArchive {Compression : archives.Brotli {}, Archival : archives.Tar {}}
96+ case "tar.lz4" :
97+ comp = archives.CompressedArchive {Compression : archives.Lz4 {}, Archival : archives.Tar {}}
98+ case "tar.zst" :
99+ comp = archives.CompressedArchive {Compression : archives.Zstd {}, Archival : archives.Tar {}}
100+ default :
101+ return nil , fmt .Errorf ("unsupported format: %s" , format )
102+ }
86103 go func () {
87- ctx := context .Background ()
88- var err error
89-
90- switch dumper .format {
91- case "zip" :
92- err = archives.Zip {}.ArchiveAsync (ctx , dumper .output , dumper .jobs )
93- case "tar" :
94- err = archives.Tar {}.ArchiveAsync (ctx , dumper .output , dumper .jobs )
95- case "tar.gz" :
96- comp := archives.CompressedArchive {
97- Compression : archives.Gz {},
98- Archival : archives.Tar {},
99- }
100- err = comp .ArchiveAsync (ctx , dumper .output , dumper .jobs )
101- case "tar.xz" :
102- comp := archives.CompressedArchive {
103- Compression : archives.Xz {},
104- Archival : archives.Tar {},
105- }
106- err = comp .ArchiveAsync (ctx , dumper .output , dumper .jobs )
107- case "tar.bz2" :
108- comp := archives.CompressedArchive {
109- Compression : archives.Bz2 {},
110- Archival : archives.Tar {},
111- }
112- err = comp .ArchiveAsync (ctx , dumper .output , dumper .jobs )
113- case "tar.br" :
114- comp := archives.CompressedArchive {
115- Compression : archives.Brotli {},
116- Archival : archives.Tar {},
117- }
118- err = comp .ArchiveAsync (ctx , dumper .output , dumper .jobs )
119- case "tar.lz4" :
120- comp := archives.CompressedArchive {
121- Compression : archives.Lz4 {},
122- Archival : archives.Tar {},
123- }
124- err = comp .ArchiveAsync (ctx , dumper .output , dumper .jobs )
125- case "tar.zst" :
126- comp := archives.CompressedArchive {
127- Compression : archives.Zstd {},
128- Archival : archives.Tar {},
129- }
130- err = comp .ArchiveAsync (ctx , dumper .output , dumper .jobs )
131- default :
132- err = fmt .Errorf ("unsupported format: %s" , dumper .format )
133- }
134-
135- dumper .done <- err
104+ d .errArchiveAsync <- comp .ArchiveAsync (ctx , output , d .jobs )
105+ close (d .errArchiveAsync )
136106 }()
107+ return d , nil
137108}
138109
139110// AddFilePath adds a file by its filesystem path
@@ -147,97 +118,62 @@ func (dumper *Dumper) AddFilePath(filePath, absPath string) error {
147118 return err
148119 }
149120
150- var archiveFileInfo archives.FileInfo
151- if fileInfo .IsDir () {
152- archiveFileInfo = archives.FileInfo {
153- FileInfo : fileInfo ,
154- NameInArchive : filePath ,
155- Open : func () (fs.File , error ) {
156- return & emptyDirFile {info : fileInfo }, nil
157- },
158- }
159- } else {
160- archiveFileInfo = archives.FileInfo {
161- FileInfo : fileInfo ,
162- NameInArchive : filePath ,
163- Open : func () (fs.File , error ) {
164- return os .Open (absPath )
165- },
166- }
121+ archiveFileInfo := archives.FileInfo {
122+ FileInfo : fileInfo ,
123+ NameInArchive : filePath ,
124+ Open : func () (fs.File , error ) {
125+ return os .Open (absPath )
126+ },
167127 }
168128
169- resultChan := make (chan error , 1 )
170- job := archives.ArchiveAsyncJob {
129+ dumper .jobs <- archives.ArchiveAsyncJob {
171130 File : archiveFileInfo ,
172- Result : resultChan ,
131+ Result : dumper . errArchiveJob ,
173132 }
174-
175133 select {
176- case dumper .jobs <- job :
177- return <- resultChan
178- case err := <- dumper .done :
134+ case err = <- dumper .errArchiveAsync :
135+ if err == nil {
136+ return errors .New ("archiver has been closed" )
137+ }
138+ return err
139+ case err = <- dumper .errArchiveJob :
179140 return err
180141 }
181142}
182143
// readerFile adapts an arbitrary io.Reader plus pre-computed metadata to the
// fs.File interface required by the archiver. Close is a no-op: the caller
// retains ownership of (and responsibility for closing) the wrapped reader.
type readerFile struct {
	r    io.Reader
	info os.FileInfo
}

// Compile-time check that readerFile satisfies fs.File.
var _ fs.File = (*readerFile)(nil)

// Stat returns the metadata supplied at construction time.
func (f *readerFile) Stat() (fs.FileInfo, error) {
	return f.info, nil
}

// Read delegates to the wrapped reader.
func (f *readerFile) Read(p []byte) (int, error) {
	return f.r.Read(p)
}

// Close is a no-op because readerFile does not own the underlying reader.
func (f *readerFile) Close() error {
	return nil
}
183155// AddReader adds a file's contents from a Reader, this uses a pipe to stream files from object store to prevent them from filling up disk
184- func (dumper * Dumper ) AddReader (r io.ReadCloser , info os.FileInfo , customName string ) error {
156+ func (dumper * Dumper ) AddReader (r io.Reader , info os.FileInfo , customName string ) error {
185157 if dumper .Verbose {
186158 log .Info ("Adding file %s" , customName )
187159 }
188160
189- pr , pw := io .Pipe ()
190-
191161 fileInfo := archives.FileInfo {
192162 FileInfo : info ,
193163 NameInArchive : customName ,
194- Open : func () (fs.File , error ) {
195- go func () {
196- defer pw .Close ()
197- _ , err := io .Copy (pw , r )
198- r .Close ()
199- if err != nil {
200- pw .CloseWithError (err )
201- }
202- }()
203-
204- return & pipeFile {PipeReader : pr , info : info }, nil
205- },
164+ Open : func () (fs.File , error ) { return & readerFile {r , info }, nil },
206165 }
207166
208- resultChan := make (chan error , 1 )
209- job := archives.ArchiveAsyncJob {
167+ dumper .jobs <- archives.ArchiveAsyncJob {
210168 File : fileInfo ,
211- Result : resultChan ,
169+ Result : dumper . errArchiveJob ,
212170 }
213-
214- select {
215- case dumper .jobs <- job :
216- return <- resultChan
217- case err := <- dumper .done :
218- return err
219- }
220- }
221-
222- // pipeFile makes io.PipeReader compatible with fs.File interface
223- type pipeFile struct {
224- * io.PipeReader
225- info os.FileInfo
171+ return <- dumper .errArchiveJob
226172}
227173
228- func (f * pipeFile ) Stat () (fs.FileInfo , error ) { return f .info , nil }
229-
230- type emptyDirFile struct {
231- info os.FileInfo
232- }
233-
234- func (f * emptyDirFile ) Read ([]byte ) (int , error ) { return 0 , io .EOF }
235- func (f * emptyDirFile ) Close () error { return nil }
236- func (f * emptyDirFile ) Stat () (fs.FileInfo , error ) { return f .info , nil }
237-
238174func (dumper * Dumper ) Close () error {
239175 close (dumper .jobs )
240- return <- dumper .done
176+ return <- dumper .errArchiveAsync
241177}
242178
243179// AddFile kept for backwards compatibility since streaming is more efficient
0 commit comments