Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions drivers/cloudreve/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
switch r.Policy.Type {
case "onedrive":
err = d.upOneDrive(ctx, stream, u, up)
case "s3":
err = d.upS3(ctx, stream, u, up)
case "remote": // 从机存储
err = d.upRemote(ctx, stream, u, up)
case "local": // 本机存储
Expand Down
11 changes: 6 additions & 5 deletions drivers/cloudreve/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ type Policy struct {
}

type UploadInfo struct {
SessionID string `json:"sessionID"`
ChunkSize int `json:"chunkSize"`
Expires int `json:"expires"`
UploadURLs []string `json:"uploadURLs"`
Credential string `json:"credential,omitempty"`
SessionID string `json:"sessionID"`
ChunkSize int `json:"chunkSize"`
Expires int `json:"expires"`
UploadURLs []string `json:"uploadURLs"`
Credential string `json:"credential,omitempty"` // local
CompleteURL string `json:"completeURL,omitempty"` // s3
}

type DirectoryResp struct {
Expand Down
79 changes: 79 additions & 0 deletions drivers/cloudreve/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,82 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u
}
return nil
}

func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
var finish int64 = 0
var chunk int = 0
var etags []string
DEFAULT := int64(u.ChunkSize)
for finish < stream.GetSize() {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
utils.Log.Debugf("[Cloudreve-S3] upload: %d", finish)
var byteSize = DEFAULT
left := stream.GetSize() - finish
if left < DEFAULT {
byteSize = left
}
byteData := make([]byte, byteSize)
n, err := io.ReadFull(stream, byteData)
utils.Log.Debug(err, n)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", u.UploadURLs[chunk],
driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(byteData)))
if err != nil {
return err
}
req = req.WithContext(ctx)
req.ContentLength = byteSize
finish += byteSize
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
_ = res.Body.Close()
etags = append(etags, res.Header.Get("ETag"))
up(float64(finish) * 100 / float64(stream.GetSize()))
chunk++
}

// s3LikeFinishUpload
// https://github.com/cloudreve/frontend/blob/b485bf297974cbe4834d2e8e744ae7b7e5b2ad39/src/component/Uploader/core/api/index.ts#L204-L252
bodyBuilder := &strings.Builder{}
bodyBuilder.WriteString("<CompleteMultipartUpload>")
for i, etag := range etags {
bodyBuilder.WriteString(fmt.Sprintf(
`<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>`,
i+1, // PartNumber 从 1 开始
etag,
))
}
bodyBuilder.WriteString("</CompleteMultipartUpload>")
req, err := http.NewRequest(
"POST",
u.CompleteURL,
strings.NewReader(bodyBuilder.String()),
)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/xml")
req.Header.Set("User-Agent", d.getUA())
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return fmt.Errorf("up status: %d, error: %s", res.StatusCode, string(body))
}

// 上传成功发送回调请求
err = d.request(http.MethodGet, "/callback/s3/"+u.SessionID, nil, nil)
if err != nil {
return err
}
return nil
}
Loading