Skip to content

Commit dff215c

Browse files
author
yang.zhao
committed
update dep && fix baidu logic
1 parent d880578 commit dff215c

File tree

2 files changed

+111
-82
lines changed

2 files changed

+111
-82
lines changed

backend/baidu/baidu.go

Lines changed: 101 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"errors"
1010
"fmt"
11+
"hash/crc32"
1112
"io"
1213
"net/http"
1314
"os"
@@ -38,11 +39,13 @@ const (
3839
uriOauthToken = "/oauth/2.0/token"
3940
uriFile = "/rest/2.0/xpan/file"
4041
uriSuperFile = "/rest/2.0/pcs/superfile2"
42+
uriPCSFile = "/rest/2.0/pcs/file"
4143
uriMultimedia = "/rest/2.0/xpan/multimedia"
4244
uriQuota = "/api/quota"
4345

4446
//
45-
chunkSize = 4 * 1024 * 1024 //分片大小4M
47+
chunkSize = 4 * 1024 * 1024 //分片大小4M
48+
rapidUploadThreshold = 256 * 1024 //秒传阈值256KB
4649
)
4750

4851
// Options defines the configuration for this backend
@@ -696,100 +699,120 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
696699
return o.upload(ctx, in, src.Size())
697700
}
698701

699-
func fileMd5(file *os.File) string {
700-
file.Seek(0, 0)
701-
hash := md5.New()
702-
io.Copy(hash, file)
703-
return hex.EncodeToString(hash.Sum(nil))
704-
}
705-
706702
func (o *Object) upload(ctx context.Context, in io.Reader, size int64) error {
707-
var md5s []string
708-
var files []*os.File
709-
710-
defer func() {
711-
for _, f := range files {
712-
if f != nil {
713-
os.Remove(f.Name())
714-
f.Close()
703+
remote := "/" + strings.TrimLeft(o.remote, "/")
704+
705+
// 1. 尝试秒传 (仅针对已知大小且较大的文件尝试)
706+
if size > rapidUploadThreshold {
707+
// 这里尝试从 reader 获取底层文件以计算秒传所需的哈希
708+
// NOTE: 这种方式不一定总是成功,取决于 rclone 如何传递流
709+
if f, ok := in.(*os.File); ok {
710+
contentMD5, sliceMD5, crc32Val, err := o.computeLocalHashes(f.Name())
711+
if err == nil {
712+
err = o.rapidUpload(ctx, remote, contentMD5, sliceMD5, fmt.Sprintf("%x", crc32Val), size)
713+
if err == nil {
714+
return nil // 秒传成功
715+
}
716+
fs.Debugf(o, "秒传失败: %v,转为普通上传", err)
715717
}
716718
}
717-
}()
719+
}
720+
721+
// 2. 流式切片上传 (方案 A)
722+
var md5s []string
723+
buf := make([]byte, chunkSize)
724+
var uploaded int64 = 0
718725

719726
for {
720-
tFile, err := os.CreateTemp("", "rclone_baidu_")
721-
if err != nil {
722-
return err
723-
}
724-
n, err := io.CopyN(tFile, in, chunkSize)
725-
if err != nil && n == 0 {
726-
os.Remove(tFile.Name())
727-
tFile.Close()
728-
if err == io.EOF {
729-
break
727+
n, err := io.ReadFull(in, buf)
728+
if n > 0 {
729+
md5sum, uploadErr := o.sliceUpload(ctx, remote, bytes.NewReader(buf[:n]), int64(n))
730+
if uploadErr != nil {
731+
return fmt.Errorf("分片上传失败: %w", uploadErr)
730732
}
733+
md5s = append(md5s, md5sum)
734+
uploaded += int64(n)
735+
}
736+
if err == io.EOF || err == io.ErrUnexpectedEOF {
737+
break
738+
}
739+
if err != nil {
731740
return err
732741
}
733-
md5s = append(md5s, fileMd5(tFile))
734-
files = append(files, tFile)
735742
}
736-
md5ListBytes, _ := json.Marshal(md5s)
737743

738-
out, err := o.preUpload(ctx, string(md5ListBytes), o.remote, size)
744+
if len(md5s) == 0 && size > 0 {
745+
return errors.New("没有读取到任何数据")
746+
}
747+
748+
// 3. 合并文件
749+
md5ListBytes, _ := json.Marshal(md5s)
750+
file, err := o.complete(ctx, remote, string(md5ListBytes))
739751
if err != nil {
740-
return err
752+
return fmt.Errorf("合并文件失败: %w", err)
741753
}
754+
o.id = strconv.FormatUint(file.FsId, 10)
755+
o.path = file.Path
756+
return nil
757+
}
742758

743-
if len(out.BlockList) != len(files) {
744-
return errors.New("file chunk size is error")
759+
func (o *Object) computeLocalHashes(lpath string) (contentMD5, sliceMD5 string, crc32Val uint32, err error) {
760+
f, err := os.Open(lpath)
761+
if err != nil {
762+
return "", "", 0, err
745763
}
764+
defer f.Close()
746765

747-
for k, file := range files {
748-
info, err := file.Stat()
749-
if err != nil {
750-
return err
751-
}
752-
err = o.sliceUpload(ctx, o.remote, out.UploadId, k, file, info.Size())
753-
if err != nil {
754-
return err
755-
}
756-
os.Remove(file.Name())
757-
file.Close()
766+
hMD5 := md5.New()
767+
hCRC32 := crc32.NewIEEE()
768+
hSliceMD5 := md5.New()
769+
770+
mw := io.MultiWriter(hMD5, hCRC32)
771+
772+
// 读取前 256KB 算 Slice-MD5
773+
limitReader := io.LimitReader(f, rapidUploadThreshold)
774+
teeReader := io.TeeReader(limitReader, hSliceMD5)
775+
776+
_, err = io.Copy(mw, teeReader)
777+
if err != nil {
778+
return "", "", 0, err
758779
}
759-
file, err := o.complete(ctx, o.remote, string(md5ListBytes), out.UploadId, size)
780+
781+
// 读取剩余部分
782+
_, err = io.Copy(mw, f)
760783
if err != nil {
761-
return err
784+
return "", "", 0, err
762785
}
763-
o.id = strconv.FormatUint(file.FsId, 10)
764-
o.path = file.Path
765-
return nil
786+
787+
return hex.EncodeToString(hMD5.Sum(nil)), hex.EncodeToString(hSliceMD5.Sum(nil)), hCRC32.Sum32(), nil
766788
}
767789

768-
// 预上传
769-
func (o *Object) preUpload(ctx context.Context, md5ListJson string, remote string, size int64) (PreUploadOut, error) {
790+
// 秒传
791+
func (o *Object) rapidUpload(ctx context.Context, remote, contentMD5, sliceMD5, crc32Val string, size int64) error {
770792
opts := &rest.Opts{
771793
Method: "POST",
772-
RootURL: rootUrl,
773-
Path: uriFile,
794+
RootURL: uploadUrl,
795+
Path: uriPCSFile,
774796
Parameters: map[string][]string{
775-
"method": {"precreate"},
797+
"method": {"rapidupload"},
798+
"path": {remote},
799+
"content-length": {strconv.FormatInt(size, 10)},
800+
"content-md5": {contentMD5},
801+
"slice-md5": {sliceMD5},
802+
"content-crc32": {crc32Val},
803+
"ondup": {"overwrite"},
776804
},
777-
Body: bytes.NewBuffer([]byte(fmt.Sprintf("path=%s&size=%d&rtype=0&isdir=0&autoinit=1&block_list=%s", "/"+strings.TrimLeft(remote, "/"), size, md5ListJson))),
778805
}
779-
resp := PreUploadOut{}
806+
resp := RapidUploadOut{}
780807
err := o.fs.call(ctx, opts, &resp)
781-
return resp, err
808+
return err
782809
}
783810

784811
// 分片上传
785-
func (o *Object) sliceUpload(ctx context.Context, remote, uploadId string, partSeq int, file *os.File, size int64) (err error) {
786-
if file == nil {
787-
return errors.New("file error")
788-
}
789-
file.Seek(0, 0)
790-
formReader, contentType, overhead, err := rest.MultipartUpload(ctx, file, nil, "file", "file")
812+
func (o *Object) sliceUpload(ctx context.Context, remote string, in io.Reader, size int64) (md5sum string, err error) {
813+
formReader, contentType, overhead, err := rest.MultipartUpload(ctx, in, nil, "file", "file", "")
791814
if err != nil {
792-
return fmt.Errorf("failed to make multipart upload for 0 length file: %w", err)
815+
return "", err
793816
}
794817
contentLength := size + overhead
795818
opts := &rest.Opts{
@@ -799,28 +822,29 @@ func (o *Object) sliceUpload(ctx context.Context, remote, uploadId string, partS
799822
ContentType: contentType,
800823
ContentLength: &contentLength,
801824
Parameters: map[string][]string{
802-
"method": {"upload"},
803-
"type": {"tmpfile"},
804-
"path": {"/" + strings.TrimLeft(remote, "/")},
805-
"uploadid": {uploadId},
806-
"partseq": {strconv.Itoa(partSeq)},
825+
"method": {"upload"},
826+
"type": {"tmpfile"},
827+
"path": {remote},
807828
},
808829
Body: formReader,
809830
}
810831
resp := SliceUploadOut{}
811-
return o.fs.call(ctx, opts, &resp)
832+
err = o.fs.call(ctx, opts, &resp)
833+
return resp.Md5, err
812834
}
813835

814-
// 完成上传
815-
func (o *Object) complete(ctx context.Context, remote, md5ListJson, uploadId string, size int64) (FileEntity, error) {
836+
// 合并上传 (createsuperfile)
837+
func (o *Object) complete(ctx context.Context, remote, md5ListJson string) (FileEntity, error) {
816838
opts := &rest.Opts{
817839
Method: "POST",
818-
RootURL: rootUrl,
819-
Path: uriFile,
840+
RootURL: uploadUrl,
841+
Path: uriPCSFile,
820842
Parameters: map[string][]string{
821-
"method": {"create"},
843+
"method": {"createsuperfile"},
844+
"path": {remote},
845+
"ondup": {"overwrite"},
822846
},
823-
Body: bytes.NewBuffer([]byte(fmt.Sprintf("path=%s&size=%d&rtype=0&isdir=0&autoinit=1&block_list=%s&uploadid=%s", "/"+strings.TrimLeft(remote, "/"), size, md5ListJson, uploadId))),
847+
Body: bytes.NewBuffer([]byte(fmt.Sprintf("param=%s", `{"block_list":`+md5ListJson+`}`))),
824848
}
825849
resp := FileEntity{}
826850
err := o.fs.call(ctx, opts, &resp)

backend/baidu/entity.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ type FileEntity struct {
2222
FsId uint64 `json:"fs_id"` //文件在云端的唯一标识ID
2323
Path string `json:"path"` //文件的绝对路径
2424
ServerFilename string `json:"server_filename"` //文件名称
25-
Size uint `json:"size"` //文件大小,单位B
26-
ServerMtime uint `json:"server_mtime"` //文件在服务器修改时间
27-
ServerCtime uint `json:"server_ctime"` //文件在服务器创建时间
28-
LocalMtime uint `json:"local_mtime"` //文件在客户端修改时间
29-
LocalCtime uint `json:"local_ctime"` //文件在客户端创建时间
25+
Size int64 `json:"size"` //文件大小,单位B
26+
ServerMtime int64 `json:"server_mtime"` //文件在服务器修改时间
27+
ServerCtime int64 `json:"server_ctime"` //文件在服务器创建时间
28+
LocalMtime int64 `json:"local_mtime"` //文件在客户端修改时间
29+
LocalCtime int64 `json:"local_ctime"` //文件在客户端创建时间
3030
IsDir uint `json:"isdir"` //是否为目录,0 文件、1 目录
3131
Md5 string `json:"md5"` //云端哈希(非文件真实MD5),只有是文件类型时,该字段才存在
3232
DirEmpty int `json:"dir_empty"` //该目录是否存在子目录,只有请求参数web=1且该条目为目录时,该字段才存在, 0为存在, 1为不存在
@@ -73,6 +73,11 @@ type SliceUploadOut struct {
7373
Md5 string `json:"md5"` //
7474
}
7575

76+
type RapidUploadOut struct {
77+
ErrorOut
78+
Info FileEntity `json:"info"`
79+
}
80+
7681
/**
7782
list json array 文件信息列表
7883
names json 如果查询共享目录,该字段为共享目录文件上传者的uk和账户名称

0 commit comments

Comments
 (0)