Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
115e3da
feat:slice upload
jenken827 Aug 23, 2025
f4056db
feat(fs): implement slice upload
jenken827 Aug 26, 2025
50b55dd
fix(fs): Fix concurrent race conditions, optimize segment upload stat…
Suyunmeng Sep 5, 2025
6c32b69
fix(fs): Fix import path errors and unify context parameters
Suyunmeng Sep 5, 2025
e90303c
Merge branch 'main' into chunk_upload
Suyunmeng Sep 5, 2025
c37659e
fix(fs): 移除重复的清理逻辑
j2rong4cn Sep 5, 2025
507b19a
修复上传任务的文件被提前清理
j2rong4cn Sep 5, 2025
6fcb63c
fix(upload): Added task ID support, optimized segment upload logic an…
Suyunmeng Sep 5, 2025
109b116
fix(imports): fix build bugs
Suyunmeng Sep 5, 2025
e035412
feat(upload): Refactored the shard upload manager to optimize upload …
Suyunmeng Sep 5, 2025
70fbe17
refactor(upload): Refactor the global shard upload manager using dela…
Suyunmeng Sep 5, 2025
a58e939
feat(upload): Added the function of cleaning up orphan shard upload t…
Suyunmeng Sep 5, 2025
24411ec
feat(upload): Added the function of resuming unfinished multipart upl…
Suyunmeng Sep 5, 2025
05882df
统一持久化临时目录
j2rong4cn Sep 6, 2025
dc501f0
refactor(upload): Optimize SliceUploadManager with singleflight for s…
j2rong4cn Sep 6, 2025
bba51f4
feat(upload): 支持重启后恢复上传任务,优化临时文件验证和日志记录
Suyunmeng Sep 6, 2025
e365417
上传时检查是否覆盖
j2rong4cn Sep 6, 2025
e68f04e
fix(upload): 修复分片上传时hash值验证逻辑,确保完整hash列表不被覆盖
Suyunmeng Sep 6, 2025
7ccabf4
Merge branch 'chunk_upload' of https://github.com/jenken827/openlist …
Suyunmeng Sep 6, 2025
34c2b7d
fix(upload): 修复FsPreup函数中的文件存在检查逻辑
Suyunmeng Sep 6, 2025
bfdf98f
fix(upload): 修复FsPreup函数中的文件存在检查逻辑
j2rong4cn Sep 6, 2025
bcd8bba
fix(upload): 优化分片函数
Suyunmeng Sep 6, 2025
9c8f70f
fix(upload): 精简CreateSession和CompleteUpload函数,移除冗余逻辑
Suyunmeng Sep 6, 2025
f995c48
fix(upload): 初始化分片上传管理器并优化清理逻辑
Suyunmeng Sep 6, 2025
9d3d11b
fix(upload): 移除冗余
Suyunmeng Sep 6, 2025
b5a05cb
fix(upload): 优化全局分片上传管理器的初始化逻辑,移除冗余函数
j2rong4cn Sep 6, 2025
a438bb8
fix(upload): 调整日志级别
Suyunmeng Sep 6, 2025
d5153a2
Merge branch 'main' into chunk_upload
Suyunmeng Sep 12, 2025
db9aa34
Merge main branch
Suyunmeng Sep 23, 2025
2801021
Merge branch 'main' into pr/jenken827/1141
Suyunmeng Sep 23, 2025
5a30413
Merge branch 'main' into chunk_upload
Suyunmeng Sep 23, 2025
5087eb9
fix: remove duplicate method declarations
Suyunmeng Sep 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions drivers/123_open/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package _123_open
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"

"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/model/reqres"
"github.com/OpenListTeam/OpenList/v4/internal/model/tables"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
Expand All @@ -28,6 +32,13 @@ func (d *Open123) GetAddition() driver.Additional {
return &d.Addition
}

func (d *Open123) GetUploadInfo() *model.UploadInfo {
return &model.UploadInfo{
SliceHashNeed: true,
HashMd5Need: true,
}
}

func (d *Open123) Init(ctx context.Context) error {
if d.UploadThread < 1 || d.UploadThread > 32 {
d.UploadThread = 3
Expand Down Expand Up @@ -214,5 +225,59 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
return nil, fmt.Errorf("upload complete timeout")
}

// Preup 预上传
func (d *Open123) Preup(ctx context.Context, srcobj model.Obj, req *reqres.PreupReq) (*model.PreupInfo, error) {
pid, err := strconv.ParseUint(srcobj.GetID(), 10, 64)
if err != nil {
return nil, err
}
duplicate := 1
if req.Overwrite {
duplicate = 2
}

ucr := &UploadCreateReq{
ParentFileID: pid,
Etag: req.Hash.Md5,
FileName: req.Name,
Size: int64(req.Size),
Duplicate: duplicate,
}

resp, err := d.uploadCreate(ucr)
if err != nil {
return nil, err
}
return &model.PreupInfo{
PreupID: resp.PreuploadID,
Server: resp.Servers[0],
SliceSize: resp.SliceSize,
Reuse: resp.Reuse,
}, nil
}

// UploadSlice 上传分片
func (d *Open123) SliceUpload(ctx context.Context, req *tables.SliceUpload, sliceno uint, fd io.Reader) error {
sh := strings.Split(req.SliceHash, ",")
if int(sliceno) >= len(sh) {
return fmt.Errorf("slice number %d out of range, total slices: %d", sliceno, len(sh))
}
r := &UploadSliceReq{
Name: req.Name,
PreuploadID: req.PreupID,
Server: req.Server,
Slice: fd,
SliceMD5: sh[sliceno],
SliceNo: int(sliceno) + 1,
}
return d.uploadSlice(r)
}

// UploadSliceComplete 分片上传完成
func (d *Open123) UploadSliceComplete(ctx context.Context, su *tables.SliceUpload) error {

return d.sliceUpComplete(su.PreupID)
}

var _ driver.Driver = (*Open123)(nil)
var _ driver.PutResult = (*Open123)(nil)
104 changes: 92 additions & 12 deletions drivers/123_open/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package _123_open

import (
"io"
"strconv"
"time"

Expand Down Expand Up @@ -165,18 +166,6 @@ type DirectLinkResp struct {
} `json:"data"`
}

// 创建文件V2返回
type UploadCreateResp struct {
BaseResp
Data struct {
FileID int64 `json:"fileID"`
PreuploadID string `json:"preuploadID"`
Reuse bool `json:"reuse"`
SliceSize int64 `json:"sliceSize"`
Servers []string `json:"servers"`
} `json:"data"`
}

// 上传完毕V2返回
type UploadCompleteResp struct {
BaseResp
Expand All @@ -185,3 +174,94 @@ type UploadCompleteResp struct {
FileID int64 `json:"fileID"`
} `json:"data"`
}

// UploadCreateReq 预上传请求
// parentFileID number 必填 父目录id,上传到根目录时填写 0
// filename string 必填 文件名要小于255个字符且不能包含以下任何字符:"\/:*?|><。(注:不能重名)
// containDir 为 true 时,传入路径+文件名,例如:/你好/123/测试文件.mp4
// etag string 必填 文件md5
// size number 必填 文件大小,单位为 byte 字节
// duplicate number 非必填 当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件)
// containDir bool 非必填 上传文件是否包含路径,默认false
type UploadCreateReq struct {
ParentFileID uint64 `json:"parentFileID"`
FileName string `json:"filename"`
Etag string `json:"etag"`
Size int64 `json:"size"`
Duplicate int `json:"duplicate"`
ContainDir bool `json:"containDir"`
}

type UploadCreateResp struct {
BaseResp
Data UploadCreateData `json:"data"`
}

// UploadCreateData 预上传响应
// fileID number 非必填 文件ID。当123云盘已有该文件,则会发生秒传。此时会将文件ID字段返回。唯一
// preuploadID string 必填 预上传ID(如果 reuse 为 true 时,该字段不存在)
// reuse boolean 必填 是否秒传,返回true时表示文件已上传成功
// sliceSize number 必填 分片大小,必须按此大小生成文件分片再上传
// servers array 必填 上传地址
type UploadCreateData struct {
FileID int64 `json:"fileID"`
PreuploadID string `json:"preuploadID"`
Reuse bool `json:"reuse"`
SliceSize int64 `json:"sliceSize"`
Servers []string `json:"servers"`
}

// UploadSliceReq 分片上传请求
// preuploadID string 必填 预上传ID
// sliceNo number 必填 分片序号,从1开始自增
// sliceMD5 string 必填 当前分片md5
// slice file 必填 分片二进制流
type UploadSliceReq struct {
Name string `json:"name"`
PreuploadID string `json:"preuploadID"`
SliceNo int `json:"sliceNo"`
SliceMD5 string `json:"sliceMD5"`
Slice io.Reader `json:"slice"`
Server string `json:"server"`
}

type SliceUpCompleteResp struct {
SingleUploadResp
}

type GetUploadServerResp struct {
BaseResp
Data []string `json:"data"`
}

// SingleUploadReq 单文件上传请求
// parentFileID number 必填 父目录id,上传到根目录时填写 0
// filename string 必填 文件名要小于255个字符且不能包含以下任何字符:"\/:*?|><。(注:不能重名)
//
// containDir 为 true 时,传入路径+文件名,例如:/你好/123/测试文件.mp4
//
// etag string 必填 文件md5
// size number 必填 文件大小,单位为 byte 字节
// file file 必填 文件二进制流
// duplicate number 非必填 当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件)
// containDir bool 非必填 上传文件是否包含路径,默认false
type SingleUploadReq struct {
ParentFileID int64 `json:"parentFileID"`
FileName string `json:"filename"`
Etag string `json:"etag"`
Size int64 `json:"size"`
File io.Reader `json:"file"`
Duplicate int `json:"duplicate"`
ContainDir bool `json:"containDir"`
}

// SingleUploadResp 单文件上传响应
type SingleUploadResp struct {
BaseResp
Data SingleUploadData `json:"data"`
}

type SingleUploadData struct {
FileID int64 `json:"fileID"`
Completed bool `json:"completed"`
}
62 changes: 62 additions & 0 deletions drivers/123_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/avast/retry-go"
"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
)

// 创建文件 V2
Expand Down Expand Up @@ -183,3 +185,63 @@ func (d *Open123) complete(preuploadID string) (*UploadCompleteResp, error) {
}
return &resp, nil
}

func (d *Open123) uploadSlice(req *UploadSliceReq) error {
_, err := d.Request(InitApiInfo(req.Server+"/upload/v2/file/slice", 0), http.MethodPost, func(rt *resty.Request) {
rt.SetHeader("Content-Type", "multipart/form-data")
rt.SetMultipartFormData(map[string]string{
"preuploadID": req.PreuploadID,
"sliceMD5": req.SliceMD5,
"sliceNo": strconv.FormatInt(int64(req.SliceNo), 10),
})
rt.SetMultipartField("slice", req.Name, "multipart/form-data", req.Slice)
}, nil)
return err
}

func (d *Open123) sliceUpComplete(uploadID string) error {
r := &SliceUpCompleteResp{}

b, err := d.Request(UploadComplete, http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"preuploadID": uploadID,
})
}, r)
if err != nil {
log.Error("123 open uploadComplete error", err)
return err
}
log.Infof("upload complete,body: %s", string(b))
if r.Data.Completed {
return nil
}

return errors.New("upload uncomplete")

}

func (d *Open123) getUploadServer() (string, error) {
r := &GetUploadServerResp{}
body, err := d.Request(UploadFileDomain, "GET", nil, r)
if err != nil {
log.Error("get upload server failed", string(body), r, err)
return "", err
}
if len(r.Data) == 0 {
return "", errors.New("upload server is empty")
}

return r.Data[0], err
}

func (d *Open123) uploadCreate(uc *UploadCreateReq) (*UploadCreateData, error) {
r := &UploadCreateResp{}
_, err := d.Request(UploadCreate, http.MethodPost, func(req *resty.Request) {
req.SetBody(uc)
}, r)
if err != nil {
log.Error("123 open uploadCreate error", err)
}
return &r.Data, err

}
28 changes: 16 additions & 12 deletions drivers/123_open/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ import (
var ( //不同情况下获取的AccessTokenQPS限制不同 如下模块化易于拓展
Api = "https://open-api.123pan.com"

AccessToken = InitApiInfo(Api+"/api/v1/access_token", 1)
RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1)
UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1)
FileList = InitApiInfo(Api+"/api/v2/file/list", 3)
DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 5)
DirectLink = InitApiInfo(Api+"/api/v1/direct-link/url", 5)
Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2)
Move = InitApiInfo(Api+"/api/v1/file/move", 1)
Rename = InitApiInfo(Api+"/api/v1/file/name", 1)
Trash = InitApiInfo(Api+"/api/v1/file/trash", 2)
UploadCreate = InitApiInfo(Api+"/upload/v2/file/create", 2)
UploadComplete = InitApiInfo(Api+"/upload/v2/file/upload_complete", 0)
AccessToken = InitApiInfo(Api+"/api/v1/access_token", 1)
RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1)
UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1)
FileList = InitApiInfo(Api+"/api/v2/file/list", 3)
DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 5)
DirectLink = InitApiInfo(Api+"/api/v1/direct-link/url", 5)
Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2)
Move = InitApiInfo(Api+"/api/v1/file/move", 1)
Rename = InitApiInfo(Api+"/api/v1/file/name", 1)
Trash = InitApiInfo(Api+"/api/v1/file/trash", 2)
UploadCreate = InitApiInfo(Api+"/upload/v2/file/create", 2)
UploadComplete = InitApiInfo(Api+"/upload/v2/file/upload_complete", 0)
UploadFileDomain = InitApiInfo(Api+"/upload/v2/file/domain", 0)
)

func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCallback, resp interface{}) ([]byte, error) {
Expand Down Expand Up @@ -78,7 +79,10 @@ func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCall
} else if baseResp.Code == 429 {
time.Sleep(500 * time.Millisecond)
log.Warningf("API: %s, QPS: %d, 请求太频繁,对应API提示过多请减小QPS", apiInfo.url, apiInfo.qps)
} else if baseResp.Code == 20103 { //code: 20103, error: 文件正在校验中,请间隔1秒后再试
time.Sleep(2 * time.Second)
} else {
log.Errorf("API: %s, body:%s, code: %d, error: %s", apiInfo.url, res.Body(), baseResp.Code, baseResp.Message)
return nil, errors.New(baseResp.Message)
}
}
Expand Down
Loading