diff --git a/kadai3/imura81gt/rget/.go-version b/kadai3/imura81gt/rget/.go-version new file mode 100644 index 0000000..80138e7 --- /dev/null +++ b/kadai3/imura81gt/rget/.go-version @@ -0,0 +1 @@ +1.13.4 diff --git a/kadai3/imura81gt/rget/README.md b/kadai3/imura81gt/rget/README.md new file mode 100644 index 0000000..8eeba68 --- /dev/null +++ b/kadai3/imura81gt/rget/README.md @@ -0,0 +1,48 @@ +rget +========================================================= + +Command +----------------------------------------- + +``` +go run cmd/rget/main.go https://upload.wikimedia.org/wikipedia/commons/1/16/Notocactus_minimus.jpg +``` + +Theme +----------------------------------------- + +分割ダウンロードを行う + +元ネタ: https://qiita.com/codehex/items/d0a500ac387d39a34401 + +- [x]Rangeアクセスを用いる +- [ ]いくつかのゴルーチンでダウンロードしてマージする +- [x]エラー処理を工夫する +- [x]golang.org/x/sync/errgroupパッケージなどを使ってみる +- [x]キャンセルが発生した場合の実装を行う + +ref: https://qiita.com/codehex/items/d0a500ac387d39a34401 + + + +Note. +------------------------------------------ + +### Range Request + +https://developer.mozilla.org/ja/docs/Web/HTTP/Range_requests + +> Accept-Ranges が HTTP レスポンスに存在した場合 (そして値が "none" ではない場合)、サーバーは範囲リクエストに対応しています。これは例えば、 HEAD リクエストを cURL で発行することで確認することができます。 + + +https://developer.mozilla.org/ja/docs/Web/HTTP/Headers/Accept-Ranges + +> Accept-Ranges: bytes +> Accept-Ranges: none + +https://developer.mozilla.org/ja/docs/Web/HTTP/Headers/Range + +> Range: =- +> Range: =- +> Range: =-, - +> Range: =-, -, - diff --git a/kadai3/imura81gt/rget/cmd/rget/main.go b/kadai3/imura81gt/rget/cmd/rget/main.go new file mode 100644 index 0000000..720479b --- /dev/null +++ b/kadai3/imura81gt/rget/cmd/rget/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/gopherdojo/dojo7/kadai3/imura81gt/rget" +) + +func main() { + concurrency := flag.Uint("c", 2, "concurrency") + outputDir := flag.String("o", "./", "output directory") + + flag.Parse() + option := rget.Option{ + Concurrency: *concurrency, + OutputDir: *outputDir, + } + urls := flag.Args() + if len(urls) != 1 { + fmt.Fprintf(os.Stderr, "%s \n", os.Args[0]) + fmt.Fprintln(os.Stderr, "option:") + flag.PrintDefaults() + os.Exit(1) + } + + option.URL = urls[0] + fmt.Println(option) + err := rget.Run(option) + if err != nil { + fmt.Fprintf(os.Stderr, "err: %s", err) + os.Exit(1) + } +} diff --git a/kadai3/imura81gt/rget/cmd/rget/main_test.go b/kadai3/imura81gt/rget/cmd/rget/main_test.go new file mode 100644 index 0000000..097f27a --- /dev/null +++ b/kadai3/imura81gt/rget/cmd/rget/main_test.go @@ -0,0 +1,5 @@ +package main + +import "testing" + +func TestMain(t *testing.T) {} diff --git a/kadai3/imura81gt/rget/go.mod b/kadai3/imura81gt/rget/go.mod new file mode 100644 index 0000000..a46d6f7 --- /dev/null +++ b/kadai3/imura81gt/rget/go.mod @@ -0,0 +1,8 @@ +module github.com/gopherdojo/dojo7/kadai3/imura81gt/rget + +go 1.13 + +require ( + github.com/google/go-cmp v0.3.1 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e +) diff --git a/kadai3/imura81gt/rget/go.sum b/kadai3/imura81gt/rget/go.sum new file mode 100644 index 0000000..33b824b --- /dev/null +++ b/kadai3/imura81gt/rget/go.sum @@ -0,0 +1,4 @@ +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/kadai3/imura81gt/rget/rget.go b/kadai3/imura81gt/rget/rget.go new file mode 100644 index 0000000..4fc49ae --- /dev/null +++ b/kadai3/imura81gt/rget/rget.go @@ -0,0 +1,235 @@ +package rget + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "path" + "path/filepath" + + "golang.org/x/sync/errgroup" +) + +type Option struct { + Concurrency uint + URL string + OutputDir string + ContentLength int64 + Units Units +} + +type Unit struct { + RangeStart int64 + RangeEnd int64 + TempFileName string + DownloadedSize int64 +} + +func (u *Unit) Write(data []byte) (int, error) { + d := len(data) + u.DownloadedSize += int64(d) + // fmt.Printf("%v is downloaded %v/%v \n", + // u.TempFileName, u.DownloadedSize, u.RangeEnd-u.RangeStart+1) + return d, nil +} + +type Units []Unit + +func Run(option Option) error { + fmt.Printf("%+v\n", option) + err := option.checkingHeaders() + if err != nil { + return fmt.Errorf("%s", err) + } + + option.divide() + + tmpDir, err := ioutil.TempDir("", "rget") + if err != nil { + return fmt.Errorf("%s", err) + } + defer os.RemoveAll(tmpDir) + fmt.Println(tmpDir) + + err = option.parallelDownload(tmpDir) + if err != nil { + return fmt.Errorf("%s", err) + } + + err = option.combine(tmpDir) + if err != nil { + return fmt.Errorf("%s", err) + } + + return nil +} + +func (o *Option) checkingHeaders() error { + resp, err := http.Head(o.URL) + if err != nil { + return err + } + + if resp.Header.Get("Accept-Ranges") == "" { + err := fmt.Errorf("%s : %s cannot support Ranges Requests", o.URL, resp.Request.URL.String()) + return err + } + + if resp.Header["Accept-Ranges"][0] == "none" { + err := fmt.Errorf("%s : %s cannot support Ranges Requests", o.URL, resp.Request.URL.String()) + return err + } + + if resp.ContentLength == 0 { + err := fmt.Errorf("%s size is nil", o.URL) + return err + } + + redirectURL := resp.Request.URL.String() + + o.ContentLength = resp.ContentLength + + // keep the redirect URL that accept Ranges Requests because some mirror sites may deny. + // TODO: redirectURL should set by Unit separately. + if o.URL != redirectURL { + o.URL = redirectURL + } + + return err +} + +//func divide(contentLength int64, concurrency int) Units { +func (o *Option) divide() { + var units []Unit + + if o.Concurrency == 0 { + o.Concurrency = 1 + } + + if o.ContentLength < int64(o.Concurrency) { + o.Concurrency = uint(o.ContentLength) + } + + sbyte := o.ContentLength / int64(o.Concurrency) + + for i := 0; i < int(o.Concurrency); i++ { + units = append(units, Unit{ + RangeStart: int64(i) * sbyte, + RangeEnd: int64((i+1))*sbyte - 1, + TempFileName: fmt.Sprintf("%d_%s", i, path.Base(o.URL)), + }) + } + + // TODO: should distribute the remainder to each unit + units[len(units)-1].RangeEnd = (o.ContentLength - 1) + + o.Units = units +} + +func (o *Option) parallelDownload(tmpDir string) error { + fmt.Println("parallelDownload", o.Units) + + eg, ctx := errgroup.WithContext(context.Background()) + for i := range o.Units { + // https://godoc.org/golang.org/x/sync/errgroup#example-Group--Parallel + // https://golang.org/doc/faq#closures_and_goroutines + i := i + eg.Go(func() error { + return o.downloadWithContext(ctx, i, tmpDir) + }) + } + + if err := eg.Wait(); err != nil { + return err + } + + return nil +} + +func (o *Option) downloadWithContext( + ctx context.Context, + i int, + dir string, +) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + fmt.Printf("Downloading: %v %+v\n", i, o.Units[i]) + + //v1.13 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, o.URL, nil) + if err != nil { + return fmt.Errorf("Error: %v", err) + } + + // add range header + byteRange := fmt.Sprintf("bytes=%d-%d", o.Units[i].RangeStart, o.Units[i].RangeEnd) + fmt.Println(byteRange) + req.Header.Set("Range", byteRange) + + client := http.DefaultClient + // TODO: should check resp.StatusCode. + // client.Do cannot seems to return the err when statusCode is 50x etc. + resp, err := client.Do(req) + if err != nil { + fmt.Printf("client err: %s", err) + return fmt.Errorf("Error: %v", err) + } + defer resp.Body.Close() + + select { + case <-ctx.Done(): + fmt.Printf("Done: %v %+v\n", i, o.Units[i]) + return fmt.Errorf("Error: %v", err) + default: + fmt.Println("default:", i, o.Units[i]) + } + + w, err := os.Create(filepath.Join(dir, o.Units[i].TempFileName)) + if err != nil { + return fmt.Errorf("Error: %v", err) + } + defer func() error { + if err := w.Close(); err != nil { + return fmt.Errorf("Error: %v", err) + } + return nil + }() + + _, err = io.Copy(w, io.TeeReader(resp.Body, &o.Units[i])) + if err != nil { + return fmt.Errorf("Error: %v", err) + } + + return nil +} + +func (o *Option) combine(dir string) error { + w, err := os.Create(filepath.Join(o.OutputDir, path.Base(o.URL))) + if err != nil { + return fmt.Errorf("Error: %v", err) + } + defer func() error { + if err := w.Close(); err != nil { + return fmt.Errorf("Error: %v", err) + } + return nil + }() + + for _, unit := range o.Units { + r, err := os.Open(filepath.Join(dir, unit.TempFileName)) + if err != nil { + return fmt.Errorf("Error: %v", err) + } + + _, err = io.Copy(w, r) + if err != nil { + return fmt.Errorf("Error: %v", err) + } + } + + return nil +} diff --git a/kadai3/imura81gt/rget/rget_test.go b/kadai3/imura81gt/rget/rget_test.go new file mode 100644 index 0000000..3f62798 --- /dev/null +++ b/kadai3/imura81gt/rget/rget_test.go @@ -0,0 +1,316 @@ +package rget + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +func TestWrite(t *testing.T) { +} + +func TestRun(t *testing.T) { +} + +func TestCheckingHeaders(t *testing.T) { + testCases := []struct { + caseName string + acceptRanges string + body string + isErr bool + expected string + }{ + {caseName: "acceptRanges:none", acceptRanges: "none", body: "1", isErr: true, expected: "cannot support Ranges Requests"}, + {caseName: "acceptRanges:(empty)", acceptRanges: "", body: "1", isErr: true, expected: "cannot support Ranges Requests"}, + {caseName: "acceptRanges:bytes", acceptRanges: "bytes", body: "1", isErr: false}, + {caseName: "acceptRanges:bytes but content is empty", acceptRanges: "bytes", body: "", isErr: true, expected: "size is nil"}, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.caseName, func(t *testing.T) { + t.Parallel() + + ts := SetupHTTPServer(t, tc.acceptRanges, tc.body) + defer ts.Close() + + // resp, _ := http.Get(ts.URL) + // t.Logf("resp: %+v", resp) + + resp, err := http.Head(ts.URL) + if err != nil { + t.Fatal(err) + } + t.Logf("ContentLength: %+v,Accept-Ranges: %+v", resp.ContentLength, resp.Header.Get("Accept-Ranges")) + + o := Option{URL: ts.URL} + exerr := o.checkingHeaders() + if tc.isErr && exerr == nil { + t.Errorf("tc.isErr %+v but err is %+v", tc.isErr, exerr) + } + if tc.isErr && exerr != nil && !strings.Contains(exerr.Error(), tc.expected) { + t.Errorf("actual: %+v\nexpected: %+v\n", exerr, tc.isErr) + } + }) + } + + t.Run("Invalid URL", func(t *testing.T) { + oEmpty := Option{URL: "s3://example.com"} + exerr := oEmpty.checkingHeaders() + t.Log(exerr) + if exerr == nil { + t.Error("invalid URL but err is nil") + } + }) + +} + +//func divide(contentLength int64, concurrency int) Units { +func TestDivide(t *testing.T) { + + const ( + url = "https://example.com/test.iso" + contentLength = 5 + ) + + testCases := []struct { + caseName string + concurrency uint + expected Option + }{ + { + caseName: "ContentLength and Concurrency is same value", + concurrency: 5, + expected: Option{ + URL: url, + ContentLength: contentLength, + Concurrency: 5, + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 0}, + {TempFileName: "1_test.iso", RangeStart: 1, RangeEnd: 1}, + {TempFileName: "2_test.iso", RangeStart: 2, RangeEnd: 2}, + {TempFileName: "3_test.iso", RangeStart: 3, RangeEnd: 3}, + {TempFileName: "4_test.iso", RangeStart: 4, RangeEnd: 4}, + }, + }, + }, + { + caseName: "One Thread", + concurrency: 1, + expected: Option{ + URL: url, + ContentLength: contentLength, + Concurrency: 1, + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 4}, + }, + }, + }, + { + caseName: "Remainder:ContentLength%Concurrency!=0", + concurrency: 2, + expected: Option{ + URL: url, + ContentLength: contentLength, + Concurrency: 2, + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 1}, + {TempFileName: "1_test.iso", RangeStart: 2, RangeEnd: 4}, + }, + }, + }, + { + caseName: "Concurrency exceed the contentLength.", + concurrency: 10, + expected: Option{ + URL: url, + ContentLength: contentLength, + Concurrency: 5, + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 0}, + {TempFileName: "1_test.iso", RangeStart: 1, RangeEnd: 1}, + {TempFileName: "2_test.iso", RangeStart: 2, RangeEnd: 2}, + {TempFileName: "3_test.iso", RangeStart: 3, RangeEnd: 3}, + {TempFileName: "4_test.iso", RangeStart: 4, RangeEnd: 4}, + }, + }, + }, + { + caseName: "ContentLength:5/0", + concurrency: 0, + expected: Option{ + URL: url, + ContentLength: contentLength, + Concurrency: 1, + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 4}, + }, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.caseName, func(t *testing.T) { + t.Parallel() + + o := Option{URL: url, ContentLength: contentLength, Concurrency: tc.concurrency} + o.divide() + + if !cmp.Equal(o, tc.expected) { + t.Errorf("actual: %+v\nexpected: %+v\n", o, tc.expected) + } + + }) + } +} + +func TestParallelDownload(t *testing.T) { + + type Expected []struct { + TempFileName string + Text string + } + + testCases := []struct { + caseName string + acceptRanges string + body string + option Option + expected Expected + }{ + { + caseName: "acceptRanges:bytes per 1byte", acceptRanges: "bytes", body: "12345", + option: Option{ + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 0}, + {TempFileName: "1_test.iso", RangeStart: 1, RangeEnd: 1}, + {TempFileName: "2_test.iso", RangeStart: 2, RangeEnd: 2}, + {TempFileName: "3_test.iso", RangeStart: 3, RangeEnd: 3}, + {TempFileName: "4_test.iso", RangeStart: 4, RangeEnd: 4}, + }, + }, + expected: Expected{ + {TempFileName: "0_test.iso", Text: "1"}, + {TempFileName: "1_test.iso", Text: "2"}, + {TempFileName: "2_test.iso", Text: "3"}, + {TempFileName: "3_test.iso", Text: "4"}, + {TempFileName: "4_test.iso", Text: "5"}, + }, + }, + { + caseName: "acceptRanges:bytes per 2bytes", acceptRanges: "bytes", body: "0123456789", + option: Option{ + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 1}, + {TempFileName: "1_test.iso", RangeStart: 2, RangeEnd: 3}, + {TempFileName: "2_test.iso", RangeStart: 4, RangeEnd: 5}, + {TempFileName: "3_test.iso", RangeStart: 6, RangeEnd: 7}, + {TempFileName: "4_test.iso", RangeStart: 8, RangeEnd: 9}, + }, + }, + expected: Expected{ + {TempFileName: "0_test.iso", Text: "01"}, + {TempFileName: "1_test.iso", Text: "23"}, + {TempFileName: "2_test.iso", Text: "45"}, + {TempFileName: "3_test.iso", Text: "67"}, + {TempFileName: "4_test.iso", Text: "89"}, + }, + }, + { + caseName: "acceptRanges:bytes per 2bytes+1", acceptRanges: "bytes", body: "01234567890", + option: Option{ + Units: []Unit{ + {TempFileName: "0_test.iso", RangeStart: 0, RangeEnd: 1}, + {TempFileName: "1_test.iso", RangeStart: 2, RangeEnd: 3}, + {TempFileName: "2_test.iso", RangeStart: 4, RangeEnd: 5}, + {TempFileName: "3_test.iso", RangeStart: 6, RangeEnd: 7}, + {TempFileName: "4_test.iso", RangeStart: 8, RangeEnd: 10}, + }, + }, + expected: Expected{ + {TempFileName: "0_test.iso", Text: "01"}, + {TempFileName: "1_test.iso", Text: "23"}, + {TempFileName: "2_test.iso", Text: "45"}, + {TempFileName: "3_test.iso", Text: "67"}, + {TempFileName: "4_test.iso", Text: "890"}, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.caseName, func(t *testing.T) { + t.Parallel() + + prefix := "rget_test" + tmpDir, err := ioutil.TempDir("", prefix) + if err != nil { + t.Fatal(err) + } + + ts := SetupHTTPServer(t, tc.acceptRanges, tc.body) + defer ts.Close() + + tc.option.URL = ts.URL + resp, err := http.Head(tc.option.URL) + if err != nil { + t.Fatal(err) + } + + tc.option.ContentLength = resp.ContentLength + + err = tc.option.parallelDownload(tmpDir) + if err != nil { + t.Fatal(err) + } + + for _, ex := range tc.expected { + f, err := os.Open(filepath.Join(tmpDir, ex.TempFileName)) + if err != nil { + t.Fatal(err) + } + + actual, err := ioutil.ReadAll(f) + if ex.Text != string(actual) { + t.Errorf("actual: %+v\nexpected: %+v\n", string(actual), ex.Text) + } + + } + // t.Log(tmpDir) + os.RemoveAll(tmpDir) + + }) + } + +} + +func TestDownloadWithContext(t *testing.T) { +} + +func TestCombine(t *testing.T) { +} + +func SetupHTTPServer(t *testing.T, ac string, body string) *httptest.Server { + t.Helper() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // the unsupported server for Range request + if ac != "bytes" { + w.Header().Set("Accept-Ranges", ac) + fmt.Fprint(w, body) + } else { + // the supported server for Range request + http.ServeContent(w, r, "", time.Unix(0, 0), strings.NewReader(body)) + } + + })) + return ts +}