Skip to content

Commit f139615

Browse files
committed
Add downloading package
1 parent 0ddf69b commit f139615

File tree

7 files changed

+636
-1
lines changed

7 files changed

+636
-1
lines changed

kadai3-2/hioki-daichi/Gopkg.lock

Lines changed: 17 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/*
2+
Package downloading provides download function.
3+
*/
4+
package downloading
5+
6+
import (
7+
"context"
8+
"crypto/rand"
9+
"errors"
10+
"fmt"
11+
"io"
12+
"io/ioutil"
13+
"net/http"
14+
"net/url"
15+
"os"
16+
"path"
17+
"path/filepath"
18+
"sync"
19+
"time"
20+
21+
"github.com/gopherdojo/dojo3/kadai3-2/hioki-daichi/opt"
22+
"github.com/gopherdojo/dojo3/kadai3-2/hioki-daichi/termination"
23+
"golang.org/x/sync/errgroup"
24+
)
25+
26+
var (
27+
errResponseDoesNotIncludeAcceptRangesHeader = errors.New("response does not include Accept-Ranges header")
28+
errValueOfAcceptRangesHeaderIsNotBytes = errors.New("the value of Accept-Ranges header is not bytes")
29+
errNoContent = errors.New("no content")
30+
)
31+
32+
// Downloader has the information for the download.
33+
type Downloader struct {
34+
outStream io.Writer
35+
url *url.URL
36+
parallelism int
37+
output string
38+
timeout time.Duration
39+
}
40+
41+
// NewDownloader generates Downloader based on Options.
42+
func NewDownloader(w io.Writer, opts *opt.Options) *Downloader {
43+
return &Downloader{
44+
outStream: w,
45+
url: opts.URL,
46+
parallelism: opts.Parallelism,
47+
output: opts.Output,
48+
timeout: opts.Timeout,
49+
}
50+
}
51+
52+
// Download performs parallel download.
53+
func (d *Downloader) Download(ctx context.Context) error {
54+
ctx, cancel := context.WithTimeout(ctx, d.timeout)
55+
defer cancel()
56+
57+
contentLength, err := d.getContentLength(ctx)
58+
if err != nil {
59+
return err
60+
}
61+
62+
rangeHeaders := d.toRangeHeaders(contentLength)
63+
64+
tempDir, err := ioutil.TempDir("", "parallel-download")
65+
if err != nil {
66+
return err
67+
}
68+
clean := func() { os.RemoveAll(tempDir) }
69+
defer clean()
70+
termination.CleanFunc(clean)
71+
72+
filenames, err := d.parallelDownload(ctx, rangeHeaders, tempDir)
73+
if err != nil {
74+
return err
75+
}
76+
77+
filename, err := d.concat(filenames, tempDir)
78+
if err != nil {
79+
return err
80+
}
81+
82+
fmt.Fprintf(d.outStream, "rename %q to %q\n", filename, d.output)
83+
84+
err = os.Rename(filename, d.output)
85+
if err != nil {
86+
return err
87+
}
88+
89+
fmt.Fprintf(d.outStream, "completed: %q\n", d.output)
90+
91+
return nil
92+
}
93+
94+
// getContentLength returns the value of Content-Length received by making a HEAD request.
95+
func (d *Downloader) getContentLength(ctx context.Context) (int, error) {
96+
fmt.Fprintf(d.outStream, "start HEAD request to get Content-Length\n")
97+
98+
req, err := http.NewRequest("HEAD", d.url.String(), nil)
99+
if err != nil {
100+
return 0, err
101+
}
102+
req = req.WithContext(ctx)
103+
104+
resp, err := http.DefaultClient.Do(req)
105+
if err != nil {
106+
return 0, err
107+
}
108+
109+
err = d.validateAcceptRangesHeader(resp)
110+
if err != nil {
111+
return 0, err
112+
}
113+
114+
contentLength := int(resp.ContentLength)
115+
116+
fmt.Fprintf(d.outStream, "got: Content-Length: %d\n", contentLength)
117+
118+
if contentLength < 1 {
119+
return 0, errNoContent
120+
}
121+
122+
return contentLength, nil
123+
}
124+
125+
// validateAcceptRangesHeader validates the following.
126+
// - The presence of an Accept-Ranges header
127+
// - The value of the Accept-Ranges header is "bytes"
128+
func (d *Downloader) validateAcceptRangesHeader(resp *http.Response) error {
129+
acceptRangesHeader := resp.Header.Get("Accept-Ranges")
130+
131+
fmt.Fprintf(d.outStream, "got: Accept-Ranges: %s\n", acceptRangesHeader)
132+
133+
if acceptRangesHeader == "" {
134+
return errResponseDoesNotIncludeAcceptRangesHeader
135+
}
136+
137+
if acceptRangesHeader != "bytes" {
138+
return errValueOfAcceptRangesHeaderIsNotBytes
139+
}
140+
141+
return nil
142+
}
143+
144+
// toRangeHeaders converts the value of Content-Length to the value of Range header.
145+
func (d *Downloader) toRangeHeaders(contentLength int) []string {
146+
parallelism := d.parallelism
147+
148+
// 1 <= parallelism <= Content-Length
149+
if parallelism < 1 {
150+
parallelism = 1
151+
}
152+
if contentLength < parallelism {
153+
parallelism = contentLength
154+
}
155+
156+
unitLength := contentLength / parallelism
157+
remainingLength := contentLength % parallelism
158+
159+
rangeHeaders := make([]string, 0)
160+
161+
cntr := 0
162+
for n := parallelism; n > 0; n-- {
163+
min := cntr
164+
max := cntr + unitLength - 1
165+
166+
// Add the remaining length to the last chunk
167+
if n == 1 && remainingLength != 0 {
168+
max += remainingLength
169+
}
170+
171+
rangeHeaders = append(rangeHeaders, fmt.Sprintf("bytes=%d-%d", min, max))
172+
173+
cntr += unitLength
174+
}
175+
176+
return rangeHeaders
177+
}
178+
179+
// parallelDownload downloads in parallel for each specified rangeHeaders and saves it in the specified dir.
180+
func (d *Downloader) parallelDownload(ctx context.Context, rangeHeaders []string, dir string) (map[int]string, error) {
181+
filenames := map[int]string{}
182+
183+
filenameCh := make(chan map[int]string)
184+
errCh := make(chan error)
185+
186+
for i, rangeHeader := range rangeHeaders {
187+
go d.partialDownloadAndSendToChannel(ctx, i, rangeHeader, filenameCh, errCh, dir)
188+
}
189+
190+
eg, ctx := errgroup.WithContext(ctx)
191+
var mu sync.Mutex
192+
for i := 0; i < len(rangeHeaders); i++ {
193+
eg.Go(func() error {
194+
select {
195+
case <-ctx.Done():
196+
return ctx.Err()
197+
case m := <-filenameCh:
198+
for k, v := range m {
199+
mu.Lock()
200+
filenames[k] = v
201+
mu.Unlock()
202+
}
203+
return nil
204+
case err := <-errCh:
205+
return err
206+
}
207+
})
208+
}
209+
210+
if err := eg.Wait(); err != nil {
211+
return nil, err
212+
}
213+
214+
return filenames, nil
215+
}
216+
217+
// partialDownloadAndSendToChannel performs partialDownload and sends it to the appropriate channel according to the result.
218+
func (d *Downloader) partialDownloadAndSendToChannel(ctx context.Context, i int, rangeHeader string, filenameCh chan<- map[int]string, errCh chan<- error, dir string) {
219+
filename, err := d.partialDownload(ctx, rangeHeader, dir)
220+
if err != nil {
221+
errCh <- err
222+
return
223+
}
224+
225+
filenameCh <- map[int]string{i: filename}
226+
227+
return
228+
}
229+
230+
// partialDownload sends a partial request with the specified rangeHeader,
231+
// and saves the response body in the file under the specified dir,
232+
// and returns the filename.
233+
func (d *Downloader) partialDownload(ctx context.Context, rangeHeader string, dir string) (string, error) {
234+
req, err := http.NewRequest("GET", d.url.String(), nil)
235+
if err != nil {
236+
return "", err
237+
}
238+
req = req.WithContext(ctx)
239+
240+
req.Header.Set("Range", rangeHeader)
241+
242+
fmt.Fprintf(d.outStream, "start GET request with header: \"Range: %s\"\n", rangeHeader)
243+
244+
resp, err := http.DefaultClient.Do(req)
245+
if err != nil {
246+
return "", err
247+
}
248+
defer resp.Body.Close()
249+
250+
if resp.StatusCode != http.StatusPartialContent {
251+
return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
252+
}
253+
254+
fp, err := os.Create(path.Join(dir, randomHexStr()))
255+
if err != nil {
256+
return "", err
257+
}
258+
259+
_, err = io.Copy(fp, resp.Body)
260+
if err != nil {
261+
return "", err
262+
}
263+
264+
filename := fp.Name()
265+
266+
fmt.Fprintf(d.outStream, "downloaded: %q\n", filename)
267+
268+
return filename, nil
269+
}
270+
271+
// concat concatenates the files in order based on the mapping of the specified filenames,
272+
// and creates the concatenated file under the specified dir,
273+
// and returns the filename.
274+
func (d *Downloader) concat(filenames map[int]string, dir string) (string, error) {
275+
fp, err := os.Create(filepath.Join(dir, randomHexStr()))
276+
if err != nil {
277+
return "", err
278+
}
279+
defer fp.Close()
280+
281+
filename := fp.Name()
282+
283+
fmt.Fprintf(d.outStream, "concatenate downloaded files to tempfile: %q\n", filename)
284+
285+
for i := 0; i < len(filenames); i++ {
286+
src, err := os.Open(filenames[i])
287+
if err != nil {
288+
return "", err
289+
}
290+
291+
_, err = io.Copy(fp, src)
292+
if err != nil {
293+
return "", err
294+
}
295+
}
296+
297+
return filename, nil
298+
}
299+
300+
// randomHexStr returns a random hex string of length 10.
301+
// 10 is a length which does not duplicate enough.
302+
func randomHexStr() string {
303+
b := make([]byte, 5)
304+
_, err := rand.Read(b)
305+
if err != nil {
306+
panic(err)
307+
}
308+
return fmt.Sprintf("%x", b)
309+
}

0 commit comments

Comments
 (0)