Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 151 additions & 69 deletions drivers/baidu_netdisk/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,26 @@ import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
stdpath "path"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/errgroup"
"github.com/alist-org/alist/v3/pkg/singleflight"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/avast/retry-go"
"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
)

Expand All @@ -31,8 +34,16 @@ type BaiduNetdisk struct {

uploadThread int
vipType int // 会员类型,0普通用户(4G/4M)、1普通会员(10G/16M)、2超级会员(20G/32M)

upClient *resty.Client // 上传文件使用的http客户端
uploadUrlG singleflight.Group[string]
uploadUrlMu sync.RWMutex
uploadUrl string // 上传域名
uploadUrlUpdateTime time.Time // 上传域名上次更新时间
}

var ErrUploadIDExpired = errors.New("uploadid expired")

func (d *BaiduNetdisk) Config() driver.Config {
return config
}
Expand All @@ -42,19 +53,26 @@ func (d *BaiduNetdisk) GetAddition() driver.Additional {
}

func (d *BaiduNetdisk) Init(ctx context.Context) error {
d.upClient = base.NewRestyClient().
SetTimeout(UPLOAD_TIMEOUT).
SetRetryCount(UPLOAD_RETRY_COUNT).
SetRetryWaitTime(UPLOAD_RETRY_WAIT_TIME).
SetRetryMaxWaitTime(UPLOAD_RETRY_MAX_WAIT_TIME)
d.uploadThread, _ = strconv.Atoi(d.UploadThread)
if d.uploadThread < 1 || d.uploadThread > 32 {
d.uploadThread, d.UploadThread = 3, "3"
if d.uploadThread < 1 {
d.uploadThread, d.UploadThread = 1, "1"
} else if d.uploadThread > 32 {
d.uploadThread, d.UploadThread = 32, "32"
}

if _, err := url.Parse(d.UploadAPI); d.UploadAPI == "" || err != nil {
d.UploadAPI = "https://d.pcs.baidu.com"
d.UploadAPI = UPLOAD_FALLBACK_API
}

res, err := d.get("/xpan/nas", map[string]string{
"method": "uinfo",
}, nil)
log.Debugf("[baidu] get uinfo: %s", string(res))
log.Debugf("[baidu_netdisk] get uinfo: %s", string(res))
if err != nil {
return err
}
Expand Down Expand Up @@ -181,6 +199,11 @@ func (d *BaiduNetdisk) PutRapid(ctx context.Context, dstDir model.Obj, stream mo
// **注意**: 截至 2024/04/20 百度云盘 api 接口返回的时间永远是当前时间,而不是文件时间。
// 而实际上云盘存储的时间是文件时间,所以此处需要覆盖时间,保证缓存与云盘的数据一致
func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
// 百度网盘不允许上传空文件
if stream.GetSize() < 1 {
return nil, ErrBaiduEmptyFilesNotAllowed
}

// rapid upload
if newObj, err := d.PutRapid(ctx, dstDir, stream); err == nil {
return newObj, nil
Expand Down Expand Up @@ -245,7 +268,7 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
}
if tmpF != nil {
if written != streamSize {
return nil, errs.NewErr(err, "CreateTempFile failed, incoming stream actual size= %d, expect = %d ", written, streamSize)
return nil, errs.NewErr(err, "CreateTempFile failed, size mismatch: %d != %d ", written, streamSize)
}
_, err = tmpF.Seek(0, io.SeekStart)
if err != nil {
Expand All @@ -259,82 +282,97 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
mtime := stream.ModTime().Unix()
ctime := stream.CreateTime().Unix()

// step.1 预上传
// 尝试获取之前的进度
// step.1 尝试读取已保存进度
precreateResp, ok := base.GetUploadProgress[*PrecreateResp](d, d.AccessToken, contentMd5)
if !ok {
params := map[string]string{
"method": "precreate",
}
form := map[string]string{
"path": path,
"size": strconv.FormatInt(streamSize, 10),
"isdir": "0",
"autoinit": "1",
"rtype": "3",
"block_list": blockListStr,
"content-md5": contentMd5,
"slice-md5": sliceMd5,
}
joinTime(form, ctime, mtime)

log.Debugf("[baidu_netdisk] precreate data: %s", form)
_, err = d.postForm("/xpan/file", params, form, &precreateResp)
// 没有进度,走预上传
precreateResp, err = d.precreate(ctx, path, streamSize, blockListStr, contentMd5, sliceMd5, ctime, mtime)
if err != nil {
return nil, err
}
log.Debugf("%+v", precreateResp)
if precreateResp.ReturnType == 2 {
//rapid upload, since got md5 match from baidu server
// 修复时间,具体原因见 Put 方法注释的 **注意**
precreateResp.File.Ctime = ctime
precreateResp.File.Mtime = mtime
return fileToObj(precreateResp.File), nil
}
}

// step.2 上传分片
threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread,
retry.Attempts(1),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
sem := semaphore.NewWeighted(3)
for i, partseq := range precreateResp.BlockList {
if utils.IsCanceled(upCtx) {
break
uploadLoop:
for attempt := 0; attempt < 2; attempt++ {
// 获取上传域名
uploadUrl := d.getUploadUrl(path, precreateResp.Uploadid)
// 并发上传
threadG, upCtx := errgroup.NewGroupWithContext(ctx, d.uploadThread,
retry.Attempts(1),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))

cacheReaderAt, okReaderAt := cache.(io.ReaderAt)
if !okReaderAt {
return nil, fmt.Errorf("cache object must implement io.ReaderAt interface for upload operations")
}

i, partseq, offset, byteSize := i, partseq, int64(partseq)*sliceSize, sliceSize
if partseq+1 == count {
byteSize = lastBlockSize
}
threadG.Go(func(ctx context.Context) error {
if err = sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
params := map[string]string{
"method": "upload",
"access_token": d.AccessToken,
"type": "tmpfile",
"path": path,
"uploadid": precreateResp.Uploadid,
"partseq": strconv.Itoa(partseq),
totalParts := len(precreateResp.BlockList)
for i, partseq := range precreateResp.BlockList {
if utils.IsCanceled(upCtx) || partseq < 0 {
continue
}
err := d.uploadSlice(ctx, params, stream.GetName(),
driver.NewLimitedUploadStream(ctx, io.NewSectionReader(cache, offset, byteSize)))
if err != nil {
return err

i, partseq := i, partseq
offset, size := int64(partseq)*sliceSize, sliceSize
if partseq+1 == count {
size = lastBlockSize
}
up(float64(threadG.Success()) * 100 / float64(len(precreateResp.BlockList)))
precreateResp.BlockList[i] = -1
return nil
})
}
if err = threadG.Wait(); err != nil {
// 如果属于用户主动取消,则保存上传进度
threadG.Go(func(ctx context.Context) error {
params := map[string]string{
"method": "upload",
"access_token": d.AccessToken,
"type": "tmpfile",
"path": path,
"uploadid": precreateResp.Uploadid,
"partseq": strconv.Itoa(partseq),
}
section := io.NewSectionReader(cacheReaderAt, offset, size)
err := d.uploadSlice(ctx, uploadUrl, params, stream.GetName(), driver.NewLimitedUploadStream(ctx, section))
if err != nil {
return err
}
precreateResp.BlockList[i] = -1
// 当前goroutine还没退出,+1才是真正成功的数量
success := threadG.Success() + 1
progress := float64(success) * 100 / float64(totalParts)
up(progress)
return nil
})
}

err = threadG.Wait()
if err == nil {
break uploadLoop
}

// 保存进度(所有错误都会保存)
precreateResp.BlockList = utils.SliceFilter(precreateResp.BlockList, func(s int) bool { return s >= 0 })
base.SaveUploadProgress(d, precreateResp, d.AccessToken, contentMd5)

if errors.Is(err, context.Canceled) {
precreateResp.BlockList = utils.SliceFilter(precreateResp.BlockList, func(s int) bool { return s >= 0 })
return nil, err
}
if errors.Is(err, ErrUploadIDExpired) {
log.Warn("[baidu_netdisk] uploadid expired, will restart from scratch")
// 重新 precreate(所有分片都要重传)
newPre, err2 := d.precreate(ctx, path, streamSize, blockListStr, "", "", ctime, mtime)
if err2 != nil {
return nil, err2
}
if newPre.ReturnType == 2 {
return fileToObj(newPre.File), nil
}
precreateResp = newPre
// 覆盖掉旧的进度
base.SaveUploadProgress(d, precreateResp, d.AccessToken, contentMd5)
continue uploadLoop
}
return nil, err
}
Expand All @@ -348,23 +386,67 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
// 修复时间,具体原因见 Put 方法注释的 **注意**
newFile.Ctime = ctime
newFile.Mtime = mtime
// 上传成功清理进度
base.SaveUploadProgress(d, nil, d.AccessToken, contentMd5)
return fileToObj(newFile), nil
}

func (d *BaiduNetdisk) uploadSlice(ctx context.Context, params map[string]string, fileName string, file io.Reader) error {
res, err := base.RestyClient.R().
// precreate 执行预上传操作,支持首次上传和 uploadid 过期重试
func (d *BaiduNetdisk) precreate(ctx context.Context, path string, streamSize int64, blockListStr, contentMd5, sliceMd5 string, ctime, mtime int64) (*PrecreateResp, error) {
params := map[string]string{"method": "precreate"}
form := map[string]string{
"path": path,
"size": strconv.FormatInt(streamSize, 10),
"isdir": "0",
"autoinit": "1",
"rtype": "3",
"block_list": blockListStr,
}

// 只有在首次上传时才包含 content-md5 和 slice-md5
if contentMd5 != "" && sliceMd5 != "" {
form["content-md5"] = contentMd5
form["slice-md5"] = sliceMd5
}

joinTime(form, ctime, mtime)

var precreateResp PrecreateResp
_, err := d.postForm("/xpan/file", params, form, &precreateResp)
if err != nil {
return nil, err
}

// 修复时间,具体原因见 Put 方法注释的 **注意**
if precreateResp.ReturnType == 2 {
precreateResp.File.Ctime = ctime
precreateResp.File.Mtime = mtime
}

return &precreateResp, nil
}

func (d *BaiduNetdisk) uploadSlice(ctx context.Context, uploadUrl string, params map[string]string, fileName string, file io.Reader) error {
res, err := d.upClient.R().
SetContext(ctx).
SetQueryParams(params).
SetFileReader("file", fileName, file).
Post(d.UploadAPI + "/rest/2.0/pcs/superfile2")
Post(uploadUrl + "/rest/2.0/pcs/superfile2")
if err != nil {
return err
}
log.Debugln(res.RawResponse.Status + res.String())
errCode := utils.Json.Get(res.Body(), "error_code").ToInt()
errNo := utils.Json.Get(res.Body(), "errno").ToInt()
respStr := res.String()
lower := strings.ToLower(respStr)
if strings.Contains(lower, "uploadid") &&
(strings.Contains(lower, "invalid") || strings.Contains(lower, "expired") || strings.Contains(lower, "not found")) {
return ErrUploadIDExpired
}

if errCode != 0 || errNo != 0 {
return errs.NewErr(errs.StreamIncomplete, "error in uploading to baidu, will retry. response=%s", res.String())
return errs.NewErr(errs.StreamIncomplete, "error uploading to baidu, response=%s", res.String())
}
return nil
}
Expand Down
12 changes: 12 additions & 0 deletions drivers/baidu_netdisk/meta.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package baidu_netdisk

import (
"time"

"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/op"
)
Expand All @@ -17,11 +19,21 @@ type Addition struct {
AccessToken string
UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"`
UploadAPI string `json:"upload_api" default:"https://d.pcs.baidu.com"`
UseDynamicUploadAPI bool `json:"use_dynamic_upload_api" default:"true" help:"dynamically get upload api domain, when enabled, the 'Upload API' setting will be used as a fallback if failed to get"`
CustomUploadPartSize int64 `json:"custom_upload_part_size" type:"number" default:"0" help:"0 for auto"`
LowBandwithUploadMode bool `json:"low_bandwith_upload_mode" default:"false"`
OnlyListVideoFile bool `json:"only_list_video_file" default:"false"`
}

const (
UPLOAD_FALLBACK_API = "https://d.pcs.baidu.com" // 备用上传地址
UPLOAD_URL_EXPIRE_TIME = time.Minute * 60 // 上传地址有效期(分钟)
UPLOAD_TIMEOUT = time.Minute * 30 // 上传请求超时时间
UPLOAD_RETRY_COUNT = 3
UPLOAD_RETRY_WAIT_TIME = time.Second * 1
UPLOAD_RETRY_MAX_WAIT_TIME = time.Second * 5
)

var config = driver.Config{
Name: "BaiduNetdisk",
DefaultRoot: "/",
Expand Down
Loading