Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drivers/115/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (d *Pan115) UploadByMultipart(ctx context.Context, params *driver115.Upload
if _, err = tmpF.ReadAt(buf, chunk.Offset); err != nil && !errors.Is(err, io.EOF) {
continue
}
if part, err = bucket.UploadPart(imur, driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(buf)),
if part, err = bucket.UploadPart(imur, driver.NewLimitedUploadStream(ctx, bytes.NewReader(buf)),
chunk.Size, chunk.Number, driver115.OssOption(params, ossToken)...); err == nil {
break
}
Expand Down
21 changes: 3 additions & 18 deletions drivers/123/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package _123

import (
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
Expand All @@ -18,6 +15,7 @@ import (
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand Down Expand Up @@ -187,25 +185,12 @@ func (d *Pan123) Remove(ctx context.Context, obj model.Obj) error {

func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
etag := file.GetHash().GetHash(utils.MD5)
var err error
if len(etag) < utils.MD5.Width {
// const DEFAULT int64 = 10485760
h := md5.New()
// need to calculate md5 of the full content
tempFile, err := file.CacheFullInTempFile()
_, etag, err = stream.CacheFullInTempFileAndHash(file, utils.MD5)
if err != nil {
return err
}
defer func() {
_ = tempFile.Close()
}()
if _, err = utils.CopyWithBuffer(h, tempFile); err != nil {
return err
}
_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
return err
}
etag = hex.EncodeToString(h.Sum(nil))
}
data := base.Json{
"driveId": 0,
Expand Down
34 changes: 20 additions & 14 deletions drivers/123/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"strconv"

Expand Down Expand Up @@ -70,27 +69,33 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
}

func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, up driver.UpdateProgress) error {
chunkSize := int64(1024 * 1024 * 16)
tmpF, err := file.CacheFullInTempFile()
if err != nil {
return err
}
// fetch s3 pre signed urls
chunkCount := int(math.Ceil(float64(file.GetSize()) / float64(chunkSize)))
size := file.GetSize()
chunkSize := min(size, 16*utils.MB)
chunkCount := int(size / chunkSize)
lastChunkSize := size % chunkSize
if lastChunkSize > 0 {
chunkCount++
} else {
lastChunkSize = chunkSize
}
// only 1 batch is allowed
isMultipart := chunkCount > 1
batchSize := 1
getS3UploadUrl := d.getS3Auth
if isMultipart {
if chunkCount > 1 {
batchSize = 10
getS3UploadUrl = d.getS3PreSignedUrls
}
limited := driver.NewLimitedUploadStream(ctx, file)
for i := 1; i <= chunkCount; i += batchSize {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
start := i
end := i + batchSize
if end > chunkCount+1 {
end = chunkCount + 1
}
end := min(i+batchSize, chunkCount+1)
s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end)
if err != nil {
return err
Expand All @@ -102,9 +107,9 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
}
curSize := chunkSize
if j == chunkCount {
curSize = file.GetSize() - (int64(chunkCount)-1)*chunkSize
curSize = lastChunkSize
}
err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(limited, chunkSize), curSize, false, getS3UploadUrl)
err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.NewSectionReader(tmpF, chunkSize*int64(j-1), curSize), curSize, false, getS3UploadUrl)
if err != nil {
return err
}
Expand All @@ -115,12 +120,12 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
return d.completeS3(ctx, upReq, file, chunkCount > 1)
}

func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader *io.SectionReader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
if uploadUrl == "" {
return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
}
req, err := http.NewRequest("PUT", uploadUrl, reader)
req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, reader))
if err != nil {
return err
}
Expand All @@ -143,6 +148,7 @@ func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSign
}
s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
// retry
reader.Seek(0, io.SeekStart)
return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl)
}
if res.StatusCode != http.StatusOK {
Expand Down
106 changes: 46 additions & 60 deletions drivers/139/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package _139

import (
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"path"
"strconv"
"strings"
"time"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
streamPkg "github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/cron"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/alist-org/alist/v3/pkg/utils/random"
Expand Down Expand Up @@ -71,28 +70,29 @@ func (d *Yun139) Init(ctx context.Context) error {
default:
return errs.NotImplement
}
if d.ref != nil {
return nil
}
decode, err := base64.StdEncoding.DecodeString(d.Authorization)
if err != nil {
return err
}
decodeStr := string(decode)
splits := strings.Split(decodeStr, ":")
if len(splits) < 2 {
return fmt.Errorf("authorization is invalid, splits < 2")
}
d.Account = splits[1]
_, err = d.post("/orchestration/personalCloud/user/v1.0/qryUserExternInfo", base.Json{
"qryUserExternInfoReq": base.Json{
"commonAccountInfo": base.Json{
"account": d.getAccount(),
"accountType": 1,
},
},
}, nil)
return err
// if d.ref != nil {
// return nil
// }
// decode, err := base64.StdEncoding.DecodeString(d.Authorization)
// if err != nil {
// return err
// }
// decodeStr := string(decode)
// splits := strings.Split(decodeStr, ":")
// if len(splits) < 2 {
// return fmt.Errorf("authorization is invalid, splits < 2")
// }
// d.Account = splits[1]
// _, err = d.post("/orchestration/personalCloud/user/v1.0/qryUserExternInfo", base.Json{
// "qryUserExternInfoReq": base.Json{
// "commonAccountInfo": base.Json{
// "account": d.getAccount(),
// "accountType": 1,
// },
// },
// }, nil)
// return err
return nil
}

func (d *Yun139) InitReference(storage driver.Driver) error {
Expand Down Expand Up @@ -502,53 +502,42 @@ func (d *Yun139) Remove(ctx context.Context, obj model.Obj) error {
}
}

const (
_ = iota //ignore first value by assigning to blank identifier
KB = 1 << (10 * iota)
MB
GB
TB
)

func (d *Yun139) getPartSize(size int64) int64 {
if d.CustomUploadPartSize != 0 {
return d.CustomUploadPartSize
}
// 网盘对于分片数量存在上限
if size/GB > 30 {
return 512 * MB
if size/utils.GB > 30 {
return 512 * utils.MB
}
return 100 * MB
return 100 * utils.MB
}

func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
switch d.Addition.Type {
case MetaPersonalNew:
var err error
fullHash := stream.GetHash().GetHash(utils.SHA256)
if len(fullHash) <= 0 {
tmpF, err := stream.CacheFullInTempFile()
if err != nil {
return err
}
fullHash, err = utils.HashFile(utils.SHA256, tmpF)
if len(fullHash) != utils.SHA256.Width {
_, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, utils.SHA256)
if err != nil {
return err
}
}

partInfos := []PartInfo{}
var partSize = d.getPartSize(stream.GetSize())
part := (stream.GetSize() + partSize - 1) / partSize
if part == 0 {
part = 1
size := stream.GetSize()
var partSize = d.getPartSize(size)
part := size / partSize
if size%partSize > 0 {
part++
}
partInfos := make([]PartInfo, 0, part)
Comment thread
j2rong4cn marked this conversation as resolved.
Comment thread
j2rong4cn marked this conversation as resolved.
for i := int64(0); i < part; i++ {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
start := i * partSize
byteSize := stream.GetSize() - start
byteSize := size - start
if byteSize > partSize {
byteSize = partSize
}
Expand Down Expand Up @@ -576,7 +565,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
"contentType": "application/octet-stream",
"parallelUpload": false,
"partInfos": firstPartInfos,
"size": stream.GetSize(),
"size": size,
"parentFileId": dstDir.GetID(),
"name": stream.GetName(),
"type": "file",
Expand Down Expand Up @@ -629,7 +618,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
}

// Progress
p := driver.NewProgress(stream.GetSize(), up)
p := driver.NewProgress(size, up)

rateLimited := driver.NewLimitedUploadStream(ctx, stream)
// 上传所有分片
Expand Down Expand Up @@ -780,13 +769,13 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
return err
}

size := stream.GetSize()
// Progress
p := driver.NewProgress(stream.GetSize(), up)

var partSize = d.getPartSize(stream.GetSize())
part := (stream.GetSize() + partSize - 1) / partSize
if part == 0 {
part = 1
p := driver.NewProgress(size, up)
var partSize = d.getPartSize(size)
part := size / partSize
if size%partSize > 0 {
part++
Comment thread
j2rong4cn marked this conversation as resolved.
}
rateLimited := driver.NewLimitedUploadStream(ctx, stream)
for i := int64(0); i < part; i++ {
Expand All @@ -795,10 +784,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
}

start := i * partSize
byteSize := stream.GetSize() - start
if byteSize > partSize {
byteSize = partSize
}
byteSize := min(size-start, partSize)

limitReader := io.LimitReader(rateLimited, byteSize)
// Update Progress
Expand All @@ -810,7 +796,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr

req = req.WithContext(ctx)
req.Header.Set("Content-Type", "text/plain;name="+unicode(stream.GetName()))
req.Header.Set("contentSize", strconv.FormatInt(stream.GetSize(), 10))
req.Header.Set("contentSize", strconv.FormatInt(size, 10))
req.Header.Set("range", fmt.Sprintf("bytes=%d-%d", start, start+byteSize-1))
req.Header.Set("uploadtaskID", resp.Data.UploadResult.UploadTaskID)
req.Header.Set("rangeType", "0")
Expand Down
1 change: 1 addition & 0 deletions drivers/139/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (d *Yun139) refreshToken() error {
if len(splits) < 3 {
return fmt.Errorf("authorization is invalid, splits < 3")
}
d.Account = splits[1]
strs := strings.Split(splits[2], "|")
if len(strs) < 4 {
return fmt.Errorf("authorization is invalid, strs < 4")
Expand Down
Loading
Loading