|
6 | 6 | "encoding/xml" |
7 | 7 | "errors" |
8 | 8 | "fmt" |
| 9 | + "hash/crc64" |
9 | 10 | "io" |
10 | 11 | "io/ioutil" |
11 | 12 | "net/http" |
@@ -147,7 +148,7 @@ type ObjectPutHeaderOptions struct { |
147 | 148 | ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"` |
148 | 149 | ContentType string `header:"Content-Type,omitempty" url:"-"` |
149 | 150 | ContentMD5 string `header:"Content-MD5,omitempty" url:"-"` |
150 | | - ContentLength int `header:"Content-Length,omitempty" url:"-"` |
| 151 | + ContentLength int64 `header:"Content-Length,omitempty" url:"-"` |
151 | 152 | ContentLanguage string `header:"Content-Language,omitempty" url:"-"` |
152 | 153 | Expect string `header:"Expect,omitempty" url:"-"` |
153 | 154 | Expires string `header:"Expires,omitempty" url:"-"` |
@@ -179,26 +180,34 @@ type ObjectPutOptions struct { |
179 | 180 |
|
180 | 181 | // Put Object请求可以将一个文件(Oject)上传至指定Bucket。 |
181 | 182 | // |
182 | | -// 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength |
183 | | -// |
184 | 183 | // https://www.qcloud.com/document/product/436/7749 |
185 | | -func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) { |
| 184 | +func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*Response, error) { |
186 | 185 | if err := CheckReaderLen(r); err != nil { |
187 | 186 | return nil, err |
188 | 187 | } |
189 | | - if opt != nil && opt.Listener != nil { |
190 | | - totalBytes, err := GetReaderLen(r) |
191 | | - if err != nil { |
192 | | - return nil, err |
| 188 | + opt := cloneObjectPutOptions(uopt) |
| 189 | + totalBytes, err := GetReaderLen(r) |
| 190 | + if err != nil && opt != nil && opt.Listener != nil { |
| 191 | + return nil, err |
| 192 | + } |
| 193 | + if err == nil { |
| 194 | + // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传 |
| 195 | + if opt != nil && opt.ContentLength == 0 && IsLenReader(r) { |
| 196 | + opt.ContentLength = totalBytes |
193 | 197 | } |
194 | | - r = TeeReader(r, nil, totalBytes, opt.Listener) |
195 | 198 | } |
196 | | - |
| 199 | + reader := TeeReader(r, nil, totalBytes, nil) |
| 200 | + if s.client.Conf.EnableCRC { |
| 201 | + reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA)) |
| 202 | + } |
| 203 | + if opt != nil && opt.Listener != nil { |
| 204 | + reader.listener = opt.Listener |
| 205 | + } |
197 | 206 | sendOpt := sendOptions{ |
198 | 207 | baseURL: s.client.BaseURL.BucketURL, |
199 | 208 | uri: "/" + encodeURIComponent(name), |
200 | 209 | method: http.MethodPut, |
201 | | - body: r, |
| 210 | + body: reader, |
202 | 211 | optHeader: opt, |
203 | 212 | } |
204 | 213 | resp, err := s.client.send(ctx, &sendOpt) |
@@ -556,38 +565,54 @@ type Results struct { |
556 | 565 | err error |
557 | 566 | } |
558 | 567 |
|
| 568 | +func LimitReadCloser(r io.Reader, n int64) io.Reader { |
| 569 | + var lc LimitedReadCloser |
| 570 | + lc.R = r |
| 571 | + lc.N = n |
| 572 | + return &lc |
| 573 | +} |
| 574 | + |
| 575 | +type LimitedReadCloser struct { |
| 576 | + io.LimitedReader |
| 577 | +} |
| 578 | + |
| 579 | +func (lc *LimitedReadCloser) Close() error { |
| 580 | + if r, ok := lc.R.(io.ReadCloser); ok { |
| 581 | + return r.Close() |
| 582 | + } |
| 583 | + return nil |
| 584 | +} |
| 585 | + |
559 | 586 | func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { |
560 | 587 | for j := range jobs { |
561 | | - fd, err := os.Open(j.FilePath) |
562 | | - var res Results |
563 | | - if err != nil { |
564 | | - res.err = err |
565 | | - res.PartNumber = j.Chunk.Number |
566 | | - res.Resp = nil |
567 | | - results <- &res |
568 | | - } |
569 | | - |
570 | | - // UploadPart do not support the chunk trsf, so need to add the content-length |
571 | | - j.Opt.ContentLength = int(j.Chunk.Size) |
| 588 | + j.Opt.ContentLength = j.Chunk.Size |
572 | 589 |
|
573 | 590 | rt := j.RetryTimes |
574 | 591 | for { |
| 592 | + // http.Request.Body can be Closed in request |
| 593 | + fd, err := os.Open(j.FilePath) |
| 594 | + var res Results |
| 595 | + if err != nil { |
| 596 | + res.err = err |
| 597 | + res.PartNumber = j.Chunk.Number |
| 598 | + res.Resp = nil |
| 599 | + results <- &res |
| 600 | + break |
| 601 | + } |
575 | 602 | fd.Seek(j.Chunk.OffSet, os.SEEK_SET) |
576 | 603 | resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number, |
577 | | - &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt) |
| 604 | + LimitReadCloser(fd, j.Chunk.Size), j.Opt) |
578 | 605 | res.PartNumber = j.Chunk.Number |
579 | 606 | res.Resp = resp |
580 | 607 | res.err = err |
581 | 608 | if err != nil { |
582 | 609 | rt-- |
583 | 610 | if rt == 0 { |
584 | | - fd.Close() |
585 | 611 | results <- &res |
586 | 612 | break |
587 | 613 | } |
588 | 614 | continue |
589 | 615 | } |
590 | | - fd.Close() |
591 | 616 | results <- &res |
592 | 617 | break |
593 | 618 | } |
|
0 commit comments