Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,43 @@ func (u *UFileRequest) DownloadFile(writer io.Writer, keyName string) error {
return err
}

func (u *UFileRequest) DownloadFileRetRespBody(keyName string, offset int64) (io.ReadCloser, error) {
reqURL := u.GetPrivateURL(keyName, 24*time.Hour)
req, err := http.NewRequest("GET", reqURL, nil)
req.Header.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
if err != nil {
return nil, err
}

resp, err := u.requestWithResp(req)
if err != nil {
return nil, err
}

u.LastResponseStatus = resp.StatusCode
u.LastResponseHeader = resp.Header
u.LastResponseBody = nil // 不要保存到内存!!!超过 128MB 的 body 会撑爆内存(其实似乎是因为 []byte 的最大容量为 128MB)
u.lastResponse = resp

if !VerifyHTTPCode(resp.StatusCode) {
// 如果 req 出错,此时可以将 resp.Body 保存到内存里,因为 resp.Body 里就只有 RetCode ErrMsg 等信息
defer resp.Body.Close()
resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
u.LastResponseBody = resBody
return nil, fmt.Errorf("Remote response code is %d - %s not 2xx call DumpResponse(true) show details",
resp.StatusCode, http.StatusText(resp.StatusCode))
}
size := u.LastResponseHeader.Get("Content-Length")
fileSize, err := strconv.ParseInt(size, 10, 0)
if err != nil || fileSize < 0 {
return nil, fmt.Errorf("Parse content-lengt returned error")
}
return resp.Body, nil
}

//DownloadFileWithIopString 支持下载iop,直接指定iop命令字符串
func (u *UFileRequest) DownloadFileWithIopString(writer io.Writer, keyName string, iopcmd string) error {

Expand Down
169 changes: 169 additions & 0 deletions file_mutipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,53 @@ type MultipartState struct {
mux sync.Mutex
}

type Part struct {
Etag string
Size int
PartNum int
// LastModified int // 目前的需求里用不到。如果加上,[]Part 中每个元素的格式可能无法保持一致
}

type UploadIdDataSet struct {
UploadId string `json:"UploadId,omitempty"`
FileName string `json:"FileName,omitempty"`
StartTime int `json:"StartTime,omitempty"`
StroageClass string `json:"StroageClass,omitempty"`
}

type UploadIdResponse struct {
NextMarker string `json:"NextMarker,omitempty"`
UploadIdMarker string `json:"UploadIdMarker,omitempty"`
NextUploadMarker string `json:"NextUploadMarker,omitempty"`
Prefix string `json:"Prefix,omitempty"`
Limit int `json:"Limit,omitempty"`
IsTruncated bool `json:"IsTruncated,omitempty"`
DataSet []UploadIdDataSet `json:"DataSet,omitempty"`
}

type UploadPartResponse struct {
Key string `json:"Key,omitempty"`
StroageClass string `json:"UploadIdMarker,omitempty"`
UploadId string `json:"UploadId,omitempty"`
Status int `json:"Status,omitempty"`
IsTruncated bool `json:"IsTruncated,omitempty"`
NextPartNumberMarker int `json:"NextPartNumberMarker,omitempty"`
Parts []Part `json:"Parts,omitempty"`
}

func (m *MultipartState) GenerateMultipartState(blkSize int, uploadID, mimeType, keyName string, parts []*Part) {
m.BlkSize = blkSize
m.uploadID = uploadID
m.mimeType = mimeType
m.keyName = keyName
m.etags = make(map[int]string)
m.mux.Lock()
defer m.mux.Unlock()
for _, part := range parts {
m.etags[part.PartNum] = part.Etag
}
}

//UnmarshalJSON custom unmarshal json
func (m *MultipartState) UnmarshalJSON(bytes []byte) error {
tmp := struct {
Expand Down Expand Up @@ -266,6 +313,51 @@ func (u *UFileRequest) UploadPart(buf *bytes.Buffer, state *MultipartState, part
return nil
}

// 上传一个分片。构造并返回一个 Part
func (u *UFileRequest) UploadPartRetPart(buf *bytes.Buffer, state *MultipartState, partNumber int) (*Part, error) {
size := buf.Len()

query := &url.Values{}
query.Add("uploadId", state.uploadID)
query.Add("partNumber", strconv.Itoa(partNumber))

reqURL := u.genFileURL(state.keyName) + "?" + query.Encode()
req, err := http.NewRequest("PUT", reqURL, buf)
if err != nil {
return nil, err
}
if u.verifyUploadMD5 {
md5Str := fmt.Sprintf("%x", md5.Sum(buf.Bytes()))
req.Header.Add("Content-MD5", md5Str)
}

req.Header.Add("Content-Type", state.mimeType)
authorization := u.Auth.Authorization("PUT", u.BucketName, state.keyName, req.Header)
req.Header.Add("Authorization", authorization)
req.Header.Add("Content-Length", strconv.Itoa(buf.Len()))

resp, err := u.requestWithResp(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

etag := strings.Trim(resp.Header.Get("Etag"), "\"") //为保证线程安全,这里就不保留 lastResponse
if etag == "" {
etag = strings.Trim(resp.Header.Get("ETag"), "\"") //为保证线程安全,这里就不保留 lastResponse
}
state.mux.Lock()
state.etags[partNumber] = etag
state.mux.Unlock()

part := &Part{
Etag: etag,
Size: size,
PartNum: partNumber,
}
return part, nil
}

//FinishMultipartUpload 完成分片上传。分片上传必须要调用的接口。
//state 参数是 InitiateMultipartUpload 返回的
func (u *UFileRequest) FinishMultipartUpload(state *MultipartState) error {
Expand Down Expand Up @@ -293,6 +385,83 @@ func (u *UFileRequest) FinishMultipartUpload(state *MultipartState) error {
return u.request(req)
}

// 获取当前 bucket 正在进行分片上传的,但未 finish 的 upload 事件(即所有 multiState)
func (u *UFileRequest) GetMultiUploadId(prefix, marker, uploadIdMarker string, limit int) (list UploadIdResponse, err error) {
query := &url.Values{}
query.Add("muploadid", "")
if prefix != "" {
query.Add("prefix", prefix)
}
if marker != "" {
query.Add("marker", marker)
}
if limit != 0 {
query.Add("limit", strconv.Itoa(limit))
}
if uploadIdMarker != "" {
query.Add("upload-id-marker", uploadIdMarker)
}

reqURL := u.baseURL.String() + "?" + query.Encode()

req, err := http.NewRequest("GET", reqURL, nil)
if err != nil {
return
}

authorization := u.Auth.Authorization("GET", u.BucketName, "", req.Header)
req.Header.Add("authorization", authorization)

err = u.request(req)
if err != nil {
return
}

err = json.Unmarshal(u.LastResponseBody, &list)
return
}

func (u *UFileRequest) GetMultiUploadPart(uploadId string, maxParts, partNumberMarker int) (parts []*Part, err error) {
query := &url.Values{}
query.Add("muploadpart", "")
query.Add("uploadId", uploadId)
if maxParts < 0 || maxParts > 1000 {
maxParts = 1000
}
query.Add("max-parts", strconv.Itoa(maxParts))
if partNumberMarker < 0 {
partNumberMarker = 0
}
query.Add("part-number-marker", strconv.Itoa(partNumberMarker))

reqURL := u.baseURL.String() + "?" + query.Encode()

req, err := http.NewRequest("GET", reqURL, nil)
if err != nil {
return
}

authorization := u.Auth.Authorization("GET", u.BucketName, "", req.Header)
req.Header.Add("authorization", authorization)

err = u.request(req)
if err != nil {
return
}

var uploadPartResponse UploadPartResponse
err = json.Unmarshal(u.LastResponseBody, &uploadPartResponse)
if err != nil {
return
}

for _, part := range uploadPartResponse.Parts {
tmp := part
parts = append(parts, &tmp)
}
return
}

func divideCeil(a, b int64) int {
div := float64(a) / float64(b)
c := math.Ceil(div)
Expand Down
36 changes: 36 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ufsdk
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -37,6 +38,18 @@ type UFileRequest struct {
lastResponse *http.Response
}

type Error struct {
RetCode int
StatusCode int
BucketName string
ErrMsg string
// ...
}

func (e *Error) Error() string {
return fmt.Sprintf("US3 API Error: BucketName: %s; StatusCode: %d; RetCode: %d; ErrMsg: %s", e.BucketName, e.StatusCode, e.RetCode, e.ErrMsg)
}

//NewFileRequest 创建一个用于管理文件的 request,管理文件的 url 与 管理 bucket 接口不一样,
//请将 bucket 和文件管理所需要的分开,NewUBucketRequest 是用来管理 bucket 的。
//Request 创建后的 instance 不是线程安全的,如果你需要做并发的操作,请创建多个 UFileRequest。
Expand Down Expand Up @@ -116,6 +129,29 @@ func (u *UFileRequest) DumpResponse(isDumpBody bool) []byte {
return b.Bytes()
}

func (u *UFileRequest) ParseError() error {
if u.LastResponseBody == nil {
return fmt.Errorf("Stream downloads have no resp.Body stored in 'u', so you cannot parse the body to get RetCode and ErrMsg...")
}

var list struct {
RetCode int `json:"RetCode,omitempty"`
ErrMsg string `json:"ErrMsg,omitempty"`
}

err := json.Unmarshal(u.LastResponseBody, &list)
if err != nil {
return err
}

return &Error{
BucketName: u.BucketName,
StatusCode: u.LastResponseStatus,
RetCode: list.RetCode,
ErrMsg: list.ErrMsg,
}
}

func newRequest(publicKey, privateKey, bucket, host string, client *http.Client) *UFileRequest {
req := new(UFileRequest)
req.Auth = NewAuth(publicKey, privateKey)
Expand Down