diff --git a/cmd/server.go b/cmd/server.go index 3758009f6..b9b7d9118 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -44,6 +44,7 @@ the address is defined in config file`, bootstrap.InitOfflineDownloadTools() bootstrap.LoadStorages() bootstrap.InitTaskManager() + fs.InitSliceUploadManager() if !flags.Debug && !flags.Dev { gin.SetMode(gin.ReleaseMode) } diff --git a/drivers/123_open/driver.go b/drivers/123_open/driver.go index ba608bc0f..fa279d5da 100644 --- a/drivers/123_open/driver.go +++ b/drivers/123_open/driver.go @@ -3,12 +3,16 @@ package _123_open import ( "context" "fmt" + "io" "strconv" + "strings" "time" "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/reqres" + "github.com/OpenListTeam/OpenList/v4/internal/model/tables" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/utils" @@ -28,6 +32,13 @@ func (d *Open123) GetAddition() driver.Additional { return &d.Addition } +func (d *Open123) GetUploadInfo() *model.UploadInfo { + return &model.UploadInfo{ + SliceHashNeed: true, + HashMd5Need: true, + } +} + func (d *Open123) Init(ctx context.Context) error { if d.UploadThread < 1 || d.UploadThread > 32 { d.UploadThread = 3 @@ -214,6 +225,79 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre return nil, fmt.Errorf("upload complete timeout") } +// Preup 预上传 +func (d *Open123) Preup(ctx context.Context, srcobj model.Obj, req *reqres.PreupReq) (*model.PreupInfo, error) { + pid, err := strconv.ParseUint(srcobj.GetID(), 10, 64) + if err != nil { + return nil, err + } + duplicate := 1 + if req.Overwrite { + duplicate = 2 + } + + ucr := &UploadCreateReq{ + ParentFileID: pid, + Etag: req.Hash.Md5, + FileName: req.Name, + Size: int64(req.Size), + Duplicate: duplicate, + } + + resp, err := d.uploadCreate(ucr) + if err != nil { + return nil, err + } + return &model.PreupInfo{ + PreupID: resp.PreuploadID, + Server: resp.Servers[0], + SliceSize: resp.SliceSize, + Reuse: resp.Reuse, + }, nil +} + +// UploadSlice 上传分片 +func (d *Open123) SliceUpload(ctx context.Context, req *tables.SliceUpload, sliceno uint, fd io.Reader) error { + sh := strings.Split(req.SliceHash, ",") + if int(sliceno) >= len(sh) { + return fmt.Errorf("slice number %d out of range, total slices: %d", sliceno, len(sh)) + } + + if req.PreupID == "" { + return fmt.Errorf("preupload ID is empty for slice %d", sliceno) + } + + if req.Server == "" { + return fmt.Errorf("upload server is empty for slice %d", sliceno) + } + + r := &UploadSliceReq{ + Name: req.Name, + PreuploadID: req.PreupID, + Server: req.Server, + Slice: fd, + SliceMD5: sh[sliceno], + SliceNo: int(sliceno) + 1, + } + + if err := d.uploadSlice(r); err != nil { + return fmt.Errorf("upload slice %d failed: %w", sliceno, err) + } + return nil +} + +// UploadSliceComplete 分片上传完成 +func (d *Open123) UploadSliceComplete(ctx context.Context, su *tables.SliceUpload) error { + if su.PreupID == "" { + return fmt.Errorf("preupload ID is empty") + } + + if err := d.sliceUpComplete(su.PreupID); err != nil { + return fmt.Errorf("slice upload complete failed: %w", err) + } + return nil +} + func (d *Open123) GetDetails(ctx context.Context) (*model.StorageDetails, error) { userInfo, err := d.getUserInfo() if err != nil { diff --git a/drivers/123_open/types.go b/drivers/123_open/types.go index 70257d84f..ca095d5f2 100644 --- a/drivers/123_open/types.go +++ b/drivers/123_open/types.go @@ -1,6 +1,7 @@ package _123_open import ( + "io" "strconv" "time" @@ -165,18 +166,6 @@ type DirectLinkResp struct { } `json:"data"` } -// 创建文件V2返回 -type UploadCreateResp struct { - BaseResp - Data struct { - FileID int64 `json:"fileID"` - PreuploadID string `json:"preuploadID"` - Reuse bool `json:"reuse"` - SliceSize int64 `json:"sliceSize"` - Servers []string `json:"servers"` - } `json:"data"` -} - // 上传完毕V2返回 type UploadCompleteResp struct { BaseResp @@ -185,3 +174,94 @@ type UploadCompleteResp struct { FileID int64 `json:"fileID"` } `json:"data"` } + +// UploadCreateReq 预上传请求 +// parentFileID number 必填 父目录id,上传到根目录时填写 0 +// filename string 必填 文件名要小于255个字符且不能包含以下任何字符:"\/:*?|><。(注:不能重名) +// containDir 为 true 时,传入路径+文件名,例如:/你好/123/测试文件.mp4 +// etag string 必填 文件md5 +// size number 必填 文件大小,单位为 byte 字节 +// duplicate number 非必填 当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件) +// containDir bool 非必填 上传文件是否包含路径,默认false +type UploadCreateReq struct { + ParentFileID uint64 `json:"parentFileID"` + FileName string `json:"filename"` + Etag string `json:"etag"` + Size int64 `json:"size"` + Duplicate int `json:"duplicate"` + ContainDir bool `json:"containDir"` +} + +type UploadCreateResp struct { + BaseResp + Data UploadCreateData `json:"data"` +} + +// UploadCreateData 预上传响应 +// fileID number 非必填 文件ID。当123云盘已有该文件,则会发生秒传。此时会将文件ID字段返回。唯一 +// preuploadID string 必填 预上传ID(如果 reuse 为 true 时,该字段不存在) +// reuse boolean 必填 是否秒传,返回true时表示文件已上传成功 +// sliceSize number 必填 分片大小,必须按此大小生成文件分片再上传 +// servers array 必填 上传地址 +type UploadCreateData struct { + FileID int64 `json:"fileID"` + PreuploadID string `json:"preuploadID"` + Reuse bool `json:"reuse"` + SliceSize int64 `json:"sliceSize"` + Servers []string `json:"servers"` +} + +// UploadSliceReq 分片上传请求 +// preuploadID string 必填 预上传ID +// sliceNo number 必填 分片序号,从1开始自增 +// sliceMD5 string 必填 当前分片md5 +// slice file 必填 分片二进制流 +type UploadSliceReq struct { + Name string `json:"name"` + PreuploadID string `json:"preuploadID"` + SliceNo int `json:"sliceNo"` + SliceMD5 string `json:"sliceMD5"` + Slice io.Reader `json:"slice"` + Server string `json:"server"` +} + +type SliceUpCompleteResp struct { + SingleUploadResp +} + +type GetUploadServerResp struct { + BaseResp + Data []string `json:"data"` +} + +// SingleUploadReq 单文件上传请求 +// parentFileID number 必填 父目录id,上传到根目录时填写 0 +// filename string 必填 文件名要小于255个字符且不能包含以下任何字符:"\/:*?|><。(注:不能重名) +// +// containDir 为 true 时,传入路径+文件名,例如:/你好/123/测试文件.mp4 +// +// etag string 必填 文件md5 +// size number 必填 文件大小,单位为 byte 字节 +// file file 必填 文件二进制流 +// duplicate number 非必填 当有相同文件名时,文件处理策略(1保留两者,新文件名将自动添加后缀,2覆盖原文件) +// containDir bool 非必填 上传文件是否包含路径,默认false +type SingleUploadReq struct { + ParentFileID int64 `json:"parentFileID"` + FileName string `json:"filename"` + Etag string `json:"etag"` + Size int64 `json:"size"` + File io.Reader `json:"file"` + Duplicate int `json:"duplicate"` + ContainDir bool `json:"containDir"` +} + +// SingleUploadResp 单文件上传响应 +type SingleUploadResp struct { + BaseResp + Data SingleUploadData `json:"data"` +} + +type SingleUploadData struct { + FileID int64 `json:"fileID"` + Completed bool `json:"completed"` +} diff --git a/drivers/123_open/upload.go b/drivers/123_open/upload.go index abcde2aaf..b3eb09de1 100644 --- a/drivers/123_open/upload.go +++ b/drivers/123_open/upload.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "mime/multipart" @@ -20,6 +21,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/avast/retry-go" "github.com/go-resty/resty/v2" + log "github.com/sirupsen/logrus" ) // 创建文件 V2 @@ -183,3 +185,63 @@ func (d *Open123) complete(preuploadID string) (*UploadCompleteResp, error) { } return &resp, nil } + +func (d *Open123) uploadSlice(req *UploadSliceReq) error { + _, err := d.Request(InitApiInfo(req.Server+"/upload/v2/file/slice", 0), http.MethodPost, func(rt *resty.Request) { + rt.SetHeader("Content-Type", "multipart/form-data") + rt.SetMultipartFormData(map[string]string{ + "preuploadID": req.PreuploadID, + "sliceMD5": req.SliceMD5, + "sliceNo": strconv.FormatInt(int64(req.SliceNo), 10), + }) + rt.SetMultipartField("slice", req.Name, "multipart/form-data", req.Slice) + }, nil) + return err +} + +func (d *Open123) sliceUpComplete(uploadID string) error { + r := &SliceUpCompleteResp{} + + b, err := d.Request(UploadComplete, http.MethodPost, func(req *resty.Request) { + req.SetBody(base.Json{ + "preuploadID": uploadID, + }) + }, r) + if err != nil { + log.Error("123 open uploadComplete error", err) + return err + } + log.Debugf("upload complete,body: %s", string(b)) + if r.Data.Completed { + return nil + } + + return errors.New("upload uncomplete") + +} + +func (d *Open123) getUploadServer() (string, error) { + r := &GetUploadServerResp{} + body, err := d.Request(UploadFileDomain, "GET", nil, r) + if err != nil { + log.Error("get upload server failed", string(body), r, err) + return "", err + } + if len(r.Data) == 0 { + return "", errors.New("upload server is empty") + } + + return r.Data[0], err +} + +func (d *Open123) uploadCreate(uc *UploadCreateReq) (*UploadCreateData, error) { + r := &UploadCreateResp{} + _, err := d.Request(UploadCreate, http.MethodPost, func(req *resty.Request) { + req.SetBody(uc) + }, r) + if err != nil { + log.Error("123 open uploadCreate error", err) + } + return &r.Data, err + +} diff --git a/drivers/123_open/util.go b/drivers/123_open/util.go index 841e6a161..08f37d78b 100644 --- a/drivers/123_open/util.go +++ b/drivers/123_open/util.go @@ -21,18 +21,19 @@ import ( var ( //不同情况下获取的AccessTokenQPS限制不同 如下模块化易于拓展 Api = "https://open-api.123pan.com" - AccessToken = InitApiInfo(Api+"/api/v1/access_token", 1) - RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1) - UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1) - FileList = InitApiInfo(Api+"/api/v2/file/list", 3) - DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 5) - DirectLink = InitApiInfo(Api+"/api/v1/direct-link/url", 5) - Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2) - Move = InitApiInfo(Api+"/api/v1/file/move", 1) - Rename = InitApiInfo(Api+"/api/v1/file/name", 1) - Trash = InitApiInfo(Api+"/api/v1/file/trash", 2) - UploadCreate = InitApiInfo(Api+"/upload/v2/file/create", 2) - UploadComplete = InitApiInfo(Api+"/upload/v2/file/upload_complete", 0) + AccessToken = InitApiInfo(Api+"/api/v1/access_token", 1) + RefreshToken = InitApiInfo(Api+"/api/v1/oauth2/access_token", 1) + UserInfo = InitApiInfo(Api+"/api/v1/user/info", 1) + FileList = InitApiInfo(Api+"/api/v2/file/list", 3) + DownloadInfo = InitApiInfo(Api+"/api/v1/file/download_info", 5) + DirectLink = InitApiInfo(Api+"/api/v1/direct-link/url", 5) + Mkdir = InitApiInfo(Api+"/upload/v1/file/mkdir", 2) + Move = InitApiInfo(Api+"/api/v1/file/move", 1) + Rename = InitApiInfo(Api+"/api/v1/file/name", 1) + Trash = InitApiInfo(Api+"/api/v1/file/trash", 2) + UploadCreate = InitApiInfo(Api+"/upload/v2/file/create", 2) + UploadComplete = InitApiInfo(Api+"/upload/v2/file/upload_complete", 0) + UploadFileDomain = InitApiInfo(Api+"/upload/v2/file/domain", 0) ) func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCallback, resp interface{}) ([]byte, error) { @@ -78,7 +79,10 @@ func (d *Open123) Request(apiInfo *ApiInfo, method string, callback base.ReqCall } else if baseResp.Code == 429 { time.Sleep(500 * time.Millisecond) log.Warningf("API: %s, QPS: %d, 请求太频繁,对应API提示过多请减小QPS", apiInfo.url, apiInfo.qps) + } else if baseResp.Code == 20103 { //code: 20103, error: 文件正在校验中,请间隔1秒后再试 + time.Sleep(2 * time.Second) } else { + log.Errorf("API: %s, body:%s, code: %d, error: %s", apiInfo.url, res.Body(), baseResp.Code, baseResp.Message) return nil, errors.New(baseResp.Message) } } diff --git a/drivers/baidu_netdisk/driver.go b/drivers/baidu_netdisk/driver.go index 5a6923781..9c39fdcd1 100644 --- a/drivers/baidu_netdisk/driver.go +++ b/drivers/baidu_netdisk/driver.go @@ -4,12 +4,16 @@ import ( "context" "crypto/md5" "encoding/hex" + "encoding/json" "errors" + "fmt" "io" "net/url" "os" stdpath "path" + "path/filepath" "strconv" + "strings" "time" "github.com/OpenListTeam/OpenList/v4/drivers/base" @@ -17,6 +21,8 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/reqres" + "github.com/OpenListTeam/OpenList/v4/internal/model/tables" "github.com/OpenListTeam/OpenList/v4/pkg/errgroup" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/avast/retry-go" @@ -39,6 +45,14 @@ func (d *BaiduNetdisk) GetAddition() driver.Additional { return &d.Addition } +func (d *BaiduNetdisk) GetUploadInfo() *model.UploadInfo { + return &model.UploadInfo{ + SliceHashNeed: true, + HashMd5Need: true, + HashMd5256KBNeed: true, + } +} + func (d *BaiduNetdisk) Init(ctx context.Context) error { d.uploadThread, _ = strconv.Atoi(d.UploadThread) if d.uploadThread < 1 || d.uploadThread > 32 { @@ -364,6 +378,88 @@ func (d *BaiduNetdisk) uploadSlice(ctx context.Context, params map[string]string return nil } +// SliceUpload 上传分片 +func (d *BaiduNetdisk) SliceUpload(ctx context.Context, req *tables.SliceUpload, sliceno uint, fd io.Reader) error { + fp := filepath.Join(req.DstPath, req.Name) + if sliceno == 0 { //第一个分片需要先执行预上传 + rtype := 1 + if req.Overwrite { + rtype = 3 + } + precreateResp, err := d.precreate(&PrecreateReq{ + Path: fp, + Size: req.Size, + Isdir: 0, + BlockList: strings.Split(req.SliceHash, ","), + Autoinit: 1, + Rtype: rtype, + ContentMd5: req.HashMd5, + SliceMd5: req.HashMd5256KB, + }) + if err != nil { + return fmt.Errorf("precreate failed: %w", err) + } + if precreateResp == nil { + return fmt.Errorf("precreate returned nil response") + } + req.PreupID = precreateResp.Uploadid + } + + if req.PreupID == "" { + return fmt.Errorf("preupload ID is empty for slice %d", sliceno) + } + + err := d.uploadSlice(ctx, map[string]string{ + "method": "upload", + "access_token": d.AccessToken, + "type": "tmpfile", + "path": fp, + "uploadid": req.PreupID, + "partseq": strconv.Itoa(int(sliceno)), + }, req.Name, fd) + + if err != nil { + return fmt.Errorf("upload slice %d failed: %w", sliceno, err) + } + return nil +} + +// Preup 预上传(自定以接口,为了适配自定义的分片上传) +func (d *BaiduNetdisk) Preup(ctx context.Context, srcobj model.Obj, req *reqres.PreupReq) (*model.PreupInfo, error) { + return &model.PreupInfo{ + SliceSize: d.getSliceSize(req.Size), + }, nil +} + +// UploadSliceComplete 分片上传完成 +func (d *BaiduNetdisk) UploadSliceComplete(ctx context.Context, su *tables.SliceUpload) error { + fp := filepath.Join(su.DstPath, su.Name) + rsp := &SliceUpCompleteResp{} + t := time.Now().Unix() + + sliceHashList := strings.Split(su.SliceHash, ",") + if len(sliceHashList) == 0 { + return fmt.Errorf("slice hash list is empty") + } + + sh, err := json.Marshal(sliceHashList) + if err != nil { + return fmt.Errorf("failed to marshal slice hash: %w", err) + } + + b, err := d.create(fp, int64(su.Size), 0, su.PreupID, string(sh), rsp, t, t) + if err != nil { + log.Errorf("create file failed: %v, response: %v, body: %s", err, rsp, string(b)) + return fmt.Errorf("create file failed: %w", err) + } + + if rsp.Errno != 0 { + return fmt.Errorf("baidu response error: errno=%d", rsp.Errno) + } + + return nil +} + func (d *BaiduNetdisk) GetDetails(ctx context.Context) (*model.StorageDetails, error) { du, err := d.quota() if err != nil { diff --git a/drivers/baidu_netdisk/types.go b/drivers/baidu_netdisk/types.go index 0e9ee4432..187198d6f 100644 --- a/drivers/baidu_netdisk/types.go +++ b/drivers/baidu_netdisk/types.go @@ -190,6 +190,52 @@ type PrecreateResp struct { File File `json:"info"` } +// PrecreateReq 预上传请求 +type PrecreateReq struct { + Path string `json:"path"` // 上传后使用的文件绝对路径(需urlencode) + Size int64 `json:"size"` // 文件或目录大小,单位B + Isdir int `json:"isdir"` // 是否为目录,0 文件,1 目录 + BlockList []string `json:"block_list"` // 文件各分片MD5数组的json串 + Autoinit int `json:"autoinit"` // 固定值1 + Rtype int `json:"rtype,omitempty"` // 文件命名策略,非必填 + Uploadid string `json:"uploadid,omitempty"` // 上传ID,非必填 + ContentMd5 string `json:"content-md5,omitempty"` // 文件MD5,非必填 + SliceMd5 string `json:"slice-md5,omitempty"` // 文件校验段的MD5,非必填 + LocalCtime string `json:"local_ctime,omitempty"` // 客户端创建时间,非必填 + LocalMtime string `json:"local_mtime,omitempty"` // 客户端修改时间,非必填 +} + +// SliceupCompleteReq 分片上传完成请求 +type SliceUpCompleteReq struct { + Path string `json:"path"` // 上传后使用的文件绝对路径(需urlencode),与预上传precreate接口中的path保持一致 + Size int64 `json:"size"` // 文件或目录的大小,必须与实际大小一致 + Isdir int `json:"isdir"` // 是否目录,0 文件、1 目录,与预上传precreate接口中的isdir保持一致 + BlockList []string `json:"block_list"` // 文件各分片md5数组的json串,与预上传precreate接口中的block_list保持一致 + Uploadid string `json:"uploadid"` // 预上传precreate接口下发的uploadid + Rtype int `json:"rtype,omitempty"` // 文件命名策略,默认0 + LocalCtime int64 `json:"local_ctime,omitempty"` // 客户端创建时间(精确到秒),默认为当前时间戳 + LocalMtime int64 `json:"local_mtime,omitempty"` // 客户端修改时间(精确到秒),默认为当前时间戳 + ZipQuality int `json:"zip_quality,omitempty"` // 图片压缩程度,有效值50、70、100(带此参数时,zip_sign 参数需要一并带上) + ZipSign string `json:"zip_sign,omitempty"` // 未压缩原始图片文件真实md5(带此参数时,zip_quality 参数需要一并带上) + IsRevision int `json:"is_revision,omitempty"` // 是否需要多版本支持,1为支持,0为不支持,默认为0 + Mode int `json:"mode,omitempty"` // 上传方式,1手动、2批量上传、3文件自动备份、4相册自动备份、5视频自动备份 + ExifInfo string `json:"exif_info,omitempty"` // exif信息,json字符串,orientation、width、height、recovery为必传字段 +} + +// SliceUpCompleteResp 分片上传完成响应 +type SliceUpCompleteResp struct { + Errno int `json:"errno"` // 错误码 + FsID uint64 `json:"fs_id"` // 文件在云端的唯一标识ID + Md5 string `json:"md5,omitempty"` // 文件的MD5,只有提交文件时才返回,提交目录时没有该值 + ServerFilename string `json:"server_filename"` // 文件名 + Category int `json:"category"` // 分类类型, 1 视频 2 音频 3 图片 4 文档 5 应用 6 其他 7 种子 + Path string `json:"path"` // 上传后使用的文件绝对路径 + Size uint64 `json:"size"` // 文件大小,单位B + Ctime uint64 `json:"ctime"` // 文件创建时间 + Mtime uint64 `json:"mtime"` // 文件修改时间 + Isdir int `json:"isdir"` // 是否目录,0 文件、1 目录 +} + type QuotaResp struct { Errno int `json:"errno"` RequestId int64 `json:"request_id"` diff --git a/drivers/baidu_netdisk/util.go b/drivers/baidu_netdisk/util.go index 3f031cc4f..76f26fa20 100644 --- a/drivers/baidu_netdisk/util.go +++ b/drivers/baidu_netdisk/util.go @@ -2,6 +2,7 @@ package baidu_netdisk import ( "encoding/hex" + "encoding/json" "errors" "fmt" "net/http" @@ -306,6 +307,37 @@ func (d *BaiduNetdisk) create(path string, size int64, isdir int, uploadid, bloc return d.postForm("/xpan/file", params, form, resp) } +func (d *BaiduNetdisk) precreate(req *PrecreateReq) (*PrecreateResp, error) { + bl, err := json.Marshal(req.BlockList) + if err != nil { + log.Errorf("json.Marshal error: %v", err) + return nil, err + } + b := map[string]string{ + "path": req.Path, + "size": strconv.Itoa(int(req.Size)), + "isdir": strconv.Itoa(req.Isdir), + "autoinit": strconv.Itoa(req.Autoinit), + "rtype": strconv.Itoa(req.Rtype), + "block_list": string(bl), + "content-md5": req.ContentMd5, + "slice-md5": req.SliceMd5, + } + + res := &PrecreateResp{} + r, err := d.request("https://pan.baidu.com/rest/2.0/xpan/file", http.MethodPost, func(rt *resty.Request) { + rt.SetQueryParam("method", "precreate"). + SetFormData(b) + + }, res) + if err != nil { + log.Errorf("baidu_netdisk precreate error: %s, %v", string(r), err) + return nil, err + } + return res, nil + +} + func joinTime(form map[string]string, ctime, mtime int64) { form["local_mtime"] = strconv.FormatInt(mtime, 10) form["local_ctime"] = strconv.FormatInt(ctime, 10) diff --git a/internal/bootstrap/config.go b/internal/bootstrap/config.go index 2209c64f3..4ed8e9145 100644 --- a/internal/bootstrap/config.go +++ b/internal/bootstrap/config.go @@ -125,9 +125,14 @@ func InitConfig() { if err != nil { log.Fatalf("create temp dir error: %+v", err) } + err = os.MkdirAll(conf.GetPersistentTempDir(), 0o777) + if err != nil { + log.Fatalf("create persistent temp dir error: %+v", err) + } log.Debugf("config: %+v", conf.Conf) base.InitClient() initURL() + CleanTempDir() } func confFromEnv() { @@ -160,6 +165,9 @@ func CleanTempDir() { log.Errorln("failed list temp file: ", err) } for _, file := range files { + if file.Name() == "persistent" { + continue + } if err := os.RemoveAll(filepath.Join(conf.Conf.TempDir, file.Name())); err != nil { log.Errorln("failed delete temp file: ", err) } diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index 47e0b59eb..5294db76a 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -38,9 +38,6 @@ func InitTaskManager() { op.RegisterSettingChangingCallback(func() { tool.TransferTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskOfflineDownloadTransferThreadsNum, conf.Conf.Tasks.Transfer.Workers))) }) - if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted - CleanTempDir() - } fs.ArchiveDownloadTaskManager = tache.NewManager[*fs.ArchiveDownloadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant), db.UpdateTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Decompress.MaxRetry)) op.RegisterSettingChangingCallback(func() { fs.ArchiveDownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers))) diff --git a/internal/conf/config.go b/internal/conf/config.go index af198e916..08dd3603a 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -245,3 +245,7 @@ func DefaultConfig(dataDir string) *Config { LastLaunchedVersion: "", } } + +func GetPersistentTempDir() string { + return filepath.Join(Conf.TempDir, "persistent") +} diff --git a/internal/conf/const.go b/internal/conf/const.go index f46b0d809..c506c9ca4 100644 --- a/internal/conf/const.go +++ b/internal/conf/const.go @@ -174,4 +174,5 @@ const ( UserAgentKey PathKey SharingIDKey + StorageKey ) diff --git a/internal/db/db.go b/internal/db/db.go index 96529c15d..172ea6be1 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -5,6 +5,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/tables" "gorm.io/gorm" ) @@ -12,10 +13,14 @@ var db *gorm.DB func Init(d *gorm.DB) { db = d - err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB)) + err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB), new(tables.SliceUpload)) if err != nil { log.Fatalf("failed migrate database: %s", err.Error()) } + + if err := CleanupOrphanedSliceUploads(); err != nil { + log.Errorf("Failed to cleanup orphaned slice uploads: %v", err) + } } func AutoMigrate(dst ...interface{}) error { diff --git a/internal/db/slice_upload.go b/internal/db/slice_upload.go new file mode 100644 index 000000000..aadb11f26 --- /dev/null +++ b/internal/db/slice_upload.go @@ -0,0 +1,194 @@ +package db + +import ( + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/model/tables" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "gorm.io/gorm" +) + +func CreateSliceUpload(su *tables.SliceUpload) error { + return errors.WithStack(db.Create(su).Error) +} + +func GetSliceUpload(wh map[string]any) (*tables.SliceUpload, error) { + su := &tables.SliceUpload{} + return su, db.Where(wh).First(su).Error +} + +// GetSliceUploadByTaskID 通过TaskID获取分片上传记录 +func GetSliceUploadByTaskID(taskID string) (*tables.SliceUpload, error) { + su := &tables.SliceUpload{} + return su, db.Where("task_id = ?", taskID).First(su).Error +} + +func UpdateSliceUpload(su *tables.SliceUpload) error { + return errors.WithStack(db.Save(su).Error) +} + +// DeleteSliceUpload 删除分片上传记录 +func DeleteSliceUpload(id uint) error { + return errors.WithStack(db.Delete(&tables.SliceUpload{}, id).Error) +} + +// DeleteSliceUploadByTaskID 通过TaskID删除分片上传记录 +func DeleteSliceUploadByTaskID(taskID string) error { + return errors.WithStack(db.Where("task_id = ?", taskID).Delete(&tables.SliceUpload{}).Error) +} + +// GetIncompleteSliceUploads 获取所有未完成的分片上传任务(用于重启恢复) +func GetIncompleteSliceUploads() ([]*tables.SliceUpload, error) { + var uploads []*tables.SliceUpload + err := db.Where("status IN (?)", []int{ + tables.SliceUploadStatusWaiting, + tables.SliceUploadStatusUploading, + tables.SliceUploadStatusProxyComplete, + tables.SliceUploadStatusPendingComplete, + }).Find(&uploads).Error + + if err != nil { + return nil, errors.WithStack(err) + } + + return uploads, nil +} + +// UpdateSliceUploadWithTx 使用事务更新分片上传状态,确保数据一致性 +func UpdateSliceUploadWithTx(su *tables.SliceUpload) error { + return errors.WithStack(db.Transaction(func(tx *gorm.DB) error { + return tx.Save(su).Error + })) +} + +// UpdateSliceStatusAtomic 原子性地更新分片状态 +func UpdateSliceStatusAtomic(taskID string, sliceNum int, status []byte) error { + return errors.WithStack(db.Transaction(func(tx *gorm.DB) error { + var su tables.SliceUpload + if err := tx.Where("task_id = ?", taskID).First(&su).Error; err != nil { + return err + } + + tables.SetSliceUploaded(su.SliceUploadStatus, sliceNum) + + return tx.Save(&su).Error + })) +} + +// CleanupOrphanedSliceUploads 清理孤儿分片上传记录(启动时调用) +func CleanupOrphanedSliceUploads() error { + cutoff := time.Now().Add(-24 * time.Hour) + + var orphanedTasks []tables.SliceUpload + if err := db.Where("status IN (?, ?) AND updated_at < ?", + tables.SliceUploadStatusWaiting, + tables.SliceUploadStatusUploading, + cutoff).Find(&orphanedTasks).Error; err != nil { + return errors.WithStack(err) + } + + cleanedCount := 0 + for _, task := range orphanedTasks { + if task.TmpFile != "" { + if err := os.Remove(task.TmpFile); err != nil && !os.IsNotExist(err) { + log.Warnf("Failed to remove orphaned tmp file %s: %v", task.TmpFile, err) + } else if err == nil { + log.Debugf("Removed orphaned tmp file: %s", task.TmpFile) + } + } + + if err := db.Delete(&task).Error; err != nil { + log.Errorf("Failed to delete orphaned slice upload task %s: %v", task.TaskID, err) + } else { + cleanedCount++ + } + } + + if cleanedCount > 0 { + log.Debugf("Cleaned up %d orphaned slice upload tasks", cleanedCount) + } + + return cleanupOrphanedTempFiles() +} + +// cleanupOrphanedTempFiles 清理临时目录中的孤儿文件 +func cleanupOrphanedTempFiles() error { + tempDir := conf.GetPersistentTempDir() + + if _, err := os.Stat(tempDir); os.IsNotExist(err) { + log.Debugf("Temp directory does not exist: %s", tempDir) + return nil + } + + var activeTasks []tables.SliceUpload + if err := db.Where("tmp_file IS NOT NULL AND tmp_file != '' AND status IN (?, ?)", + tables.SliceUploadStatusWaiting, + tables.SliceUploadStatusUploading).Find(&activeTasks).Error; err != nil { + return errors.WithStack(err) + } + + activeFiles := make(map[string]bool) + for _, task := range activeTasks { + if task.TmpFile != "" { + activeFiles[task.TmpFile] = true + } + } + + cleanedCount := 0 + cutoff := time.Now().Add(-24 * time.Hour) + + err := filepath.WalkDir(tempDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + log.Warnf("Failed to access path %s: %v", path, err) + return nil // 继续处理其他文件 + } + + if d.IsDir() { + return nil + } + + if !strings.HasPrefix(d.Name(), "slice_upload_") { + return nil + } + + if activeFiles[path] { + return nil // 文件仍在使用中,跳过 + } + + info, err := d.Info() + if err != nil { + log.Warnf("Failed to get file info for %s: %v", path, err) + return nil + } + + if info.ModTime().After(cutoff) { + return nil + } + + if err := os.Remove(path); err != nil { + log.Warnf("Failed to remove orphaned temp file %s: %v", path, err) + } else { + log.Debugf("Removed orphaned temp file: %s", path) + cleanedCount++ + } + + return nil + }) + + if err != nil { + log.Errorf("Failed to walk temp directory %s: %v", tempDir, err) + return errors.WithStack(err) + } + + if cleanedCount > 0 { + log.Debugf("Cleaned up %d orphaned temp files from %s", cleanedCount, tempDir) + } + + return nil +} diff --git a/internal/driver/driver.go b/internal/driver/driver.go index 01d89a6e9..e9ce8e108 100644 --- a/internal/driver/driver.go +++ b/internal/driver/driver.go @@ -2,8 +2,11 @@ package driver import ( "context" + "io" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/reqres" + "github.com/OpenListTeam/OpenList/v4/internal/model/tables" ) type Driver interface { @@ -81,6 +84,22 @@ type Remove interface { Remove(ctx context.Context, obj model.Obj) error } +type IUploadInfo interface { + GetUploadInfo() *model.UploadInfo +} + +type IPreup interface { + Preup(ctx context.Context, srcobj model.Obj, req *reqres.PreupReq) (*model.PreupInfo, error) +} + +type ISliceUpload interface { + SliceUpload(ctx context.Context, req *tables.SliceUpload, sliceno uint, file io.Reader) error +} + +type IUploadSliceComplete interface { + UploadSliceComplete(ctx context.Context, req *tables.SliceUpload) error +} + type Put interface { // Put a file (provided as a FileStreamer) into the driver // Besides the most basic upload functionality, the following features also need to be implemented: diff --git a/internal/fs/fs.go b/internal/fs/fs.go index ca199ed40..77cca2cbf 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -9,6 +9,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/reqres" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/pkg/errors" @@ -190,3 +191,15 @@ func PutURL(ctx context.Context, path, dstName, urlStr string) error { } return op.PutURL(ctx, storage, dstDirActualPath, dstName, urlStr) } + +func Preup(c context.Context, s driver.Driver, actualPath string, req *reqres.PreupReq) (*reqres.PreupResp, error) { + return globalSliceManager.CreateSession(c, s, actualPath, req) +} + +func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, reader io.Reader) error { + return globalSliceManager.UploadSlice(ctx, storage, req, reader) +} + +func SliceUpComplete(ctx context.Context, storage driver.Driver, taskID string) (*reqres.UploadSliceCompleteResp, error) { + return globalSliceManager.CompleteUpload(ctx, storage, taskID) +} diff --git a/internal/fs/sliceup.go b/internal/fs/sliceup.go new file mode 100644 index 000000000..bf45b3b73 --- /dev/null +++ b/internal/fs/sliceup.go @@ -0,0 +1,546 @@ +package fs + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/db" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/reqres" + "github.com/OpenListTeam/OpenList/v4/internal/model/tables" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/singleflight" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/google/uuid" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +// SliceUploadManager 分片上传管理器 +type SliceUploadManager struct { + sessionG singleflight.Group[*SliceUploadSession] + cache sync.Map // TaskID -> *SliceUploadSession +} + +// SliceUploadSession 分片上传会话 +type SliceUploadSession struct { + *tables.SliceUpload + tmpFile *os.File + mutex sync.Mutex +} + +// CreateSession 创建新的上传会话 +func (m *SliceUploadManager) CreateSession(ctx context.Context, storage driver.Driver, actualPath string, req *reqres.PreupReq) (*reqres.PreupResp, error) { + srcobj, err := op.Get(ctx, storage, actualPath) + if err != nil { + log.Error(err) + return nil, errors.WithStack(err) + } + user := ctx.Value(conf.UserKey).(*model.User) + + taskID := uuid.New().String() + + createsu := &tables.SliceUpload{ + TaskID: taskID, + DstPath: req.Path, + DstID: srcobj.GetID(), + Size: req.Size, + Name: req.Name, + HashMd5: req.Hash.Md5, + HashMd5256KB: req.Hash.Md5256KB, + HashSha1: req.Hash.Sha1, + Overwrite: req.Overwrite, + ActualPath: actualPath, + AsTask: req.AsTask, + UserID: user.ID, + } + log.Debugf("storage mount path %s", storage.GetStorage().MountPath) + + switch st := storage.(type) { + case driver.IPreup: + log.Info("preup support") + res, err := st.Preup(ctx, srcobj, req) + if err != nil { + log.Error("Preup error", req, err) + return nil, errors.WithStack(err) + } + log.Info("Preup success", res) + if res.Reuse { //秒传 + return &reqres.PreupResp{ + Reuse: true, + SliceCnt: 0, + SliceSize: res.SliceSize, + TaskID: taskID, + }, nil + } + createsu.PreupID = res.PreupID + createsu.SliceSize = res.SliceSize + createsu.Server = res.Server + default: + log.Info("Preup not support") + createsu.SliceSize = 10 * utils.MB + } + + createsu.SliceCnt = uint((req.Size + createsu.SliceSize - 1) / createsu.SliceSize) + createsu.SliceUploadStatus = make([]byte, (createsu.SliceCnt+7)/8) + createsu.Status = tables.SliceUploadStatusWaiting // 设置初始状态 + + err = db.CreateSliceUpload(createsu) + if err != nil { + log.Error("CreateSliceUpload error", createsu, err) + return nil, errors.WithStack(err) + } + + session := &SliceUploadSession{SliceUpload: createsu} + m.cache.Store(taskID, session) + + return &reqres.PreupResp{ + Reuse: false, + SliceUploadStatus: createsu.SliceUploadStatus, + SliceSize: createsu.SliceSize, + SliceCnt: createsu.SliceCnt, + TaskID: createsu.TaskID, + }, nil +} + +func (m *SliceUploadManager) getOrLoadSession(taskID string) (*SliceUploadSession, error) { + session, err, _ := m.sessionG.Do(taskID, func() (*SliceUploadSession, error) { + if s, ok := m.cache.Load(taskID); ok { + return s.(*SliceUploadSession), nil + } + su, err := db.GetSliceUploadByTaskID(taskID) + if err != nil { + return nil, errors.WithMessagef(err, "failed get slice upload [%s]", taskID) + } + s := &SliceUploadSession{ + SliceUpload: su, + } + m.cache.Store(taskID, s) + return s, nil + }) + return session, err +} + +// UploadSlice 流式上传分片 +func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, reader io.Reader) error { + session, err := m.getOrLoadSession(req.TaskID) + if err != nil { + log.Errorf("failed to get session: %+v", err) + return err + } + + defer func() { + if err != nil { + session.mutex.Lock() + session.Status = tables.SliceUploadStatusFailed + session.Message = err.Error() + updateData := *session.SliceUpload + session.mutex.Unlock() + + if updateErr := db.UpdateSliceUpload(&updateData); updateErr != nil { + log.Errorf("Failed to update slice upload status: %v", updateErr) + } + } + }() + + // 使用锁保护状态检查 + session.mutex.Lock() + if tables.IsSliceUploaded(session.SliceUploadStatus, int(req.SliceNum)) { + session.mutex.Unlock() + log.Warnf("slice already uploaded,req:%+v", req) + return nil + } + session.mutex.Unlock() + + // 分片hash验证逻辑 + if req.SliceHash != "" { + session.mutex.Lock() + + if req.SliceNum == 0 { + hs := strings.Split(req.SliceHash, ",") + if len(hs) != int(session.SliceCnt) { + session.mutex.Unlock() + err := fmt.Errorf("slice hash count mismatch, expected %d, got %d", session.SliceCnt, len(hs)) + log.Error("slice hash count mismatch", req, err) + return err + } + session.SliceHash = req.SliceHash + } else { + log.Debugf("Slice %d hash: %s (keeping complete hash list)", req.SliceNum, req.SliceHash) + } + session.mutex.Unlock() + } + + switch s := storage.(type) { + case driver.ISliceUpload: + if err := s.SliceUpload(ctx, session.SliceUpload, req.SliceNum, reader); err != nil { + log.Errorf("Native slice upload failed - TaskID: %s, SliceNum: %d, Error: %v", + req.TaskID, req.SliceNum, err) + return errors.WithMessagef(err, "slice %d upload failed", req.SliceNum) + } + log.Debugf("Native slice upload success - TaskID: %s, SliceNum: %d", + req.TaskID, req.SliceNum) + + default: + if err := session.ensureTmpFile(); err != nil { + log.Error("ensureTmpFile error", req, err) + return err + } + + sw := &sliceWriter{ + file: session.tmpFile, + offset: int64(req.SliceNum) * int64(session.SliceSize), + } + _, err := utils.CopyWithBuffer(sw, reader) + if err != nil { + log.Error("Copy error", req, err) + return err + } + } + + session.mutex.Lock() + tables.SetSliceUploaded(session.SliceUploadStatus, int(req.SliceNum)) + updateData := *session.SliceUpload + session.mutex.Unlock() + + err = db.UpdateSliceUpload(&updateData) + if err != nil { + log.Error("UpdateSliceUpload error", updateData, err) + return err + } + return nil +} + +// CompleteUpload 完成上传 +func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.Driver, taskID string) (*reqres.UploadSliceCompleteResp, error) { + var err error + + session, err := m.getOrLoadSession(taskID) + if err != nil { + log.Errorf("failed to get session: %+v", err) + return nil, err + } + + // 检查是否所有分片都已上传 + session.mutex.Lock() + allUploaded := tables.IsAllSliceUploaded(session.SliceUploadStatus, session.SliceCnt) + session.mutex.Unlock() + + if !allUploaded { + return &reqres.UploadSliceCompleteResp{ + Complete: 0, + SliceUploadStatus: session.SliceUploadStatus, + TaskID: session.TaskID, + }, nil + } + + defer func() { + session.cleanup() + m.cache.Delete(session.TaskID) + + if err != nil { + session.mutex.Lock() + session.Status = tables.SliceUploadStatusFailed + session.Message = err.Error() + updateData := *session.SliceUpload + session.mutex.Unlock() + + if updateErr := db.UpdateSliceUpload(&updateData); updateErr != nil { + log.Errorf("Failed to update slice upload status: %v", updateErr) + } + } else { + if deleteErr := db.DeleteSliceUploadByTaskID(session.TaskID); deleteErr != nil { + log.Errorf("Failed to delete slice upload record: %v", deleteErr) + } + } + }() + + switch s := storage.(type) { + case driver.IUploadSliceComplete: + err = s.UploadSliceComplete(ctx, session.SliceUpload) + if err != nil { + log.Error("UploadSliceComplete error", session.SliceUpload, err) + return nil, err + } + + return &reqres.UploadSliceCompleteResp{ + Complete: 1, + TaskID: session.TaskID, + }, nil + + default: + session.mutex.Lock() + tmpFile := session.tmpFile + session.mutex.Unlock() + + if tmpFile == nil { + err := fmt.Errorf("tmp file not found [%s]", taskID) + log.Error(err) + return nil, err + } + + var hashInfo utils.HashInfo + if session.HashMd5 != "" { + hashInfo = utils.NewHashInfo(utils.MD5, session.HashMd5) + } else if session.HashSha1 != "" { + hashInfo = utils.NewHashInfo(utils.SHA1, session.HashSha1) + } + + file := &stream.FileStream{ + Obj: &model.Object{ + Name: session.Name, + Size: session.Size, + Modified: time.Now(), + HashInfo: hashInfo, + }, + } + file.Mimetype = utils.GetMimeType(session.Name) + + if session.AsTask { + file.SetTmpFile(tmpFile) + session.mutex.Lock() + session.tmpFile = nil + session.TmpFile = "" + session.mutex.Unlock() + + _, err = putAsTask(ctx, session.DstPath, file) + if err != nil { + log.Error("putAsTask error", session.SliceUpload, err) + return nil, err + } + return &reqres.UploadSliceCompleteResp{ + Complete: 2, + TaskID: session.TaskID, + }, nil + } + + file.Reader = tmpFile + err = op.Put(ctx, storage, session.ActualPath, file, nil) + if err != nil { + log.Error("Put error", session.SliceUpload, err) + return nil, err + } + return &reqres.UploadSliceCompleteResp{ + Complete: 1, + TaskID: session.TaskID, + }, nil + } +} + +func (s *SliceUploadSession) ensureTmpFile() error { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.TmpFile == "" { + filename := fmt.Sprintf("slice_upload_%s_%s", s.TaskID, s.Name) + filename = strings.ReplaceAll(filename, "/", "_") + filename = strings.ReplaceAll(filename, "\\", "_") + filename = strings.ReplaceAll(filename, ":", "_") + + tmpPath := filepath.Join(conf.GetPersistentTempDir(), filename) + + tf, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("create persistent temp file error: %w", err) + } + + if err = os.Truncate(tmpPath, int64(s.Size)); err != nil { + tf.Close() // 确保文件被关闭 + os.Remove(tmpPath) // 清理文件 + return fmt.Errorf("truncate persistent temp file error: %w", err) + } + + s.TmpFile = tmpPath + s.tmpFile = tf + + // 更新数据库中的临时文件路径 + if updateErr := db.UpdateSliceUpload(s.SliceUpload); updateErr != nil { + log.Errorf("Failed to update temp file path in database: %v", updateErr) + } + + log.Debugf("Created persistent temp file: %s", tmpPath) + return nil + } + + if s.tmpFile == nil { + var err error + s.tmpFile, err = os.OpenFile(s.TmpFile, os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("reopen persistent temp file error: %w", err) + } + log.Debugf("Reopened persistent temp file: %s", s.TmpFile) + } + return nil +} + +func (s *SliceUploadSession) cleanup() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.tmpFile != nil { + if closeErr := s.tmpFile.Close(); closeErr != nil { + log.Errorf("Failed to close tmp file: %v", closeErr) + } + s.tmpFile = nil + } + + if s.TmpFile != "" { + if removeErr := os.Remove(s.TmpFile); removeErr != nil && !os.IsNotExist(removeErr) { + log.Errorf("Failed to remove tmp file %s: %v", s.TmpFile, removeErr) + } + s.TmpFile = "" + } +} + +// 全局管理器实例使用延迟初始化 +var globalSliceManager *SliceUploadManager + +func InitSliceUploadManager() { + log.Info("Initializing slice upload manager...") + globalSliceManager = &SliceUploadManager{} + go globalSliceManager.cleanupIncompleteUploads() +} + +// sliceWriter 分片写入器 - 保持原始实现 +type sliceWriter struct { + file *os.File + offset int64 +} + +// Write implements io.Writer interface +// 虽然每个分片都定义了一个sliceWriter +// 但是Write方法会在同一个分片复制过程中多次调用, +// 所以要更新自身的offset +func (sw *sliceWriter) Write(p []byte) (int, error) { + n, err := sw.file.WriteAt(p, sw.offset) + sw.offset += int64(n) + return n, err +} + +// cleanupIncompleteUploads 清理重启后未完成的上传任务和临时文件 +func (m *SliceUploadManager) cleanupIncompleteUploads() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic in cleanupIncompleteUploads: %v", r) + } + }() + + time.Sleep(10 * time.Second) + + log.Info("Starting cleanup of incomplete slice uploads after restart...") + + incompleteUploads, err := db.GetIncompleteSliceUploads() + if err != nil { + log.Errorf("Failed to get incomplete slice uploads: %v", err) + } else { + if len(incompleteUploads) == 0 { + log.Info("No incomplete slice uploads found in database") + } else { + log.Infof("Found %d incomplete slice uploads in database, starting cleanup...", len(incompleteUploads)) + cleanedCount := 0 + for _, upload := range incompleteUploads { + if m.cleanupSingleUpload(upload) { + cleanedCount++ + } + } + log.Infof("Database cleanup completed, cleaned up %d tasks", cleanedCount) + } + } + + m.cleanupOrphanedTempFiles() + + log.Info("Slice upload cleanup completed") +} + +func (m *SliceUploadManager) cleanupSingleUpload(upload *tables.SliceUpload) bool { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic in cleanupSingleUpload for task %s: %v", upload.TaskID, r) + } + }() + + log.Infof("Cleaning up upload task: %s, status: %s", upload.TaskID, upload.Status) + + if upload.TmpFile != "" { + if err := os.Remove(upload.TmpFile); err != nil && !os.IsNotExist(err) { + log.Warnf("Failed to remove temp file %s for task %s: %v", upload.TmpFile, upload.TaskID, err) + } else { + log.Debugf("Removed temp file: %s", upload.TmpFile) + } + } + + if err := db.DeleteSliceUploadByTaskID(upload.TaskID); err != nil { + log.Errorf("Failed to delete slice upload task %s: %v", upload.TaskID, err) + return false + } + + log.Infof("Successfully cleaned up task: %s", upload.TaskID) + return true +} + +func (m *SliceUploadManager) cleanupOrphanedTempFiles() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic in cleanupOrphanedTempFiles: %v", r) + } + }() + + tempDir := conf.GetPersistentTempDir() + if tempDir == "" { + log.Warn("Persistent temp directory not configured, skipping orphaned file cleanup") + return + } + + log.Infof("Cleaning up orphaned temp files in: %s", tempDir) + + entries, err := os.ReadDir(tempDir) + if err != nil { + log.Errorf("Failed to read temp directory %s: %v", tempDir, err) + return + } + + orphanedCount := 0 + for _, entry := range entries { + if entry.IsDir() { + continue + } + + fileName := entry.Name() + if !strings.HasPrefix(fileName, "slice_upload_") { + continue + } + + filePath := filepath.Join(tempDir, fileName) + fileInfo, err := entry.Info() + if err != nil { + log.Warnf("Failed to get file info for %s: %v", filePath, err) + continue + } + + if time.Since(fileInfo.ModTime()) < 24*time.Hour { + continue + } + + if err := os.Remove(filePath); err != nil { + log.Warnf("Failed to remove orphaned temp file %s: %v", filePath, err) + } else { + log.Debugf("Removed orphaned temp file: %s", filePath) + orphanedCount++ + } + } + + if orphanedCount > 0 { + log.Infof("Cleaned up %d orphaned temp files", orphanedCount) + } else { + log.Info("No orphaned temp files found") + } +} diff --git a/internal/model/obj.go b/internal/model/obj.go index b3bf5ebe5..a30b531a2 100644 --- a/internal/model/obj.go +++ b/internal/model/obj.go @@ -243,3 +243,9 @@ func (om *ObjMerge) InitHideReg(hides string) { func (om *ObjMerge) Reset() { om.set.Clear() } + +type Hash struct { + Md5 string `json:"md5"` + Md5256KB string `json:"md5_256kb"` + Sha1 string `json:"sha1"` +} diff --git a/internal/model/reqres/upload.go b/internal/model/reqres/upload.go new file mode 100644 index 000000000..4e3684b56 --- /dev/null +++ b/internal/model/reqres/upload.go @@ -0,0 +1,41 @@ +package reqres + +import "github.com/OpenListTeam/OpenList/v4/internal/model" + +// PreupReq 预上传请求 +type PreupReq struct { + Path string `json:"path"` // 上传到的挂载路径 + Name string `json:"name"` + Size int64 `json:"size"` + Hash model.Hash `json:"hash"` + Overwrite bool `json:"overwrite"` // 是否覆盖同名文件 + AsTask bool `json:"as_task"` +} + +// PreupResp 预上传响应 +type PreupResp struct { + TaskID string `json:"task_id"` // 任务ID,使用UUID + SliceSize int64 `json:"slice_size"` //分片大小,单位:字节 + SliceCnt uint `json:"slice_cnt"` // 分片数量 + SliceUploadStatus []byte `json:"slice_upload_status"` // 分片上传状态 + Reuse bool `json:"reuse"` //是否秒传 +} + +// UploadSliceReq 上传分片请求 +type UploadSliceReq struct { + TaskID string `json:"task_id"` // 任务ID,使用UUID + SliceHash string `json:"slice_hash"` // 分片hash,如果是第一个分片,则需包含所有分片hash,用","分割 + SliceNum uint `json:"slice_num"` // 分片序号 +} + +// UploadSliceCompleteReq 分片上传完成请求 +type UploadSliceCompleteReq struct { + TaskID string `json:"task_id"` // 任务ID,使用UUID +} + +// UploadSliceCompleteResp 分片上传完成响应 +type UploadSliceCompleteResp struct { + TaskID string `json:"task_id"` // 任务ID,使用UUID + SliceUploadStatus []byte `json:"slice_upload_status"` // 分片上传状态 + Complete uint `json:"complete"` //完成状态 0 未完成,分片缺失 1 完成 2 成功上传到代理服务 +} diff --git a/internal/model/tables/slice_upload.go b/internal/model/tables/slice_upload.go new file mode 100644 index 000000000..b22149af1 --- /dev/null +++ b/internal/model/tables/slice_upload.go @@ -0,0 +1,78 @@ +package tables + +import "time" + +const ( + //SliceUploadStatusWaiting 等待上传 + SliceUploadStatusWaiting = iota + // SliceUploadStatusUploading 正在上传 + SliceUploadStatusUploading + // SliceUploadStatusCancelled 取消上传 + SliceUploadStatusCancelled + // SliceUploadStatusComplete 上传完成 + SliceUploadStatusComplete + // SliceUploadStatusFailed 上传失败 + SliceUploadStatusFailed + // SliceUploadStatusProxyComplete 成功上传到代理服务,等待上传到网盘 + SliceUploadStatusProxyComplete + // SliceUploadStatusPendingComplete 等待完成(所有切片已上传,等待最终完成处理) + SliceUploadStatusPendingComplete +) + +// SliceUpload 分片上传数据表 +type SliceUpload struct { + TaskID string `json:"task_id" gorm:"primaryKey;type:varchar(36)"` // 任务ID,使用UUID + CreatedAt time.Time `json:"created_at" gorm:"autoCreateTime"` + UpdatedAt time.Time `json:"updated_at" gorm:"autoUpdateTime"` + PreupID string `json:"preup_id"` // 网盘返回的预上传id + SliceSize int64 `json:"slice_size"` // 分片大小,单位:字节 + DstID string `json:"dst_id"` // 目标文件夹ID,部分网盘需要 + DstPath string `json:"dst_path"` // 挂载的父文件夹路径 + ActualPath string `json:"actual_path"` //网盘真实父文件夹路径,不同的网盘,这个值可能相同,比如有相同的目录的两个网盘 + Name string `json:"name"` // 文件名 + Size int64 `json:"size"` // 文件大小 + TmpFile string `json:"tmp_file"` //不支持分片上传的文件临时文件路径 + HashMd5 string `json:"hash_md5"` // md5 + HashMd5256KB string `json:"hash_md5_256kb" gorm:"column:hash_md5_256kb;type:varchar(32)"` // md5256KB + HashSha1 string `json:"hash_sha1"` // sha1 + SliceHash string `json:"slice_hash"` // 分片hash + SliceCnt uint `json:"slice_cnt"` // 分片数量 + SliceUploadStatus []byte `json:"slice_upload_status"` //分片上传状态,对应位置1表示分片已上传 + Server string `json:"server"` // 上传服务器 + Status int `json:"status"` //上传状态 + Message string `json:"message"` // 失败错误信息 + Overwrite bool `json:"overwrite"` // 是否覆盖同名文件 + UserID uint `json:"user_id"` //用户id + AsTask bool `json:"as_task"` +} + +// IsSliceUploaded 判断第i个分片是否已上传 +func IsSliceUploaded(status []byte, i int) bool { + return status[i/8]&(1<<(i%8)) != 0 +} + +// SetSliceUploaded 标记第i个分片已上传 +func SetSliceUploaded(status []byte, i int) { + status[i/8] |= 1 << (i % 8) +} + +// IsAllSliceUploaded 是否全部上传完成 +func IsAllSliceUploaded(status []byte, sliceCnt uint) bool { + for i := range sliceCnt { + if status[i/8]&(1<<(i%8)) == 0 { + return false + } + } + return true +} + +// CountUploadedSlices 统计已上传的分片数量 +func CountUploadedSlices(status []byte) uint { + count := uint(0) + for i := 0; i < len(status)*8; i++ { + if status[i/8]&(1<<(i%8)) != 0 { + count++ + } + } + return count +} diff --git a/internal/model/upload.go b/internal/model/upload.go new file mode 100644 index 000000000..5911ff74d --- /dev/null +++ b/internal/model/upload.go @@ -0,0 +1,17 @@ +package model + +// UploadInfo 上传所需信息 +type UploadInfo struct { + SliceHashNeed bool `json:"slice_hash_need"` //是否需要分片哈希 + HashMd5Need bool `json:"hash_md5_need"` //是否需要md5 + HashMd5256KBNeed bool `json:"hash_md5_256kb_need"` //是否需要前256KB的md5 + HashSha1Need bool `json:"hash_sha1_need"` //是否需要sha1 +} + +// PreupInfo 预上传信息 +type PreupInfo struct { + PreupID string `json:"preup_id"` //预上传id,由网盘返回 + SliceSize int64 `json:"slice_size"` //分片大小 + Server string `json:"server"` //上传服务器地址 + Reuse bool `json:"reuse"` //是否秒传 +} diff --git a/internal/offline_download/tool/add.go b/internal/offline_download/tool/add.go index aea88e2a4..50b95bac3 100644 --- a/internal/offline_download/tool/add.go +++ b/internal/offline_download/tool/add.go @@ -2,18 +2,15 @@ package tool import ( "context" - "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser" - - _115_open "github.com/OpenListTeam/OpenList/v4/drivers/115_open" - "github.com/OpenListTeam/OpenList/v4/server/common" - "net/url" stdpath "path" "path/filepath" _115 "github.com/OpenListTeam/OpenList/v4/drivers/115" + _115_open "github.com/OpenListTeam/OpenList/v4/drivers/115_open" "github.com/OpenListTeam/OpenList/v4/drivers/pikpak" "github.com/OpenListTeam/OpenList/v4/drivers/thunder" + "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser" "github.com/OpenListTeam/OpenList/v4/drivers/thunderx" "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/errs" @@ -22,6 +19,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/setting" "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/server/common" "github.com/google/uuid" "github.com/pkg/errors" ) @@ -87,7 +85,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskExtensionInfo, erro } uid := uuid.NewString() - tempDir := filepath.Join(conf.Conf.TempDir, args.Tool, uid) + tempDir := filepath.Join(conf.GetPersistentTempDir(), args.Tool, uid) deletePolicy := args.DeletePolicy // 如果当前 storage 是对应网盘,则直接下载到目标路径,无需转存 diff --git a/server/handles/fsup.go b/server/handles/fsup.go index 71d9dbae7..bddb1051e 100644 --- a/server/handles/fsup.go +++ b/server/handles/fsup.go @@ -1,6 +1,7 @@ package handles import ( + "fmt" "io" "net/url" stdpath "path" @@ -8,8 +9,10 @@ import ( "time" "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/fs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/model/reqres" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/OpenListTeam/OpenList/v4/pkg/utils" @@ -206,3 +209,135 @@ func FsForm(c *gin.Context) { "task": getTaskInfo(t), }) } + +// 分片上传流程如下 +// 1. 客户端调用FsUpHash获取上传所需的信息(目前主要是hash信息) +// 2. 根据获取到的hash信息,客户端调用FsPreup上传必要的参数,获取分片大小,及需上传的分片列表 +// 3. 客户端根据分片列表进行分片上传,如果分片是第一个,且需要sliceHash,那么需要把所有分片的hash带上 +// 4. 如果中途出现问题,可以重新进行分片上传流程,后端根据记录的信息进行恢复 +// 如果网盘不支持分片上传,则会进行本地中转,对客户端来说,仍然是分片上传 + +// FsUpInfo 获取上传所需的信息 +func FsUpInfo(c *gin.Context) { + storage := c.Request.Context().Value(conf.StorageKey) + + uh := &model.UploadInfo{ + SliceHashNeed: false, + HashMd5Need: true, + } + switch s := storage.(type) { + case driver.IUploadInfo: + uh = s.GetUploadInfo() + } + common.SuccessResp(c, uh) +} + +// FsPreup 预上传 +func FsPreup(c *gin.Context) { + req := &reqres.PreupReq{} + err := c.ShouldBindJSON(req) + if err != nil { + common.ErrorResp(c, fmt.Errorf("invalid request body: %w", err), 400) + return + } + + if req.Name == "" { + common.ErrorResp(c, fmt.Errorf("file name is required"), 400) + return + } + if req.Size <= 0 { + common.ErrorResp(c, fmt.Errorf("file size must be greater than 0"), 400) + return + } + + storage := c.Request.Context().Value(conf.StorageKey).(driver.Driver) + path := c.Request.Context().Value(conf.PathKey).(string) + if !req.Overwrite { + fullPath := utils.FixAndCleanPath(stdpath.Join(path, req.Name)) + if res, _ := fs.Get(c.Request.Context(), fullPath, &fs.GetArgs{NoLog: true}); res != nil { + common.ErrorStrResp(c, "file exists", 403) + return + } + } + + res, err := fs.Preup(c.Request.Context(), storage, path, req) + if err != nil { + common.ErrorResp(c, fmt.Errorf("preup failed: %w", err), 500) + return + } + common.SuccessResp(c, res) +} + +// FsUpSlice 流式上传分片 - 使用PUT方法进行流式上传,避免表单上传的内存占用 +func FsUpSlice(c *gin.Context) { + defer func() { + if n, _ := io.ReadFull(c.Request.Body, []byte{0}); n == 1 { + _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) + } + _ = c.Request.Body.Close() + }() + taskID := c.GetHeader("X-Task-ID") + if taskID == "" { + common.ErrorResp(c, fmt.Errorf("X-Task-ID header is required"), 400) + return + } + + sliceNumStr := c.GetHeader("X-Slice-Num") + if sliceNumStr == "" { + common.ErrorResp(c, fmt.Errorf("X-Slice-Num header is required"), 400) + return + } + + sliceNum, err := strconv.ParseUint(sliceNumStr, 10, 32) + if err != nil { + common.ErrorResp(c, fmt.Errorf("invalid X-Slice-Num: %w", err), 400) + return + } + + sliceHash := c.GetHeader("X-Slice-Hash") + + req := &reqres.UploadSliceReq{ + TaskID: taskID, + SliceHash: sliceHash, + SliceNum: uint(sliceNum), + } + + reader := c.Request.Body + if reader == nil { + common.ErrorResp(c, fmt.Errorf("request body is required"), 400) + return + } + + storage := c.Request.Context().Value(conf.StorageKey).(driver.Driver) + + err = fs.UploadSlice(c.Request.Context(), storage, req, reader) + if err != nil { + common.ErrorResp(c, fmt.Errorf("upload slice failed: %w", err), 500) + return + } + + common.SuccessResp(c) +} + +// FsUpSliceComplete 上传分片完成 +func FsUpSliceComplete(c *gin.Context) { + req := &reqres.UploadSliceCompleteReq{} + err := c.ShouldBindJSON(req) + if err != nil { + common.ErrorResp(c, fmt.Errorf("invalid request body: %w", err), 400) + return + } + + if req.TaskID == "" { + common.ErrorResp(c, fmt.Errorf("task_id is required"), 400) + return + } + + storage := c.Request.Context().Value(conf.StorageKey).(driver.Driver) + rsp, err := fs.SliceUpComplete(c.Request.Context(), storage, req.TaskID) + if err != nil { + common.ErrorResp(c, fmt.Errorf("slice upload complete failed: %w", err), 500) + return + } + common.SuccessResp(c, rsp) +} diff --git a/server/middlewares/fs.go b/server/middlewares/fs.go new file mode 100644 index 000000000..7556219dc --- /dev/null +++ b/server/middlewares/fs.go @@ -0,0 +1,94 @@ +package middlewares + +import ( + "net/url" + stdpath "path" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/gin-gonic/gin" + "github.com/pkg/errors" +) + +// 文件操作相关中间件 +// 把文件路径校验及鉴权统一放在这里处理,并将处理结果放到上下文里,避免后续重复处理 +// // Middleware for file operations +// Centralizes file path validation and authentication here, and stores the results in the context +// to avoid redundant processing in subsequent steps + +type permissionFunc func(user *model.User, meta *model.Meta, path string, password string) bool + +func FsUp(c *gin.Context) { + fs(c, false, func(user *model.User, meta *model.Meta, path string, password string) bool { + return common.CanAccess(user, meta, path, password) && (user.CanWrite() || common.CanWrite(meta, stdpath.Dir(path))) + }) +} +func FsRename(c *gin.Context) { + fs(c, true, func(user *model.User, meta *model.Meta, path string, password string) bool { + return user.CanRename() + }) +} +func FsRemove(c *gin.Context) { + fs(c, true, func(user *model.User, meta *model.Meta, path string, password string) bool { + return user.CanRemove() + }) +} +func FsSliceUp(c *gin.Context) { + fs(c, true, func(user *model.User, meta *model.Meta, path string, password string) bool { + return common.CanAccess(user, meta, path, password) && (user.CanWrite() || common.CanWrite(meta, stdpath.Dir(path))) + }) +} + +func fs(c *gin.Context, withstorage bool, permission permissionFunc) { + path := c.GetHeader("File-Path") + password := c.GetHeader("Password") + path, err := url.PathUnescape(path) + if err != nil { + common.ErrorResp(c, err, 400) + c.Abort() + return + } + user := c.Request.Context().Value(conf.UserKey).(*model.User) + path, err = user.JoinPath(path) + if err != nil { + common.ErrorResp(c, err, 403) + c.Abort() + return + } + meta, err := op.GetNearestMeta(stdpath.Dir(path)) + if err != nil { + if !errors.Is(errors.Cause(err), errs.MetaNotFound) { + common.ErrorResp(c, err, 500, true) + c.Abort() + return + } + } + + if !permission(user, meta, path, password) { + common.ErrorResp(c, errs.PermissionDenied, 403) + c.Abort() + return + } + + if withstorage { + storage, actualPath, err := op.GetStorageAndActualPath(path) + if err != nil { + common.ErrorResp(c, err, 400) + c.Abort() + return + } + if storage.Config().NoUpload { + common.ErrorStrResp(c, "Current storage doesn't support upload", 403) + c.Abort() + return + } + common.GinWithValue(c, conf.StorageKey, storage) + common.GinWithValue(c, conf.PathKey, actualPath) //这里的路径已经是网盘真实路径了 + + } + + c.Next() +} diff --git a/server/middlewares/fsup.go b/server/middlewares/fsup.go deleted file mode 100644 index 08b160ee5..000000000 --- a/server/middlewares/fsup.go +++ /dev/null @@ -1,45 +0,0 @@ -package middlewares - -import ( - "net/url" - stdpath "path" - - "github.com/OpenListTeam/OpenList/v4/internal/conf" - "github.com/OpenListTeam/OpenList/v4/internal/errs" - "github.com/OpenListTeam/OpenList/v4/internal/model" - "github.com/OpenListTeam/OpenList/v4/internal/op" - "github.com/OpenListTeam/OpenList/v4/server/common" - "github.com/gin-gonic/gin" - "github.com/pkg/errors" -) - -func FsUp(c *gin.Context) { - path := c.GetHeader("File-Path") - password := c.GetHeader("Password") - path, err := url.PathUnescape(path) - if err != nil { - common.ErrorResp(c, err, 400) - c.Abort() - return - } - user := c.Request.Context().Value(conf.UserKey).(*model.User) - path, err = user.JoinPath(path) - if err != nil { - common.ErrorResp(c, err, 403) - return - } - meta, err := op.GetNearestMeta(stdpath.Dir(path)) - if err != nil { - if !errors.Is(errors.Cause(err), errs.MetaNotFound) { - common.ErrorResp(c, err, 500, true) - c.Abort() - return - } - } - if !(common.CanAccess(user, meta, path, password) && (user.CanWrite() || common.CanWrite(meta, stdpath.Dir(path)))) { - common.ErrorResp(c, errs.PermissionDenied, 403) - c.Abort() - return - } - c.Next() -} diff --git a/server/router.go b/server/router.go index 66f0539ba..a88f36cbe 100644 --- a/server/router.go +++ b/server/router.go @@ -205,6 +205,12 @@ func _fs(g *gin.RouterGroup) { g.PUT("/put", middlewares.FsUp, uploadLimiter, handles.FsStream) g.PUT("/form", middlewares.FsUp, uploadLimiter, handles.FsForm) g.POST("/link", middlewares.AuthAdmin, handles.Link) + + g.GET("/upload/info", middlewares.FsSliceUp, handles.FsUpInfo) + g.POST("/preup", middlewares.FsSliceUp, handles.FsPreup) + g.PUT("/slice_upload", middlewares.FsSliceUp, handles.FsUpSlice) + g.POST("/slice_upload_complete", middlewares.FsSliceUp, handles.FsUpSliceComplete) + // g.POST("/add_aria2", handles.AddOfflineDownload) // g.POST("/add_qbit", handles.AddQbittorrent) // g.POST("/add_transmission", handles.SetTransmission)